# mysql_to_es **Repository Path**: slcnx/mysql_to_es ## Basic Information - **Project Name**: mysql_to_es - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-04-21 - **Last Updated**: 2025-04-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MySQL到Elasticsearch同步工具 这是一个轻量级的MySQL到Elasticsearch数据同步工具,支持配置化、增量同步、数据处理等功能。 ## 特性 - 支持配置文件(Python或YAML格式) - 支持自定义SQL查询 - **智能批量处理:自动检测大型表并基于AST分批查询** - 支持多种操作(insert、update、upsert) - 支持分批处理大量数据 - 支持数据处理管道(设置ID、字段过滤、字段重命名等) - 增量同步(基于主键ID) - 详细的日志记录 - Docker 容器化支持 ## 已知问题 ### MySQL表行数估计误差 同步程序在决定是否分批处理数据时,会从`information_schema.TABLES`的`TABLE_ROWS`字段获取表行数估计值。这个值可能与实际行数有显著差异,导致以下问题: - 对于InnoDB表,`TABLE_ROWS`只是统计采样的估计值,不是精确计数 - 统计信息不会自动实时更新,需要执行`ANALYZE TABLE`命令刷新 - 小表和中等大小的表可能有较大的估计误差 - 刚导入的数据可能导致统计信息不准确 **解决方法**: 1. 对于重要表,定期执行`ANALYZE TABLE`更新统计信息 2. 如果需要精确计数,可修改代码使用`SELECT COUNT(*)`替代`TABLE_ROWS` 3. 对于已知的大表,可在配置中强制启用分批处理 ## 本地安装依赖 ```bash pip install pymysql elasticsearch pyyaml ``` ## 本地使用方法 1. 创建配置文件(支持Python或YAML格式) 2. 执行同步命令: ```bash python mysql_to_es_sync.py -c config.py ``` ## 智能批量处理 工具会自动检测需要批量处理的查询: 1. 对于不包含LIMIT子句的SELECT语句 2. 先检查表的记录数 3. **当表的记录数超过30万行且主键为int类型时,自动基于AST转换为分批查询** ```python # 简单的查询 "sql": "select * from large_table;" # 自动转换为(如果表大于30万行且主键为int类型) "sql": "select * from large_table WHERE id > {id} ORDER BY id LIMIT 8000;" ``` **注意:** 非整数类型主键的表(如UUID、字符串等)即使数据量很大,也不会进行分批处理,而是直接执行原始SQL查询。这样避免了对非整数类型使用范围查询可能导致的问题。 基于AST(抽象语法树)的SQL解析能够更精确地处理复杂SQL语句,自动添加WHERE和LIMIT条件,实现高效的分批读取MySQL数据并同步到ES。 这样用户无需担心大表带来的性能问题,工具会自动进行优化处理。 ## Docker 容器化使用 ### 准备工作 1. 创建配置文件目录 ```bash mkdir -p config data logs ``` 2. 将您的配置文件放入 config 目录 ```bash cp example_config.py config/config.py # 根据需要修改配置参数 ``` ### Docker 启动方式 #### 方式一:使用 Docker 命令直接启动 ```bash # 构建镜像 docker build -t mysql-to-es-sync . # 运行容器,需要准备 $(pwd)/config/config.py sudo docker run --name mysql-to-es-sync -v $(pwd)/config:/app/config -v $(pwd)/data:/app/data -v $(pwd)/logs:/app/logs harbor.songliangcheng.com.cn/library/mysql-to-es-sync:latest ``` #### 方式二:使用 Docker Compose 启动(推荐) ```bash # 构建镜像 docker compose build # 运行容器 docker compose up -d # 查看日志 docker compose logs -f ``` ### 自动配置创建 容器首次启动时,如果没有找到配置文件,会自动从示例配置创建一个。您可以在容器启动后修改该配置文件: ```bash # 修改自动创建的配置文件 docker exec -it mysql-to-es-sync vi /app/config/config.py # 重启容器使配置生效 docker restart mysql-to-es-sync ``` ### 环境变量配置 您可以通过环境变量自定义配置: ```bash docker run -d \ --name mysql-to-es-sync \ -v $(pwd)/config:/app/config \ -v $(pwd)/data:/app/data \ -v $(pwd)/logs:/app/logs \ -e CONFIG_FILE=/app/config/custom_config.py \ -e DATA_DIR=/app/custom_data \ -e LOGS_DIR=/app/custom_logs \ mysql-to-es-sync ``` 或在 docker-compose.yml 中设置: ```yaml services: mysql-to-es-sync: # ... environment: - CONFIG_FILE=/app/config/custom_config.py - DATA_DIR=/app/custom_data - LOGS_DIR=/app/custom_logs ``` ### 发布到镜像仓库 修改仓库地址和标签: ```bash # 设置环境变量 export DOCKER_REGISTRY=your-registry.com export TAG=v1.0.0 # 构建和推送 docker compose build docker compose push ``` 也可以直接在命令行指定: ```bash DOCKER_REGISTRY=your-registry.com TAG=v1.0.0 docker compose build DOCKER_REGISTRY=your-registry.com TAG=v1.0.0 docker compose push ``` ## 配置说明 ### Python格式配置示例 ```python # 数据库连接配置 CONNECTION = { 'host': 'localhost', 'port': 3306, 'user': 'root', 'passwd': 'password', 'connect_timeout': 300, 'read_timeout': 600, 'write_timeout': 600 } # 批量大小 BULK_SIZE = 3000 # Elasticsearch节点 NODES = ["http://user:pass@localhost:9200/"] # 同步任务 TASKS = [ { "stream": { "database": "mydb", # 对于大型表的简单查询将自动转换为分批处理 "sql": "select * from users;", "pk": {"field": "id", "type": "int"}, "batch_size": 8000 # 每批次处理的记录数 }, "jobs": [ { "actions": ["insert", "update"], "pipeline": [ {"set_id": {"field": "id"}}, {"only_fields": {"fields": ["id", "name", "email", "created_at"]}} ], "dest": { "es": { "action": "upsert", "index": "users", "type": "_doc", "nodes": NODES } } } ] } ] ``` ### 配置参数详解 #### CONNECTION 数据库连接配置: - `host`: MySQL主机地址 - `port`: MySQL端口 - `user`: 用户名 - `passwd`: 密码 - `connect_timeout`: 连接超时时间(秒) - `read_timeout`: 读取超时时间(秒) - `write_timeout`: 写入超时时间(秒) #### BULK_SIZE 每批次处理的数据量,默认为1000 #### NODES Elasticsearch节点列表,格式为: `["http://user:pass@host:port/"]` #### TASKS 同步任务配置列表,每个任务包含以下字段: ##### stream 数据流配置: - `database`: 数据库名 - `sql`: SQL查询语句,对于大表会自动进行分批 - `pk`: 主键配置,包含`field`(字段名)和`type`(字段类型,如`int`或`char`) - `batch_size`: 每批次处理的记录数量,默认为8000 ##### jobs 作业配置列表,每个作业包含: - `actions`: 支持的操作类型,如`["insert", "update"]` - `pipeline`: 数据处理管道配置 - `dest`: 目标配置,目前支持`es`(Elasticsearch) ##### pipeline 数据处理管道支持的操作: - `set_id`: 设置文档ID,如`{"set_id": {"field": "id"}}` - `only_fields`: 只保留指定字段,如`{"only_fields": {"fields": ["id", "name"]}}` - `rename_fields`: 字段重命名,如`{"rename_fields": {"mapping": {"old_name": "new_name"}}}` ##### dest.es Elasticsearch目标配置: - `action`: 操作类型,支持`index`、`update`、`upsert` - `index`: 索引名称 - `type`: 文档类型,通常为`_doc` - `nodes`: Elasticsearch节点列表(可选,默认使用全局`NODES`) ## 数据和日志 - **数据文件**:同步状态保存在 `data/sync_state.json` 中 - **日志文件**:日志保存在 `logs/mysql_to_es_sync.log` 中 Docker环境中,这些文件会保存在相应的挂载目录中。 ## 依赖安装 ```bash pip install mysql-connector-python elasticsearch tqdm ``` ### 可选依赖 为了获得更好的SQL解析支持,建议安装SQLGlot库: ```bash pip install sqlglot ``` SQLGlot提供了高级SQL解析功能,可以更准确地处理复杂SQL语句(包含GROUP BY、HAVING等子句)。如果未安装SQLGlot,系统将自动降级使用简单的字符串解析方法。 相关文档: - [SQLGlot官方文档](https://sqlglot.com/sqlglot.html) - [SQLGlot AST Primer指南](https://github.com/tobymao/sqlglot/blob/main/posts/ast_primer.md) ## 主键类型支持 同步工具支持不同类型的主键: 1. **整数主键** (默认) ```python "pk": {"field": "id", "type": "int"} ``` 2. **字符串主键** ```python "pk": {"field": "uuid", "type": "char"} ``` 3. **其他类型** ```python "pk": {"field": "custom_id", "type": "varchar"} ``` 对于非整数类型的主键,工具会自动在SQL条件中添加引号,例如:`WHERE uuid > 'last_uuid'`