# mydataharbor **Repository Path**: mydataharbor/mydataharbor ## Basic Information - **Project Name**: mydataharbor - **Description**: :cn: MyDataHarbor是一个致力于解决异构数据源之间的分布式、高扩展性、高性能、微事务(至少一次保证)的数据同步中间件。帮助用户可靠、快速、稳定的对海量数据进行准实时增量同步或者定时全量同步,主要定位是为实时交易系统服务,亦可用于大数据的数据同步(ETL领域)。 - **Primary Language**: Java - **License**: GPL-3.0 - **Default Branch**: main - **Homepage**: https://mydataharbor.yuque.com/staff-tzwgrd/uqew9p - **GVP Project**: No ## Statistics - **Stars**: 197 - **Forks**: 53 - **Created**: 2021-08-18 - **Last Updated**: 2026-04-03 ## Categories & Tags **Categories**: database-service **Tags**: None ## README


logo

--- ## 📖 简介 **MyDataHarbor** 是一个分布式、高扩展、高性能的**准实时数据同步中间件**,致力于解决异构数据源之间的数据同步问题。 > 🎯 **核心定位**:为实时交易系统服务,支持海量数据的准实时增量同步和定时全量同步,亦可用于 ETL 领域。 典型应用场景: - 🔄 **数据库 → 搜索引擎**:MySQL → Elasticsearch - 🔄 **数据库 → 缓存**:MySQL → Redis - 🔄 **消息队列 → 数据库**:Kafka → MySQL - 🔄 **跨部门数据同步**:多系统间的数据共享与同步 --- ## ✨ 核心特性 | 特性 | 说明 | |------|------| | 🚩 **分布式设计** | 基于 ZooKeeper 构建,支持水平扩展,节点分组隔离,负载均衡与故障自动转移 | | 🔌 **插件化架构** | 高度抽象的接口设计,任何数据源/目标都可通过开发插件接入 | | 🛡️ **数据不丢失** | 引入微事务机制,保障数据**至少成功写入一次** | | 📊 **可视化监控** | 集成 JMX,实时查看任务运行状态、吞吐量、错误率等指标 | | 🎨 **自由组合** | 支持从不同插件复用组件,可视化配置 Pipeline 管道 | | ⚡ **高性能** | 支持批量提交、ForkJoin 并发处理,摩托变汽车,汽车变高铁 | | 📝 **插件自描述** | 自动识别插件能力,生成友好的可视化配置界面 | --- ## 🏗️ 架构设计 MyDataHarbor 唯一依赖的中间件是 ZooKeeper,共有两个核心组件: ### 集群架构设计 ![集群设计](./doc/image/cluster-design.png) ### 节点任务设计 ![节点任务设计](./doc/image/node-design.png) ### 系统架构(示意) ``` ┌─────────────────────────────────────────┐ │ MyDataHarbor Console │ │ - 任务管理 - 插件管理 - 监控看板 │ └───────────────────┬─────────────────────┘ │ ┌─────────▼─────────┐ │ ZooKeeper │ │ (配置中心/协调) │ └─────────┬─────────┘ │ ┌─────────────────────────────┼─────────────────────────────┐ │ │ │ ┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐ │ Server A │ │ Server B │ │ Server C │ │ (group: G1) │ │ (group: G1) │ │ (group: G2) │ │ - Task 1 │ │ - Task 2 │ │ - Task 3 │ │ - Plugin │ │ - Plugin │ │ - Plugin │ └───────────────┘ └───────────────┘ └───────────────┘ ``` ### 核心组件 | 组件 | 说明 | |------|------| | **mydataharbor-console** | 管理控制台,提供任务管理、插件管理、监控看板等 Web 界面 | | **mydataharbor-server** | 数据同步执行节点,负责运行任务 Pipeline | | **mydataharbor-core** | 核心接口与抽象实现,包含数据管道执行引擎 | | **mydataharbor-plugin** | 插件体系,支持数据源、转换器、写入器等扩展 | ### 数据处理流程 ``` ┌──────────────┐ ┌───────────────┐ ┌───────────┐ ┌───────────────┐ ┌──────────────┐ │ DataSource │ → │ ProtocolData │ → │ Checker │ → │ DataConverter │ → │ DataSink │ │ (数据源) │ │ (协议转换) │ │ (数据校验) │ │ (数据转换) │ │ (数据写入) │ └──────────────┘ └───────────────┘ └───────────┘ └───────────────┘ └──────────────┘ ↓ ↓ ↓ ↓ MySQL JSON POJO Elasticsearch Kafka XML Map Redis RabbitMQ ... ... ... ``` --- ## 🚀 快速开始 ### 环境要求 | 依赖 | 版本要求 | 说明 | |------|----------|------| | JDK | 1.8+ | 推荐使用 OpenJDK 8/11 | | ZooKeeper | 3.4.x+ | **必须**,唯一外部依赖 | | Maven | 3.6+ | 如需源码编译 | ### 方式一:下载二进制包 #### 1. 下载安装包 从 [Releases](https://github.com/mydataharbor/mydataharbor/releases) 下载: - `mydataharbor-console-{version}-bin.tar.gz` - `mydataharbor-server-{version}-bin.tar.gz` ```bash # Linux wget https://github.com/mydataharbor/mydataharbor/releases/download/v2.0.1/mydataharbor-console-2.0.1-bin.tar.gz wget https://github.com/mydataharbor/mydataharbor/releases/download/v2.0.1/mydataharbor-server-2.0.1-bin.tar.gz # 解压 tar -xzf mydataharbor-console-2.0.1-bin.tar.gz tar -xzf mydataharbor-server-2.0.1-bin.tar.gz ``` #### 2. 配置 mydataharbor-console 编辑 `config/application.yml`: ```yaml server: port: 8080 # Console 服务端口 zk: 127.0.0.1:2181 # ZooKeeper 地址 ``` #### 3. 配置 mydataharbor-server 编辑 `config/system.yml`: ```yaml zk: ["127.0.0.1:2181"] # ZooKeeper 地址 port: 1299 # Server 服务端口 group: biz001 # 节点所属组名 pluginRepository: http://127.0.0.1:8080 # 插件仓库地址(Console) ``` #### 4. 启动服务 ```bash # 启动 Console(管理台) cd mydataharbor-console ./start.sh # 启动 Server(数据同步节点) cd mydataharbor-server ./start.sh ``` > 💡 **start.sh 支持参数**: > - `./start.sh jmx` - 开启远程 JMX 监控 > - `./start.sh debug` - 开启远程调试 > - `./start.sh status` - 查看服务状态 > - `./stop.sh` - 停止服务 #### 5. 验证 访问管理控制台:http://127.0.0.1:8080 --- ### 方式二:源码编译 ```bash # 克隆项目 git clone https://github.com/mydataharbor/mydataharbor.git cd mydataharbor # 编译打包 mvn clean package -DskipTests # 产物位置 # mydataharbor-deploy/mydataharbor-console/target/mydataharbor-console-{version}-bin.tar.gz # mydataharbor-deploy/mydataharbor-server/target/mydataharbor-server-{version}-bin.tar.gz ``` --- ### 方式三:Docker 部署(推荐) ```bash # 启动 ZooKeeper docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4 # 启动 Console docker run -d --name mydataharbor-console \ -e ZK_HOST=127.0.0.1:2181 \ -p 8080:8080 \ mydataharbor/mydataharbor-console:2.0.1 # 启动 Server docker run -d --name mydataharbor-server \ -e ZK_HOST=127.0.0.1:2181 \ -e GROUP=biz001 \ -e PLUGIN_REPO=http://127.0.0.1:8080 \ -p 1299:1299 \ mydataharbor/mydataharbor-server:2.0.1 ``` --- ## 📦 插件生态 ### 已支持插件 | 插件名称 | 类型 | 说明 | |---------|------|------| | MySQL Plugin | DataSource | MySQL 数据库增量/全量同步 | | Elasticsearch Plugin | DataSink | 数据写入 ES 索引 | | Redis Plugin | DataSink | 数据写入 Redis | | Kafka Plugin | DataSource/Sink | Kafka 消息消费与生产 | | Groovy Plugin | Converter | Groovy 脚本数据转换 | 完整插件清单:[插件市场](https://www.mydataharbor.com/user/info.html) ### 开发自定义插件 #### 1. 插件项目结构 ``` myplugin/ ├── pom.xml # Maven 配置 ├── plugin.properties # 插件描述文件 └── src/main/java/ └── myplugin/ ├── MyDataSource.java # 数据源实现 ├── MyDataSink.java # 写入器实现 ├── MyConverter.java # 转换器实现 └── MyPipelineCreator.java # Pipeline 创建器 ``` #### 2. plugin.properties 配置 ```properties # 插件唯一标识 plugin.id=myplugin # 插件版本 plugin.version=1.0.0 # 插件描述 plugin.desc=我的自定义插件 # 插件作者 plugin.author=yourname # 依赖其他插件(可选) plugin.dependencies=mydataharbor-base:1.0.0,mysql-plugin:2.0.0 # 插件依赖类加载器隔离 plugin.requiresIsolation=true ``` #### 3. 数据源(DataSource)开发 ```java @MyDataHarborMarker(title = "我的数据源") @Extension public class MyDataSource implements IDataSource { // 数据总数(用于进度展示) private long total; // 当前进度 private long now = 0L; @Override public void init(MySetting setting) { // 初始化资源,如数据库连接 this.total = queryTotalCount(); } @Override public String dataSourceType() { return "MyDataSource"; } @Override public Long total() { return total; } @Override public Iterable poll(MySetting setting) throws TheEndException { // 当数据全部拉取完成时抛出 TheEndException if (now >= total) { throw new TheEndException("数据拉取完成"); } List batch = new ArrayList<>(); // 批量拉取数据(建议每批 100-1000 条) while (now < total && batch.size() < 100) { MyRecord record = fetchData(now); batch.add(record); now++; } return batch; } @Override public void commit(MyRecord record, MySetting setting) { // 单条提交(记录成功位置,用于断点续传) } @Override public void commit(Iterable records, MySetting setting) { // 批量提交(推荐实现此方法以提升性能) for (MyRecord record : records) { commit(record, setting); } } @Override public void rollback(MyRecord record, MySetting setting) { // 单条回滚(处理失败记录) } @Override public void close() throws IOException { // 释放资源 } } ``` #### 4. 写入器(DataSink)开发 ```java @MyDataHarborMarker(title = "我的写入器") @Extension @Slf4j public class MyDataSink implements IDataSink { @Override public String name() { return "MyDataSink"; } @Override public WriterResult write(MyRecord record, MySetting setting) throws ResetException { // 单条写入 try { // 执行写入操作 doWrite(record); return WriterResult.builder() .commit(true) .success(true) .msg("success") .build(); } catch (Exception e) { log.error("写入失败", e); return WriterResult.builder() .commit(false) .success(false) .msg(e.getMessage()) .build(); } } @Override public WriterResult write(List records, MySetting setting) throws ResetException { // 批量写入(推荐实现,性能更高) try { doBatchWrite(records); return WriterResult.builder() .commit(true) .success(true) .msg("batch success") .build(); } catch (Exception e) { log.error("批量写入失败", e); return WriterResult.builder() .commit(false) .success(false) .msg(e.getMessage()) .build(); } } @Override public void close() throws IOException { // 释放资源 } } ``` #### 5. 转换器(DataConverter)开发 ```java @MyDataHarborMarker(title = "我的转换器") @Extension public class MyDataConverter implements IDataConverter { @Override public MyRecord convert(ProtocolData protocolData, BaseSettingContext settingContext) { // 将 ProtocolData 转换为业务 Record 对象 MyRecord record = new MyRecord(); record.setId(protocolData.getId()); record.setData(protocolData.getContent()); return record; } } ``` #### 6. 校验器(Checker)开发 ```java @MyDataHarborMarker(title = "我的校验器") @Extension public class MyChecker implements IChecker { @Override public CheckResult check(MyRecord record, BaseSettingContext settingContext) { // 数据校验逻辑 if (record == null || record.getId() == null) { return CheckResult.fail("数据不完整"); } return CheckResult.pass(); } } ``` #### 7. Pipeline 创建器开发 ```java @Extension public class MyPipelineCreator extends AbstractAutoScanPipelineCreator { @Override public String type() { return "我的数据同步任务"; } @Override public IDataPipeline createPipeline(MyConfig config, MySetting setting) throws Exception { return CommonDataPipeline.builder() .dataSource(new MyDataSource()) .protocolDataConverter(new OriginalProtocolDataConverter()) .checker(new MyChecker()) .dataConverter(new MyDataConverter()) .sink(new MyDataSink()) .settingContext(setting) .build(); } @Override public String scanPackage() { return "myplugin"; } // 配置类(用于生成可视化配置界面) @Data public static class MyConfig { @MyDataHarborMarker(title = "数据源地址", defaultValue = "localhost:3306") private String dataSourceUrl; @MyDataHarborMarker(title = "用户名", defaultValue = "root") private String username; @MyDataHarborMarker(title = "密码", defaultValue = "", secret = true) private String password; @MyDataHarborMarker(title = "批处理大小", defaultValue = "1000") private Integer batchSize; } } ``` #### 8. 插件依赖声明 在 `plugin.properties` 中声明依赖: ```properties # 依赖基础插件(版本 1.0.0 及以上) plugin.dependencies=mydataharbor-base:1.0.0 # 依赖多个插件 plugin.dependencies=mysql-plugin:2.0.0,elasticsearch-plugin:1.5.0 # 可选依赖(不存在也不影响运行) plugin.dependencies=mysql-plugin:2.0.0?optional=true ``` #### 9. 插件打包与发布 ```bash # 编译打包 mvn clean package # 产物位置 target/myplugin-1.0.0.zip # 发布到插件仓库 # 方式 1:上传到 Console 管理台 curl -X POST http://localhost:8080/mydataharbor/plugin/upload \ -F "file=@target/myplugin-1.0.0.zip" # 方式 2:手动放置到插件仓库目录 cp target/myplugin-1.0.0.zip /path/to/plugin-repository/ ``` #### 10. 本地调试 ```bash # 方式 1:将插件 JAR 放到 Server 插件目录 cp target/myplugin-1.0.0.zip mydataharbor-server/plugins/ # 重启 Server # 方式 2:Maven 依赖方式开发 # 在 Server 项目的 pom.xml 中添加: com.mydataharbor myplugin 1.0.0 ``` #### 11. @MyDataHarborMarker 注解属性 | 属性 | 说明 | 示例 | |------|------|------| | title | 字段显示名称 | `@MyDataHarborMarker(title = "数据源地址")` | | defaultValue | 默认值 | `defaultValue = "localhost:3306"` | | secret | 是否为敏感信息(前端会隐藏显示) | `secret = true` | | required | 是否必填 | `required = true` | | placeholder | 输入框占位提示 | `placeholder = "请输入数据库地址"` | --- ## 💡 使用指南 ### 创建数据同步任务 1. **访问管理台**:http://localhost:8080 2. **创建任务** - 进入"任务管理" → "创建任务" - 选择 Pipeline 类型(如"MySQL 到 Elasticsearch") - 填写配置信息 3. **配置调度策略** - 增量同步:实时监听数据库变更 - 全量同步:定时全量刷新(如每天凌晨 2 点) - Cron 表达式:`0 0 2 * * ?` 4. **启动任务** - 点击"启动"按钮 - 选择执行节点组 - 观察任务状态变为"运行中" ### 常见任务类型 #### MySQL → Elasticsearch ```yaml pipelineType: mysql-to-elasticsearch settingContext: dataSource: jdbcUrl: jdbc:mysql://localhost:3306/mydb username: root password: "123456" tables: ["users", "orders"] sink: esHosts: ["localhost:9200"] indexPrefix: "myapp" batchCommit: true batchSize: 1000 parallel: true threadNum: 4 ``` #### Kafka → MySQL ```yaml pipelineType: kafka-to-mysql settingContext: kafka: bootstrapServers: ["localhost:9092"] topic: "user-events" groupId: "mysql-sync-group" mysql: jdbcUrl: jdbc:mysql://localhost:3306/target_db username: root password: "123456" table: "events" batchCommit: true batchSize: 500 ``` ### 任务运维 #### 查看任务状态 ```bash # API 方式 curl http://localhost:8080/mydataharbor/task/list?group=biz001 # JMX 方式 # 连接 JMX 端口 9999,查看 MyDataHarbor MBean ``` #### 暂停/恢复任务 ```bash # 暂停任务 curl -X POST http://localhost:8080/mydataharbor/task/pause?taskId=task-001 # 恢复任务 curl -X POST http://localhost:8080/mydataharbor/task/resume?taskId=task-001 ``` --- ## 🔧 配置说明 ### Console 配置 (application.yml) ```yaml server: port: 8080 # Web 服务端口 servlet: multipart: max-file-size: 500MB # 插件上传大小限制 max-request-size: 500MB zk: 127.0.0.1:2181 # ZooKeeper 连接地址 spring: devtools: livereload: enabled: true # 开发环境热加载 ``` ### Server 配置 (system.yml) ```yaml zk: ["127.0.0.1:2181"] # ZooKeeper 集群地址 port: 1299 # RPC 服务端口 group: biz001 # 节点分组名(同组节点负载均衡) pluginRepository: http://127.0.0.1:8080 # 插件仓库地址 # 可选:JMX 监控配置 jmx: enabled: true port: 9999 ``` --- ## 📊 监控与运维 ### JMX 监控 启动时开启 JMX: ```bash ./start.sh jmx ``` 连接 JMX 后可查看: - 任务执行状态(运行中/暂停/已结束) - 数据吞吐量(条数/秒) - 错误统计 - 各阶段耗时分析 ### API 接口 | 接口 | 说明 | |------|------| | `GET /mydataharbor/node/nodeList` | 获取所有节点列表 | | `GET /mydataharbor/task/listTask` | 获取任务列表 | | `POST /mydataharbor/task/create` | 创建同步任务 | | `POST /mydataharbor/plugin/listPlugins` | 获取插件列表 | 完整 API 文档:`http://localhost:8080/swagger-ui/` --- ## 🎯 最佳实践 ### 1. 节点分组隔离 将不同业务的任务分配到不同组: - `group: order-sync` - 订单数据同步 - `group: user-sync` - 用户数据同步 - `group: log-sync` - 日志数据同步 ### 2. 批量提交优化 对于写入型任务,开启批量提交可显著提升性能: ```yaml # 任务配置 settingContext: batchCommit: true # 批量提交 batchSize: 1000 # 每批 1000 条 parallel: true # 开启并行处理 threadNum: 4 # 4 线程并行 ``` ### 3. 故障转移配置 ```yaml # 任务配置 enableRebalance: true # 开启故障转移 enableLoadBalance: true # 开启负载均衡 ``` --- ## 🤝 参与贡献 我们欢迎以下类型的贡献: 1. **功能开发**:新的数据源/写入器插件 2. **文档完善**:使用教程、最佳实践 3. **Bug 修复**:提交 Issue 或 Pull Request 4. **前端优化**:Vue 开发人员改进 UI 体验 ### 开发环境搭建 ```bash # 克隆项目 git clone https://github.com/mydataharbor/mydataharbor.git cd mydataharbor # 导入 IDE(IntelliJ IDEA 推荐) # File -> Open -> 选择项目根目录 # 编译 mvn clean install # 运行单元测试 mvn test ``` ### 提交规范 ``` feat: 新增 XXX 功能 fix: 修复 XXX 问题 docs: 文档更新 refactor: 代码重构 test: 测试用例 chore: 构建/配置变更 ``` --- ## 📞 社区与支持 | 渠道 | 链接/方式 | |------|----------| | 📧 联系作者 | 1053618636@qq.com | | 📚 官方文档 | [语雀文档](https://mydataharbor.yuque.com/staff-tzwgrd/uqew9p) | | 🌐 官网 | [www.mydataharbor.com](https://www.mydataharbor.com) | | 💬 Demo 环境 | [demo.mydataharbor.com](http://demo.mydataharbor.com) | | 👥 QQ 交流群 | QQ 群
(加群需验证 Star 数)| --- ## 📝 更新日志 ### v2.0.1(当前版本) - ✨ 优化 pipeline 可视化创建,支持 UI 自由组合组件 - 🔧 插件支持调整位置,解决依赖加载顺序问题 - 🔄 支持插件重复安装和版本更新 - ⚖️ 负载均衡和故障转移可分开配置 ### v2.0.0 - 💾 新增 `ITaskStorage` 接口,支持任务状态持久化 - 📊 管理台实时展示任务监控信息 - 🔄 支持任务修改重建功能 - 🎯 优化 rebalance 算法 ### v1.x - 初始版本,基础数据同步功能 --- ## 📄 许可证 本项目采用 [GNU GPL v3.0](https://github.com/mydataharbor/mydataharbor/blob/main/LICENSE) 开源协议。 ``` Copyright (C) 2020-2026 xulang <1053618636@qq.com> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ``` ---

如果项目对你有帮助,欢迎 ⭐Star 支持!