diff --git a/admin-front/README.md b/admin-front/README.md index ffb28c26ef32a71c6bc1b84861efdec895c772b5..078feb3204e62478e60f391629dcec11c531adf4 100644 --- a/admin-front/README.md +++ b/admin-front/README.md @@ -1,44 +1,239 @@ -# element-plus-vite-starter -> A starter kit for Element Plus with Vite -Element Plus +# DataFlow + +DataFlow 是一个数据处理平台,包含调度器(scheduler)和执行器(executor)两部分。调度器负责管理任务调度和执行器状态,执行器负责具体的数据读写任务。 + +## 项目结构 + +- **admin-front**:前端管理界面,使用 Vue3 + TypeScript + Vite + ElementPlus 开发。 +- **dataflow-common**:调度器和执行器共用的公共类,包含数据结构、异常处理、实体类等。 +- **dataflow-executor**:执行器核心模块,包含数据源连接、任务运行、通道读写等实现。 +- **dataflow-scheduler**:调度器核心模块,负责任务调度、定时任务、执行器管理等。 +- **executor-sample**:执行器使用示例,包含自定义拦截器实现。 + +## 功能特性 + +- 支持多数据源(MySQL、Oracle � + - 支持自定义数据源扩展 +- 提供高效的数据传输通道(MemoryChannel) +- 支持任务拦截器扩展(启动、读取后、写入后) + - 提供示例:数据脱敏、增量保存 +- 支持任务调度和定时任务管理 +- 提供丰富的监控信息 + - 执行器运行状态(CPU、内存、文件描述符) + - 任务统计信息(吞吐量、阻塞时间等) +- 支持任务运行时动态调整 + - 任务启动参数修改 + - 读取/写入后处理逻辑扩展 + +## 模块说明 + +### 调度器(dataflow-scheduler) + +- 提供 REST API 箍�이터管理 + - 数据源管理 + - 任务管理(增删改查、运行、终止) + - 任务组管理 + - 任务统计信息查询 +- 基于时间轮(TimeWheel)实现定时任务调度 +- 使用 MyBatis Plus 进行数据持久化 +- 提供执行器健康检查和负载均衡 + +### 执行器(dataflow-executor) + +- 实现 JDBC 数据读写 + - Oracle/MySQL 读写支持 + - 支持扩展其他数据库 +- 提供内存通道(MemoryChannel)用于数据缓存 +- 支持任务动态拦截 + - 启动前处理(AbstractTaskLaunchInterceptor) + - 读取后处理(AbstractAfterReadInterceptor) + - 写入后处理(AbstractAfterWriteInterceptor) +- 提供任务超时检测和异常处理 +- 支持与调度器通信 + - 注册/心跳检测 + - 任务状态上报 + - 任务终止处理 + +## 使用示例 + +### 自定义读取后拦截器(脱敏示例) + +```java +@Slf4j +@Component("MyAfterReadInterceptor") +@Scope("prototype") +public class MyAfterReadInterceptor extends AbstractAfterReadInterceptor { + private static final List SALTS = Arrays.asList("a", "b", "c", "d", "e"); + private static final int SALTS_SIZE = SALTS.size(); + + @Override + public void onAfterRead(Row row) { + Column cardNoColumn = row.getColumnByName("card_no"); + if (cardNoColumn != null && !cardNoColumn.asString().isEmpty()) { + String cardNo = cardNoColumn.asString(); + String mpi = calcMpi(cardNo); + ((StringColumn) cardNoColumn).setValue(mpi); + } + } + + private static String calcMpi(String cardNo) { + int index = hash(cardNo) % SALTS_SIZE; + String salt = SALTS.get(index); + return Md5Util.md5(cardNo + salt); + } + + private static int hash(String cardNo) { + // 实现哈希算法 + } +} +``` -- Preview: +### 自定义写入后拦截器(增量保存示例) + +```java +@Slf4j +@Component("MyAfterWriteInterceptor") +@Scope("prototype") +public class MyAfterWriteInterceptor extends AbstractAfterWriteInterceptor { + private final Map maxIdtMap = new ConcurrentHashMap<>(); + + @Override + public void onAfterWrite(List writtenRows, List failedRows) { + // 实现增量保存逻辑 + for (Row row : writtenRows) { + Column idtColumn = row.getColumnByName("idt"); + if (idtColumn != null) { + Timestamp currentIdt = (Timestamp) idtColumn.getValue(); + Timestamp maxIdt = maxIdtMap.getOrDefault("maxIdt", currentIdt); + if (currentIdt.after(maxIdt)) { + maxIdtMap.put("maxIdt", currentIdt); + } + } + } + } + + @Override + public void onWriteFinished() { + // 最终保存最大时间戳 + saveMaxIdtToDb(); + } + + private void saveMaxIdtToDb() { + // 实现将最大时间戳保存到数据库 + } +} +``` -This is an example of on-demand element-plus with [unplugin-vue-components](https://github.com/antfu/unplugin-vue-components). +## 开发部署 -> If you want to import all, it may be so simple that no examples are needed. Just follow [quickstart | Docs](https://element-plus.org/zh-CN/guide/quickstart.html) and import them. +### 环境要求 -If you just want an on-demand import example `manually`, you can check [unplugin-element-plus/examples/vite](https://github.com/element-plus/unplugin-element-plus/tree/main/examples/vite). +- Java 1.8+ +- MySQL 5.7+ / Oracle 12c+ +- Redis 3.0+ (用于分布式锁) +- Maven 3.5+ +- Node.js 14+ (前端需要) -## Project setup +### 部署架构 -```bash -npm install +``` ++------------------+ +-------------------+ +| | | | +| Scheduler |<----->| Executor | +| (任务调度) | | (数据处理) | +| - 定时任务 | | - JDBC 连接 | +| - 任务分发 | | - 数据通道 | +| - 状态监控 | | - 拦截器扩展 | ++------------------+ +-------------------+ + | | + | | + +----------+---------+-------+ + | + +---+---+ + | | + | Redis | + +-------+ ``` -### Compiles and hot-reloads for development +## 调度器 API 接口 -```bash -npm run dev -``` +### 任务管理 -### Compiles and minifies for production +- POST /admin/task-info/addOrUpdate - 新增/更新任务 +- GET /admin/task-info/run/{taskId} - 运行单个任务 +- POST /admin/task-info/runTasks - 运行多个任务 +- GET /admin/task-info/enableTimer/{taskId} - 启用定时任务 +- GET /admin/task-info/disableTimer/{taskId} - 禁用定时任务 -```bash -npm run build -``` +### 数据源管理 + +- POST /admin/ds/addOrUpdate - 新增/更新数据源 +- GET /admin/ds/find/{id} - 查询数据源 +- POST /admin/ds/paging - 分页查询数据源 + +### 执行器监控 + +- POST /admin/executor-info/paging - 执行器运行状态监控 +- GET /admin/executor-info/{uuid}/jvmInfo - 执行器 JVM 信息 +- GET /admin/executor-info/{uuid}/taskStates - 执行器任务状态 + +## 执行器 API 接口 + +- GET /ping - 健康检查 +- GET /jvmInfo - JVM 信息 +- GET /getRuntimeInfo - 运行时信息 +- POST /task/run - 启动任务 +- GET /task/terminate/{taskId} - 终止任务 +- GET /task/statistic/{taskId} - 任务统计信息 +- POST /task/statistic/batch - 批量任务统计信息 +- GET /task/running - 运行中任务列表 -## Usage +## 执行器通信协议 -```bash -git clone https://github.com/element-plus/element-plus-vite-starter -cd element-plus-vite-starter -npm i -npm run dev +### 注册接口 + +- POST /register +- Body: ExecutorInfo 对象 +- 返回: R 对象 + +### 任务状态上报 + +- POST /state/error/{uuid}/{taskId}/{taskRecordId} + - 参数: errorMsg 错误信息 +- POST /state/finished/{uuid}/{taskId}/{taskRecordId} +- POST /state/timeout/{uuid}/{taskId}/{taskRecordId} + +## 项目依赖 + +```xml + + com.etl.dataflow + dataflow-common + + + + com.etl.dataflow + dataflow-scheduler + + + + com.etl.dataflow + dataflow-executor + ``` -### Custom theme +## 许可协议 + +本项目使用 MIT 许可协议,请查看 LICENSE 文件获取详细信息。 + +## 贡献指南 + +1. Fork 本项目 +2. 创建 feature 分支 +3. 实现新功能 +4. 提交代码 +5. 创建 Pull Request -See `src/styles/element/index.scss`. +请遵循项目代码规范,保持代码简洁清晰。对于重大功能更新,建议先提交 issue 讨论实现方案。 \ No newline at end of file