# MySqlSynchronous **Repository Path**: yangjuncom/my-sql-synchronous ## Basic Information - **Project Name**: MySqlSynchronous - **Description**: MySQL数据实时同步程序: 本程序基于.Net 8 + Redis + Kafka(集群模式) + MySQL 开发。调试前需部署 Kafka 相关集群环境,建议通过 Docker 完成 Kafka 环境的部署与搭建 程序包含两个核心组件: MySqlSync.Producer(生产者):运行时请先启动生产者 MySqlSync.Consumer(消费者):再运行消费者进行数据消费处理 - **Primary Language**: C# - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-08-25 - **Last Updated**: 2025-08-26 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README 一、项目介绍 MySqlSynchronous 是一款高性能的 MySQL 数据实时同步程序,旨在解决分布式系统中 MySQL 数据跨节点、跨服务的实时一致性问题。程序基于 .NET 8 框架开发,整合 Redis(用于缓存与状态管理)、Kafka(集群模式)(用于高可靠消息投递)与 MySQL(源 / 目标数据库)技术栈,确保数据同步的实时性、稳定性与可扩展性。 核心组件说明 程序包含两个强依赖的核心组件,需按固定顺序启动以保证同步流程正常运行: MySqlSync.Producer(生产者组件):负责监听 MySQL 源数据库的数据变更(如新增、修改、删除操作),将变更数据封装为标准化消息并发送至 Kafka 集群的指定主题(Topic)。运行时需优先启动生产者,确保数据变更能被实时捕获并投递。 MySqlSync.Consumer(消费者组件):从 Kafka 集群订阅指定主题,消费生产者投递的数据库变更消息,并根据预设逻辑将数据同步至目标 MySQL 数据库(或其他存储介质)。需在生产者启动成功后,再启动消费者进行数据消费处理。 二、软件架构 本项目采用 “生产者 - 消费者” 分布式架构,各组件职责与交互流程如下: 数据源层:MySQL 源数据库,产生数据变更的源头; 生产者层(MySqlSync.Producer):基于 .NET 8 开发,监听 MySQL 数据变更(可通过 Binlog 解析实现),将变更数据结构化后,通过 Kafka 客户端发送至 Kafka 集群; 消息中间件层:Kafka 集群,负责接收、存储生产者发送的变更消息,提供高吞吐、高可用的消息暂存与投递能力,避免因生产者 / 消费者临时故障导致数据丢失; 缓存层:Redis,用于存储同步任务状态、数据偏移量(如 Kafka 消费组的 offset)、分布式锁等,确保多消费者实例下的数据同步一致性,避免重复消费; 消费者层(MySqlSync.Consumer):基于 .NET 8 开发,订阅 Kafka 集群中的目标主题,拉取变更消息并进行解析、校验,最终将数据同步至目标 MySQL 数据库; 目标存储层:MySQL 目标数据库,接收消费者同步的数据,与源数据库保持实时数据一致性。 三、安装教程 1. 环境依赖准备 在部署程序前,需先完成以下基础环境的安装与配置: .NET 8 SDK:确保开发 / 部署机器已安装 .NET 8 SDK(用于编译、运行 .NET 程序); Docker:建议通过 Docker 快速部署 Kafka 集群与 Redis(避免手动配置复杂的集群环境),需提前安装 Docker 与 Docker Compose; MySQL:准备源数据库与目标数据库(版本建议 5.7+ 或 8.0+,需开启 Binlog 功能以支持数据变更监听,Binlog 格式建议设置为 ROW 模式)。 2. Kafka 集群部署(Docker 方式) 推荐使用 Docker Compose 一键部署 Kafka 集群(含 ZooKeeper,Kafka 依赖 ZooKeeper 管理集群元数据),步骤如下: 创建 docker-compose.yml 文件,内容参考如下(可根据需求调整节点数量、端口映射): # docker-compose.yaml services: kafka1: image: 'bitnami/kafka:3.6.1' container_name: kafka1 hostname: kafka1 ports: - '19092:19092' environment: # 凡是以KAFKA_CFG_开头的环境变量,在配置kafka的配置文件中都有对应的配置项。例如KAFKA_CFG_NODE_ID在配置文件中就是node.id # 集群ID,同一个集群中的所有节点都应该指定相同的集群ID,可以使用kafka-storage.bat random--uuid生成。 - KAFKA_KRAFT_CLUSTER_ID=K3QlUFqCQ66BXfkBs5K7Wg # 当process.roles不为空时,与之相关联的节点ID,KRaft模式下必须配置,同个集群内的节点ID不允许重复。 - KAFKA_CFG_NODE_ID=1 # 角色:'broker', 'controller'或'broker, controller'两者同时兼顾 - KAFKA_CFG_PROCESS_ROLES=broker # 定义Kafka服务端的监听列表,用','分隔,它的格式是{LISTENER}://{hostname}:{port} - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092 # 定义访问地址 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka1:9092, EXTERNAL://localhost:19092 # 配置监听器对应的安全协议 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT # 指定集群内所有角色为controller的所有节点,格式是{node.id}@{host}:{port} - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093 # 指定角色为controller监听器名称 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER # 指定其它broker节点与本节点通信的监听器名称 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # === 性能优化配置 === # 增加网络线程,系统默认值为3 - KAFKA_CFG_NUM_NETWORK_THREADS=8 # 增加I/O线程,系统默认值为8 - KAFKA_CFG_NUM_IO_THREADS=16 # 增大socket请求大小(支持大消息) - KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600 # 100MB # 增加请求队列大小 - KAFKA_CFG_QUEUED_MAX_REQUESTS=5000 # JVM堆内存配置 - KAFKA_HEAP_OPTS=-Xmx4G -Xms4G # JVM性能优化 - KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 volumes: - kafka文件地址/kafka1-data:/bitnami/kafka # - C:/StoneBurningProject/RSDataSynchronous Kafka/kafka1-data:/bitnami/kafka networks: - kfk-network healthcheck: # 健康检查命令 test: [ "CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "kafka1:19092" ] interval: 30s timeout: 10s retries: 5 kafka2: image: 'bitnami/kafka:3.6.1' container_name: kafka2 hostname: kafka2 ports: - '29092:19092' environment: - KAFKA_KRAFT_CLUSTER_ID=K3QlUFqCQ66BXfkBs5K7Wg - KAFKA_CFG_NODE_ID=2 - KAFKA_CFG_PROCESS_ROLES=broker - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka2:9092, EXTERNAL://localhost:29092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # === 性能优化配置 === # 增加网络线程,系统默认值为3 - KAFKA_CFG_NUM_NETWORK_THREADS=8 # 增加I/O线程,系统默认值为8 - KAFKA_CFG_NUM_IO_THREADS=16 # 增大socket请求大小(支持大消息) - KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600 # 100MB # 增加请求队列大小 - KAFKA_CFG_QUEUED_MAX_REQUESTS=5000 # JVM堆内存配置 - KAFKA_HEAP_OPTS=-Xmx4G -Xms4G # JVM性能优化 - KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 volumes: - kafka文件地址/kafka1-data:/bitnami/kafka # - C:/StoneBurningProject/RSDataSynchronous Kafka/kafka2-data:/bitnami/kafka networks: - kfk-network healthcheck: # 健康检查命令 test: [ "CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "kafka2:19092" ] interval: 30s timeout: 10s retries: 5 kafka3: image: 'bitnami/kafka:3.6.1' container_name: kafka3 hostname: kafka3 ports: - '39092:19092' environment: - KAFKA_KRAFT_CLUSTER_ID=K3QlUFqCQ66BXfkBs5K7Wg - KAFKA_CFG_NODE_ID=3 - KAFKA_CFG_PROCESS_ROLES=broker,controller - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092, CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka3:9092, EXTERNAL://localhost:39092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # === 性能优化配置 === # 增加网络线程,系统默认值为3 - KAFKA_CFG_NUM_NETWORK_THREADS=8 # 增加I/O线程,系统默认值为8 - KAFKA_CFG_NUM_IO_THREADS=16 # 增大socket请求大小(支持大消息) - KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600 # 100MB # 增加请求队列大小 - KAFKA_CFG_QUEUED_MAX_REQUESTS=5000 # JVM堆内存配置 - KAFKA_HEAP_OPTS=-Xmx4G -Xms4G # JVM性能优化 - KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 volumes: - kafka文件地址/kafka1-data:/bitnami/kafka # - C:/StoneBurningProject/RSDataSynchronous Kafka/kafka3-data:/bitnami/kafka networks: - kfk-network healthcheck: # 健康检查命令 test: [ "CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "kafka3:19092" ] interval: 30s timeout: 10s retries: 5 kafka4: image: 'bitnami/kafka:3.6.1' container_name: kafka4 hostname: kafka4 ports: - '49092:19092' environment: - KAFKA_KRAFT_CLUSTER_ID=K3QlUFqCQ66BXfkBs5K7Wg - KAFKA_CFG_NODE_ID=4 - KAFKA_CFG_PROCESS_ROLES=broker,controller - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092, CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka4:9092, EXTERNAL://localhost:49092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # === 性能优化配置 === # 增加网络线程,系统默认值为3 - KAFKA_CFG_NUM_NETWORK_THREADS=8 # 增加I/O线程,系统默认值为8 - KAFKA_CFG_NUM_IO_THREADS=16 # 增大socket请求大小(支持大消息) - KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600 # 100MB # 增加请求队列大小 - KAFKA_CFG_QUEUED_MAX_REQUESTS=5000 # JVM堆内存配置 - KAFKA_HEAP_OPTS=-Xmx4G -Xms4G # JVM性能优化 - KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 volumes: - kafka文件地址/kafka1-data:/bitnami/kafka # - C:/StoneBurningProject/RSDataSynchronous Kafka/kafka4-data:/bitnami/kafka networks: - kfk-network healthcheck: # 健康检查命令 test: [ "CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "kafka4:19092" ] interval: 30s timeout: 10s retries: 5 kafka5: image: 'bitnami/kafka:3.6.1' container_name: kafka5 hostname: kafka5 ports: - '59092:19092' environment: - KAFKA_KRAFT_CLUSTER_ID=K3QlUFqCQ66BXfkBs5K7Wg - KAFKA_CFG_NODE_ID=5 - KAFKA_CFG_PROCESS_ROLES=broker,controller - KAFKA_CFG_LISTENERS=INTERNAL://:9092, EXTERNAL://:19092, CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka5:9092, EXTERNAL://localhost:59092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, CONTROLLER:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=3@kafka3:9093, 4@kafka4:9093, 5@kafka5:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL # === 性能优化配置 === # 增加网络线程,系统默认值为3 - KAFKA_CFG_NUM_NETWORK_THREADS=8 # 增加I/O线程,系统默认值为8 - KAFKA_CFG_NUM_IO_THREADS=16 # 增大socket请求大小(支持大消息) - KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES=104857600 # 100MB # 增加请求队列大小 - KAFKA_CFG_QUEUED_MAX_REQUESTS=5000 # JVM堆内存配置 - KAFKA_HEAP_OPTS=-Xmx4G -Xms4G # JVM性能优化 - KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 volumes: - kafka文件地址/kafka1-data:/bitnami/kafka # - C:/StoneBurningProject/RSDataSynchronous Kafka/kafka5-data:/bitnami/kafka networks: - kfk-network healthcheck: # 健康检查命令 test: [ "CMD", "kafka-broker-api-versions.sh", "--bootstrap-server", "kafka5:19092" ] interval: 30s timeout: 10s retries: 5 # 新增 Redpanda Console 服务 redpanda-console: image: redpandadata/console:v2.3.0 # Redpanda 官方控制台镜像 container_name: redpanda-console # 容器名称 hostname: redpanda-console # 主机名 ports: - "8080:8080" # 端口映射:主机8080 → 容器8080 environment: # 关键配置:连接到Kafka集群的broker列表 # 使用Docker内部DNS名称(kafka1等)+内部端口9092 KAFKA_BROKERS: "kafka1:9092,kafka2:9092,kafka3:9092,kafka4:9092,kafka5:9092" CONSOLE_LOG_LEVEL: "info" # 日志级别(debug/info/warn/error) CONSOLE_SERVE_FRONTEND: "true" # 必须启用以提供Web界面 networks: - kfk-network # 加入Kafka同一网络 depends_on: # 启动顺序依赖 - kafka1 - kafka2 - kafka3 - kafka4 - kafka5 networks: kfk-network: name: kfk-network external: true 在 docker-compose.yml 所在目录执行命令,启动 Kafka 集群:docker-compose up -d 验证集群启动状态:执行 docker ps,确认 zookeeper、kafka1、kafka2 容器均处于 Up 状态。 3. Redis 部署(Docker 方式) 执行以下命令快速启动 Redis 容器(如需持久化或密码配置,可添加对应参数): bash docker run -d --name redis -p 6379:6379 redis:7.2 4. 程序部署 从代码仓库拉取项目源码(如 Gitee/GitHub); 打开项目解决方案(MySqlSynchronous.sln),在 appsettings.json 中配置各组件连接信息: MySQL 源数据库:配置 ConnectionStrings:MySqlSource(含数据库地址、端口、账号、密码); MySQL 目标数据库:配置 ConnectionStrings:MySqlTarget(如需多目标存储,可扩展配置); Kafka 集群:配置 Kafka:BootstrapServers(如 localhost:9092,localhost:9093)、Kafka:Topic(生产者发送 / 消费者订阅的主题名); Redis:配置 ConnectionStrings:Redis(如 localhost:6379,如需密码则为 localhost:6379,password=xxx); 编译项目:在解决方案目录执行 dotnet build,确保无编译错误; 发布程序:分别对 MySqlSync.Producer 和 MySqlSync.Consumer 执行发布命令(以发布到 publish 目录为例): bash # 发布生产者 dotnet publish MySqlSync.Producer/MySqlSync.Producer.csproj -o ./publish/Producer # 发布消费者 dotnet publish MySqlSync.Consumer/MySqlSync.Consumer.csproj -o ./publish/Consumer 四、使用说明 1. 启动顺序(关键!) 必须严格按照 “生产者 → 消费者” 的顺序启动,否则消费者将无法获取数据变更消息: 启动生产者:进入 publish/Producer 目录,执行启动命令: bash dotnet MySqlSync.Producer.dll 验证:查看控制台输出,若显示 “Kafka 连接成功”“开始监听 MySQL 数据变更” 等日志,说明生产者启动正常; 启动消费者:进入 publish/Consumer 目录,执行启动命令: bash dotnet MySqlSync.Consumer.dll 验证:查看控制台输出,若显示 “Kafka 消费组连接成功”“开始消费消息” 等日志,且当源数据库发生数据变更时,消费者日志显示 “消息消费成功,数据同步完成”,说明同步流程正常。 2. 数据同步验证 在 MySQL 源数据库执行一条数据插入 / 更新 / 删除操作(如 INSERT INTO test_table (name) VALUES ('test')); 查看生产者控制台:确认日志显示 “捕获数据变更,发送消息至 Kafka 成功”; 查看消费者控制台:确认日志显示 “消费消息成功,同步至目标数据库”; 查询 MySQL 目标数据库的 test_table:确认新增数据已同步,验证同步结果。 3. 常见问题处理 生产者无法监听 MySQL 变更:检查源数据库 Binlog 是否已开启(执行 show variables like 'log_bin';,结果应为 ON),且 Binlog 格式为 ROW 模式(执行 show variables like 'binlog_format';); 消费者无法消费消息:检查 Kafka 集群地址配置是否正确、主题是否存在(可通过 Kafka 命令行工具 kafka-topics.sh 查看)、Redis 连接是否正常; 数据同步重复:检查 Redis 中存储的 Kafka 消费组 offset 是否正常(若 offset 异常,可重置消费组 offset 后重启消费者)。