# mydataharbor
**Repository Path**: mydataharbor/mydataharbor
## Basic Information
- **Project Name**: mydataharbor
- **Description**: :cn: MyDataHarbor是一个致力于解决异构数据源之间的分布式、高扩展性、高性能、微事务(至少一次保证)的数据同步中间件。帮助用户可靠、快速、稳定的对海量数据进行准实时增量同步或者定时全量同步,主要定位是为实时交易系统服务,亦可用于大数据的数据同步(ETL领域)。
- **Primary Language**: Java
- **License**: GPL-3.0
- **Default Branch**: main
- **Homepage**: https://mydataharbor.yuque.com/staff-tzwgrd/uqew9p
- **GVP Project**: No
## Statistics
- **Stars**: 197
- **Forks**: 53
- **Created**: 2021-08-18
- **Last Updated**: 2026-04-03
## Categories & Tags
**Categories**: database-service
**Tags**: None
## README
2026-04-03
---
## 📖 简介
**MyDataHarbor** 是一个分布式、高扩展、高性能的**准实时数据同步中间件**,致力于解决异构数据源之间的数据同步问题。
> 🎯 **核心定位**:为实时交易系统服务,支持海量数据的准实时增量同步和定时全量同步,亦可用于 ETL 领域。
典型应用场景:
- 🔄 **数据库 → 搜索引擎**:MySQL → Elasticsearch
- 🔄 **数据库 → 缓存**:MySQL → Redis
- 🔄 **消息队列 → 数据库**:Kafka → MySQL
- 🔄 **跨部门数据同步**:多系统间的数据共享与同步
---
## ✨ 核心特性
| 特性 | 说明 |
|------|------|
| 🚩 **分布式设计** | 基于 ZooKeeper 构建,支持水平扩展,节点分组隔离,负载均衡与故障自动转移 |
| 🔌 **插件化架构** | 高度抽象的接口设计,任何数据源/目标都可通过开发插件接入 |
| 🛡️ **数据不丢失** | 引入微事务机制,保障数据**至少成功写入一次** |
| 📊 **可视化监控** | 集成 JMX,实时查看任务运行状态、吞吐量、错误率等指标 |
| 🎨 **自由组合** | 支持从不同插件复用组件,可视化配置 Pipeline 管道 |
| ⚡ **高性能** | 支持批量提交、ForkJoin 并发处理,摩托变汽车,汽车变高铁 |
| 📝 **插件自描述** | 自动识别插件能力,生成友好的可视化配置界面 |
---
## 🏗️ 架构设计
MyDataHarbor 唯一依赖的中间件是 ZooKeeper,共有两个核心组件:
### 集群架构设计

### 节点任务设计

