# mysql-sync-os **Repository Path**: cocoknight/mysql-sync-os ## Basic Information - **Project Name**: mysql-sync-os - **Description**: MySQL 通过binlog日志同步OpenSearch - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-29 - **Last Updated**: 2026-01-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # mysql binlog同步到es ### 步骤 #### 1、获取mysql表结构,并自动创建es索引 * 1.1、获取mysql表结构 * 1.2、检查es索引是否创建,如已创建,检查表结构hash是否变动,变动的话,则重建索引(字段新增仅新增该字段,并同步)(待完成) * 1.3、根据表结构,自动创建es索引 * 1.4、将表结构持久化缓存起来(待完成) * 1.5、使用消息队列批量写入ES * 1.6、使用分片hash对账法,每天晚上定时对账数据。(待完成) * 1.7、添加手动数据对账功能,或者异步同步功能(添加新表同步完数据后再修改表名)。(待完成) #### 2、监听mysql binlog日志,自动同步es索引 ## 依赖 > 1、mysql > 2、elasticsearch > 3、debezium > 4、disruptor ### OpenSearch安装 * 文档地址: https://docs.opensearch.org.cn/docs/latest/install-and-configure/install-opensearch/index/ * jdk8对应版本: https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.20/opensearch-1.3.20-windows-x64.zip * ik分词安装: ``` bash opensearch-plugin install https://get.infini.cloud/opensearch/analysis-ik/1.3.20 ``` * 配置: config\\opensearch.yml ``` yml http.port: 9920 plugins.security.disabled: true ``` # MySQL Binlog 同步到 OpenSearch 本项目是一个基于 Debezium 和 Disruptor 的高性能 MySQL Binlog 同步工具,能够实时将 MySQL 数据变更同步到 OpenSearch/Elasticsearch。 ## 功能特性 - **自动索引创建**: 根据 MySQL 表结构自动创建 OpenSearch 索引,支持表结构变更检测和增量更新 - **实时同步**: 通过 Debezium 监听 MySQL Binlog,实时捕获数据变更事件 - **高性能处理**: 使用 Disruptor 无锁队列实现高吞吐量的消息处理 - **批量写入**: 支持批量写入 OpenSearch,提高写入效率 - **重试机制**: 失败数据自动重试,确保数据最终一致性 - **尾数据检测**: 定时检测队列中的尾数据,确保数据及时写入 ## 技术栈 - **Spring Boot**: 应用框架 - **Debezium**: CDC (Change Data Capture) 组件,用于捕获 MySQL Binlog - **Disruptor**: 高性能并发框架,用于消息队列处理 - **OpenSearch**: 目标搜索引擎 - **MySQL**: 源数据库 ## 快速开始 ### 1. 环境要求 - JDK 8+ - MySQL 5.7+ / 8.0+ - OpenSearch 1.3.x ### 2. 安装 OpenSearch ```bash # 下载 OpenSearch (以 1.3.20 版本为例) wget https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.20/opensearch-1.3.20-linux-x64.tar.gz # 解压 tar -xzf opensearch-1.3.20-linux-x64.tar.gz # 安装 IK 分词插件 ./opensearch-plugin install https://get.infini.cloud/opensearch/analysis-ik/1.3.20 # 修改配置文件 opensearch.yml http.port: 9920 plugins.security.disabled: true # 启动 OpenSearch ./opensearch/bin/opensearch ``` ### 3. 配置 MySQL 确保 MySQL 已开启 Binlog,参考配置: ```ini [mysqld] server-id = 1 log_bin = /var/log/mysql/mysql-bin.log binlog_format = ROW expire_logs_days = 10 ``` ### 4. 配置应用 在 `application.yml` 中配置相关参数: ```yaml # OpenSearch 配置 open-search: host: localhost port: 9920 scheme: http prefix: sync_ # Debezium 配置 debezium: databases: - name: test_db enabled: true host: localhost port: 3306 username: root password: your_password server-id: 1 table: - your_table_name offset-path: /data/offsets history-path: /data/history # 批量队列配置 batch: queue: capacity: 1024 batch-size: 100 max-wait-time: 1000 tail-data-check-interval: 1000 thread-pool: core-pool-size: 4 max-pool-size: 8 queue-capacity: 512 ``` ### 5. 运行应用 ```bash # 编译 mvn clean package -DskipTests # 运行 java -jar mysql-sync-os.jar ``` ## 核心组件说明 ### InitIndex 应用启动时自动执行,负责: - 获取 MySQL 所有表结构 - 检查 OpenSearch 索引是否存在 - 如索引不存在或表结构已变更,自动创建/更新索引 - 将表结构持久化缓存 ### MysqlListener 使用 Debezium 监听 MySQL Binlog: - 接收 CDC 变更事件 - 解析变更数据 (INSERT/UPDATE/DELETE) - 将消息投递到 Disruptor 队列 ### BatchQueueService 基于 Disruptor 的高性能消息队列: - 无锁队列,高吞吐量 - 支持批量处理 - 尾数据定时检测 ### MessageBatchHandler 批量消息处理器: - 聚合消息,达到批量条件后同步 - 支持定时触发和数量触发 - 失败数据自动重试 - 异步写入 OpenSearch ### OpenSearchSyncService OpenSearch 同步服务: - 单条和批量数据同步 - 数据格式转换 (驼峰命名) - 批量写入和错误处理 ### DatabaseMetaService 数据库元数据服务: - 获取表结构信息 - 字段类型映射 - 表结构变更检测 ## 工作流程 1. **应用启动**: InitIndex 自动创建 OpenSearch 索引 2. **监听 binlog**: MysqlListener 启动 Debezium 引擎,监听 MySQL Binlog 3. **接收变更**: 收到变更事件后,解析并投递到 Disruptor 队列 4. **批量处理**: MessageBatchHandler 聚合消息,达到条件后批量写入 5. **同步到 ES**: OpenSearchSyncService 将数据写入 OpenSearch 6. **重试机制**: 失败数据自动重试,确保数据一致性 ## 注意事项 - 确保 MySQL 已开启 Binlog 且格式为 ROW - Debezium 偏移量文件路径需要有读写权限 - 建议配置足够的 Disruptor 队列容量以应对流量高峰 - 定期清理 Debezium 的历史文件和偏移量文件 ## License 本项目遵循开源协议。