# elegent-pipe 优雅数据同步框架 **Repository Path**: myelegent/elegent-pipe ## Basic Information - **Project Name**: elegent-pipe 优雅数据同步框架 - **Description**: Elegent pipe(优雅数据同步框架)是一款发布订阅模式的数据同步框架。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 14 - **Forks**: 1 - **Created**: 2023-10-18 - **Last Updated**: 2026-03-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ![](doc/images/logob.png) # Elegent pipe【优雅数据同步框架】 ## 框架介绍 ​ Elegent pipe(优雅数据同步框架)是一款发布订阅模式的数据同步框架。 这个组件具有的特点: 1. 服务端以配置文件的方式配置数据库监听规则,无需要任何管理界面,也不需要单独部署。 2. 需要mysql开启binlog日志。 3. 客户端只需要引入依赖和使用注解即可。 下面是Elegent job框架的系统架构图: ![](doc/images/system.png) ## 使用说明 ### 开启binlog 要开启MySQL的binlog,您可以按照以下步骤进行操作: (1)编辑MySQL的配置文件 `my.cnf` 或 `my.ini`,具体位置根据您的操作系统和MySQL版本而定。 (2)找到并修改以下参数: ``` 复制代码 [mysqld] log-bin=mysql-bin server-id=1 ``` - `log-bin`:指定binlog日志文件的前缀名称,可以根据需要自定义。 - `server-id`:指定MySQL实例的唯一标识,每个MySQL实例需要有不同的server-id。 (3)保存并退出配置文件。 (4)重启MySQL服务,以使配置生效。 开启binlog后,MySQL将开始记录所有的数据库更改操作,并将其写入binlog文件中。这些binlog文件可以用于数据恢复、数据备份、数据同步等用途。 请注意,在修改MySQL配置文件之前,请确保您对MySQL有足够的权限,并且备份了重要的数据。此外,开启binlog会增加MySQL的写入负载,因此在生产环境中,应该根据系统的性能和资源情况进行评估和调整。 ### 服务端 (1)在项目中引入elegent-pipe-server的依赖。 ```xml cn.elegent.pipe elegent-pipe-server 1.1.0 ``` 因为elegent-pipe-server需要用到elegent-AC 来发送消息,所以我们还需要加入 elegent-AC的实现依赖。如果你用的的是mqtt协议的消息中间件,则需要引入 ```xml cn.elegent.ac elegent-AC-mqtt 1.1.0 ``` 如果是amqp协议的消息中间件,则需要引入依赖 ```xml cn.elegent.ac elegent-AC-amqp 1.1.0 ``` (2)在项目配置文件添加配置,示例如下: ```yaml elegent: pipe: host: 127.0.0.1 port: 3306 username: root passwd: HuangShu_2023 tables: - demo:users ac: host: 127.0.0.1 port: 1883 clientId: transmitServer username: admin password: public keepAliveInterval: 30 connectionTimeout: 60 ``` elegent.pipe 配置说明; | 配置 | 说明 | | -------- | ----------------------------------- | | host | 数据库主机 | | port | 数据库端口 | | username | 数据库用户名 | | passwd | 数据库密码 | | tables | 需要监听的数据库表,类型为数组 ,元素的格式为 数据库名:数据库表 | elegent.ac 配置说明: | 配置 | 说明 | | ----------------- | ------------------- | | host | MQ的主机地址 | | port | MQ中间件的端口。 EMQ的是1883 | | username | 登录用户名 | | password | 登录密码 | | clientId | 客户端ID | | keepAliveInterval | 心跳检测(秒) | | connectionTimeout | 连接超时时间 | (3)启动消息中间件。 ### 客户端 (1)引入依赖 ```xml cn.elegent.pipe elegent-pipe-client 1.1.0 ``` 因为elegent-job-client需要用到elegent-AC 来接收消息,所以我们还需要加入 elegent-AC的实现依赖。如果你用的的是mqtt协议的消息中间件,则需要引入 ```xml cn.elegent.ac elegent-AC-mqtt 1.1.0 ``` 如果是amqp协议的消息中间件,则需要引入依赖 ```xml cn.elegent.ac elegent-AC-amqp 1.1.0 ``` (2)在项目配置文件添加配置,示例如下: ```yaml elegent: ac: host: 127.0.0.1 port: 1883 clientId: jobClient username: admin password: public keepAliveInterval: 30 connectionTimeout: 60 ``` elegent.ac 配置说明请参考上一小节 (3)编写数据处理 ```java @ElegentPipe(db="demo",table = "users") public class UserTransmit implements PipeService { @Override public void insertHandler(TransmitDTO transmitDTO) { System.out.println("修改前数据:"+transmitDTO.getBefore()); System.out.println("修改后数据:"+transmitDTO.getAfter()); } @Override public void updateHandler(TransmitDTO transmitDTO) { System.out.println("修改前数据:"+transmitDTO.getBefore()); System.out.println("修改后数据:"+transmitDTO.getAfter()); } @Override public void deleteHandler(TransmitDTO transmitDTO) { System.out.println("修改前数据:"+transmitDTO.getBefore()); System.out.println("修改后数据:"+transmitDTO.getAfter()); } } ``` 编写类实现PipeService接口,并在类上添加@ElegentPipe注解 ,属性说明: db:数据库名 table:表名称 ## 参与贡献 1. 从 `master` 分支 `checkout` 一个新分支(**注**:*请务必保证 master 代码是最新的*) 2. 新分支命名格式:`docs/username_description`,例如:`docs/tom_新增分布式锁配置项` 3. 在新分支上编辑文档、代码,并提交代码 4. 最后 `PR` 合并到 `develop` 分支,等待作者合并即可