### 系统架构(示意)
```
┌─────────────────────────────────────────┐
│ MyDataHarbor Console │
│ - 任务管理 - 插件管理 - 监控看板 │
└───────────────────┬─────────────────────┘
│
┌─────────▼─────────┐
│ ZooKeeper │
│ (配置中心/协调) │
└─────────┬─────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
┌───────▼───────┐ ┌───────▼───────┐ ┌───────▼───────┐
│ Server A │ │ Server B │ │ Server C │
│ (group: G1) │ │ (group: G1) │ │ (group: G2) │
│ - Task 1 │ │ - Task 2 │ │ - Task 3 │
│ - Plugin │ │ - Plugin │ │ - Plugin │
└───────────────┘ └───────────────┘ └───────────────┘
```
### 核心组件
| 组件 | 说明 |
|------|------|
| **mydataharbor-console** | 管理控制台,提供任务管理、插件管理、监控看板等 Web 界面 |
| **mydataharbor-server** | 数据同步执行节点,负责运行任务 Pipeline |
| **mydataharbor-core** | 核心接口与抽象实现,包含数据管道执行引擎 |
| **mydataharbor-plugin** | 插件体系,支持数据源、转换器、写入器等扩展 |
### 数据处理流程
```
┌──────────────┐ ┌───────────────┐ ┌───────────┐ ┌───────────────┐ ┌──────────────┐
│ DataSource │ → │ ProtocolData │ → │ Checker │ → │ DataConverter │ → │ DataSink │
│ (数据源) │ │ (协议转换) │ │ (数据校验) │ │ (数据转换) │ │ (数据写入) │
└──────────────┘ └───────────────┘ └───────────┘ └───────────────┘ └──────────────┘
↓ ↓ ↓ ↓
MySQL JSON POJO Elasticsearch
Kafka XML Map Redis
RabbitMQ ... ... ...
```
---
## 🚀 快速开始
### 环境要求
| 依赖 | 版本要求 | 说明 |
|------|----------|------|
| JDK | 1.8+ | 推荐使用 OpenJDK 8/11 |
| ZooKeeper | 3.4.x+ | **必须**,唯一外部依赖 |
| Maven | 3.6+ | 如需源码编译 |
### 方式一:下载二进制包
#### 1. 下载安装包
从 [Releases](https://github.com/mydataharbor/mydataharbor/releases) 下载:
- `mydataharbor-console-{version}-bin.tar.gz`
- `mydataharbor-server-{version}-bin.tar.gz`
```bash
# Linux
wget https://github.com/mydataharbor/mydataharbor/releases/download/v2.0.1/mydataharbor-console-2.0.1-bin.tar.gz
wget https://github.com/mydataharbor/mydataharbor/releases/download/v2.0.1/mydataharbor-server-2.0.1-bin.tar.gz
# 解压
tar -xzf mydataharbor-console-2.0.1-bin.tar.gz
tar -xzf mydataharbor-server-2.0.1-bin.tar.gz
```
#### 2. 配置 mydataharbor-console
编辑 `config/application.yml`:
```yaml
server:
port: 8080 # Console 服务端口
zk: 127.0.0.1:2181 # ZooKeeper 地址
```
#### 3. 配置 mydataharbor-server
编辑 `config/system.yml`:
```yaml
zk: ["127.0.0.1:2181"] # ZooKeeper 地址
port: 1299 # Server 服务端口
group: biz001 # 节点所属组名
pluginRepository: http://127.0.0.1:8080 # 插件仓库地址(Console)
```
#### 4. 启动服务
```bash
# 启动 Console(管理台)
cd mydataharbor-console
./start.sh
# 启动 Server(数据同步节点)
cd mydataharbor-server
./start.sh
```
> 💡 **start.sh 支持参数**:
> - `./start.sh jmx` - 开启远程 JMX 监控
> - `./start.sh debug` - 开启远程调试
> - `./start.sh status` - 查看服务状态
> - `./stop.sh` - 停止服务
#### 5. 验证
访问管理控制台:http://127.0.0.1:8080
---
### 方式二:源码编译
```bash
# 克隆项目
git clone https://github.com/mydataharbor/mydataharbor.git
cd mydataharbor
# 编译打包
mvn clean package -DskipTests
# 产物位置
# mydataharbor-deploy/mydataharbor-console/target/mydataharbor-console-{version}-bin.tar.gz
# mydataharbor-deploy/mydataharbor-server/target/mydataharbor-server-{version}-bin.tar.gz
```
---
### 方式三:Docker 部署(推荐)
```bash
# 启动 ZooKeeper
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4
# 启动 Console
docker run -d --name mydataharbor-console \
-e ZK_HOST=127.0.0.1:2181 \
-p 8080:8080 \
mydataharbor/mydataharbor-console:2.0.1
# 启动 Server
docker run -d --name mydataharbor-server \
-e ZK_HOST=127.0.0.1:2181 \
-e GROUP=biz001 \
-e PLUGIN_REPO=http://127.0.0.1:8080 \
-p 1299:1299 \
mydataharbor/mydataharbor-server:2.0.1
```
---
## 📦 插件生态
### 已支持插件
| 插件名称 | 类型 | 说明 |
|---------|------|------|
| MySQL Plugin | DataSource | MySQL 数据库增量/全量同步 |
| Elasticsearch Plugin | DataSink | 数据写入 ES 索引 |
| Redis Plugin | DataSink | 数据写入 Redis |
| Kafka Plugin | DataSource/Sink | Kafka 消息消费与生产 |
| Groovy Plugin | Converter | Groovy 脚本数据转换 |
完整插件清单:[插件市场](https://www.mydataharbor.com/user/info.html)
### 开发自定义插件
#### 1. 插件项目结构
```
myplugin/
├── pom.xml # Maven 配置
├── plugin.properties # 插件描述文件
└── src/main/java/
└── myplugin/
├── MyDataSource.java # 数据源实现
├── MyDataSink.java # 写入器实现
├── MyConverter.java # 转换器实现
└── MyPipelineCreator.java # Pipeline 创建器
```
#### 2. plugin.properties 配置
```properties
# 插件唯一标识
plugin.id=myplugin
# 插件版本
plugin.version=1.0.0
# 插件描述
plugin.desc=我的自定义插件
# 插件作者
plugin.author=yourname
# 依赖其他插件(可选)
plugin.dependencies=mydataharbor-base:1.0.0,mysql-plugin:2.0.0
# 插件依赖类加载器隔离
plugin.requiresIsolation=true
```
#### 3. 数据源(DataSource)开发
```java
@MyDataHarborMarker(title = "我的数据源")
@Extension
public class MyDataSource implements IDataSource {
// 数据总数(用于进度展示)
private long total;
// 当前进度
private long now = 0L;
@Override
public void init(MySetting setting) {
// 初始化资源,如数据库连接
this.total = queryTotalCount();
}
@Override
public String dataSourceType() {
return "MyDataSource";
}
@Override
public Long total() {
return total;
}
@Override
public Iterable poll(MySetting setting) throws TheEndException {
// 当数据全部拉取完成时抛出 TheEndException
if (now >= total) {
throw new TheEndException("数据拉取完成");
}
List batch = new ArrayList<>();
// 批量拉取数据(建议每批 100-1000 条)
while (now < total && batch.size() < 100) {
MyRecord record = fetchData(now);
batch.add(record);
now++;
}
return batch;
}
@Override
public void commit(MyRecord record, MySetting setting) {
// 单条提交(记录成功位置,用于断点续传)
}
@Override
public void commit(Iterable records, MySetting setting) {
// 批量提交(推荐实现此方法以提升性能)
for (MyRecord record : records) {
commit(record, setting);
}
}
@Override
public void rollback(MyRecord record, MySetting setting) {
// 单条回滚(处理失败记录)
}
@Override
public void close() throws IOException {
// 释放资源
}
}
```
#### 4. 写入器(DataSink)开发
```java
@MyDataHarborMarker(title = "我的写入器")
@Extension
@Slf4j
public class MyDataSink implements IDataSink {
@Override
public String name() {
return "MyDataSink";
}
@Override
public WriterResult write(MyRecord record, MySetting setting) throws ResetException {
// 单条写入
try {
// 执行写入操作
doWrite(record);
return WriterResult.builder()
.commit(true)
.success(true)
.msg("success")
.build();
} catch (Exception e) {
log.error("写入失败", e);
return WriterResult.builder()
.commit(false)
.success(false)
.msg(e.getMessage())
.build();
}
}
@Override
public WriterResult write(List records, MySetting setting) throws ResetException {
// 批量写入(推荐实现,性能更高)
try {
doBatchWrite(records);
return WriterResult.builder()
.commit(true)
.success(true)
.msg("batch success")
.build();
} catch (Exception e) {
log.error("批量写入失败", e);
return WriterResult.builder()
.commit(false)
.success(false)
.msg(e.getMessage())
.build();
}
}
@Override
public void close() throws IOException {
// 释放资源
}
}
```
#### 5. 转换器(DataConverter)开发
```java
@MyDataHarborMarker(title = "我的转换器")
@Extension
public class MyDataConverter implements IDataConverter {
@Override
public MyRecord convert(ProtocolData protocolData, BaseSettingContext settingContext) {
// 将 ProtocolData 转换为业务 Record 对象
MyRecord record = new MyRecord();
record.setId(protocolData.getId());
record.setData(protocolData.getContent());
return record;
}
}
```
#### 6. 校验器(Checker)开发
```java
@MyDataHarborMarker(title = "我的校验器")
@Extension
public class MyChecker implements IChecker {
@Override
public CheckResult check(MyRecord record, BaseSettingContext settingContext) {
// 数据校验逻辑
if (record == null || record.getId() == null) {
return CheckResult.fail("数据不完整");
}
return CheckResult.pass();
}
}
```
#### 7. Pipeline 创建器开发
```java
@Extension
public class MyPipelineCreator extends AbstractAutoScanPipelineCreator {
@Override
public String type() {
return "我的数据同步任务";
}
@Override
public IDataPipeline createPipeline(MyConfig config, MySetting setting) throws Exception {
return CommonDataPipeline.builder()
.dataSource(new MyDataSource())
.protocolDataConverter(new OriginalProtocolDataConverter())
.checker(new MyChecker())
.dataConverter(new MyDataConverter())
.sink(new MyDataSink())
.settingContext(setting)
.build();
}
@Override
public String scanPackage() {
return "myplugin";
}
// 配置类(用于生成可视化配置界面)
@Data
public static class MyConfig {
@MyDataHarborMarker(title = "数据源地址", defaultValue = "localhost:3306")
private String dataSourceUrl;
@MyDataHarborMarker(title = "用户名", defaultValue = "root")
private String username;
@MyDataHarborMarker(title = "密码", defaultValue = "", secret = true)
private String password;
@MyDataHarborMarker(title = "批处理大小", defaultValue = "1000")
private Integer batchSize;
}
}
```
#### 8. 插件依赖声明
在 `plugin.properties` 中声明依赖:
```properties
# 依赖基础插件(版本 1.0.0 及以上)
plugin.dependencies=mydataharbor-base:1.0.0
# 依赖多个插件
plugin.dependencies=mysql-plugin:2.0.0,elasticsearch-plugin:1.5.0
# 可选依赖(不存在也不影响运行)
plugin.dependencies=mysql-plugin:2.0.0?optional=true
```
#### 9. 插件打包与发布
```bash
# 编译打包
mvn clean package
# 产物位置
target/myplugin-1.0.0.zip
# 发布到插件仓库
# 方式 1:上传到 Console 管理台
curl -X POST http://localhost:8080/mydataharbor/plugin/upload \
-F "file=@target/myplugin-1.0.0.zip"
# 方式 2:手动放置到插件仓库目录
cp target/myplugin-1.0.0.zip /path/to/plugin-repository/
```
#### 10. 本地调试
```bash
# 方式 1:将插件 JAR 放到 Server 插件目录
cp target/myplugin-1.0.0.zip mydataharbor-server/plugins/
# 重启 Server
# 方式 2:Maven 依赖方式开发
# 在 Server 项目的 pom.xml 中添加:
com.mydataharbor
myplugin
1.0.0
```
#### 11. @MyDataHarborMarker 注解属性
| 属性 | 说明 | 示例 |
|------|------|------|
| title | 字段显示名称 | `@MyDataHarborMarker(title = "数据源地址")` |
| defaultValue | 默认值 | `defaultValue = "localhost:3306"` |
| secret | 是否为敏感信息(前端会隐藏显示) | `secret = true` |
| required | 是否必填 | `required = true` |
| placeholder | 输入框占位提示 | `placeholder = "请输入数据库地址"` |
---
## 💡 使用指南
### 创建数据同步任务
1. **访问管理台**:http://localhost:8080
2. **创建任务**
- 进入"任务管理" → "创建任务"
- 选择 Pipeline 类型(如"MySQL 到 Elasticsearch")
- 填写配置信息
3. **配置调度策略**
- 增量同步:实时监听数据库变更
- 全量同步:定时全量刷新(如每天凌晨 2 点)
- Cron 表达式:`0 0 2 * * ?`
4. **启动任务**
- 点击"启动"按钮
- 选择执行节点组
- 观察任务状态变为"运行中"
### 常见任务类型
#### MySQL → Elasticsearch
```yaml
pipelineType: mysql-to-elasticsearch
settingContext:
dataSource:
jdbcUrl: jdbc:mysql://localhost:3306/mydb
username: root
password: "123456"
tables: ["users", "orders"]
sink:
esHosts: ["localhost:9200"]
indexPrefix: "myapp"
batchCommit: true
batchSize: 1000
parallel: true
threadNum: 4
```
#### Kafka → MySQL
```yaml
pipelineType: kafka-to-mysql
settingContext:
kafka:
bootstrapServers: ["localhost:9092"]
topic: "user-events"
groupId: "mysql-sync-group"
mysql:
jdbcUrl: jdbc:mysql://localhost:3306/target_db
username: root
password: "123456"
table: "events"
batchCommit: true
batchSize: 500
```
### 任务运维
#### 查看任务状态
```bash
# API 方式
curl http://localhost:8080/mydataharbor/task/list?group=biz001
# JMX 方式
# 连接 JMX 端口 9999,查看 MyDataHarbor MBean
```
#### 暂停/恢复任务
```bash
# 暂停任务
curl -X POST http://localhost:8080/mydataharbor/task/pause?taskId=task-001
# 恢复任务
curl -X POST http://localhost:8080/mydataharbor/task/resume?taskId=task-001
```
---
## 🔧 配置说明
### Console 配置 (application.yml)
```yaml
server:
port: 8080 # Web 服务端口
servlet:
multipart:
max-file-size: 500MB # 插件上传大小限制
max-request-size: 500MB
zk: 127.0.0.1:2181 # ZooKeeper 连接地址
spring:
devtools:
livereload:
enabled: true # 开发环境热加载
```
### Server 配置 (system.yml)
```yaml
zk: ["127.0.0.1:2181"] # ZooKeeper 集群地址
port: 1299 # RPC 服务端口
group: biz001 # 节点分组名(同组节点负载均衡)
pluginRepository: http://127.0.0.1:8080 # 插件仓库地址
# 可选:JMX 监控配置
jmx:
enabled: true
port: 9999
```
---
## 📊 监控与运维
### JMX 监控
启动时开启 JMX:
```bash
./start.sh jmx
```
连接 JMX 后可查看:
- 任务执行状态(运行中/暂停/已结束)
- 数据吞吐量(条数/秒)
- 错误统计
- 各阶段耗时分析
### API 接口
| 接口 | 说明 |
|------|------|
| `GET /mydataharbor/node/nodeList` | 获取所有节点列表 |
| `GET /mydataharbor/task/listTask` | 获取任务列表 |
| `POST /mydataharbor/task/create` | 创建同步任务 |
| `POST /mydataharbor/plugin/listPlugins` | 获取插件列表 |
完整 API 文档:`http://localhost:8080/swagger-ui/`
---
## 🎯 最佳实践
### 1. 节点分组隔离
将不同业务的任务分配到不同组:
- `group: order-sync` - 订单数据同步
- `group: user-sync` - 用户数据同步
- `group: log-sync` - 日志数据同步
### 2. 批量提交优化
对于写入型任务,开启批量提交可显著提升性能:
```yaml
# 任务配置
settingContext:
batchCommit: true # 批量提交
batchSize: 1000 # 每批 1000 条
parallel: true # 开启并行处理
threadNum: 4 # 4 线程并行
```
### 3. 故障转移配置
```yaml
# 任务配置
enableRebalance: true # 开启故障转移
enableLoadBalance: true # 开启负载均衡
```
---
## 🤝 参与贡献
我们欢迎以下类型的贡献:
1. **功能开发**:新的数据源/写入器插件
2. **文档完善**:使用教程、最佳实践
3. **Bug 修复**:提交 Issue 或 Pull Request
4. **前端优化**:Vue 开发人员改进 UI 体验
### 开发环境搭建
```bash
# 克隆项目
git clone https://github.com/mydataharbor/mydataharbor.git
cd mydataharbor
# 导入 IDE(IntelliJ IDEA 推荐)
# File -> Open -> 选择项目根目录
# 编译
mvn clean install
# 运行单元测试
mvn test
```
### 提交规范
```
feat: 新增 XXX 功能
fix: 修复 XXX 问题
docs: 文档更新
refactor: 代码重构
test: 测试用例
chore: 构建/配置变更
```
---
## 📞 社区与支持
| 渠道 | 链接/方式 |
|------|----------|
| 📧 联系作者 | 1053618636@qq.com |
| 📚 官方文档 | [语雀文档](https://mydataharbor.yuque.com/staff-tzwgrd/uqew9p) |
| 🌐 官网 | [www.mydataharbor.com](https://www.mydataharbor.com) |
| 💬 Demo 环境 | [demo.mydataharbor.com](http://demo.mydataharbor.com) |
| 👥 QQ 交流群 | 
(加群需验证 Star 数)|
---
## 📝 更新日志
### v2.0.1(当前版本)
- ✨ 优化 pipeline 可视化创建,支持 UI 自由组合组件
- 🔧 插件支持调整位置,解决依赖加载顺序问题
- 🔄 支持插件重复安装和版本更新
- ⚖️ 负载均衡和故障转移可分开配置
### v2.0.0
- 💾 新增 `ITaskStorage` 接口,支持任务状态持久化
- 📊 管理台实时展示任务监控信息
- 🔄 支持任务修改重建功能
- 🎯 优化 rebalance 算法
### v1.x
- 初始版本,基础数据同步功能
---
## 📄 许可证
本项目采用 [GNU GPL v3.0](https://github.com/mydataharbor/mydataharbor/blob/main/LICENSE) 开源协议。
```
Copyright (C) 2020-2026 xulang <1053618636@qq.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
```
---
如果项目对你有帮助,欢迎 ⭐Star 支持!