# micro-dispatch **Repository Path**: hu-xuewei/micro-dispatch ## Basic Information - **Project Name**: micro-dispatch - **Description**: No description available - **Primary Language**: Go - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-06-09 - **Last Updated**: 2025-06-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # micro-dispatch ## 简介 `micro-dispatch` 是一个基于有向无环图(DAG)实现任务并行调度的轻量级Go语言工具库。它通过 `Container` 管理任务图,`Engine` 协调任务调度,`WorkerPool` 实现任务的并行执行,从而高效地完成任务调度工作。该项目适用于需要处理复杂任务依赖关系并实现高效并行执行的场景。 ## 特点 - **基于DAG的任务依赖管理**:自动处理任务之间的依赖关系 - **高效的并行执行**:利用WorkerPool实现任务的高效并行处理 - **灵活的扩展性**:支持自定义Parser、DataStore和Pipeline - **简洁的API设计**:易于集成和使用 - **安全的并发控制**:内置并发安全机制 ## 架构图 ```mermaid graph TD classDef core fill:#e1f5fe,stroke:#4fc3f7,stroke-width:2px; classDef component fill:#f3e5f5,stroke:#ce93d8,stroke-width:2px; classDef external fill:#fff8e1,stroke:#ffcc80,stroke-width:2px; A[用户代码]:::external --> B[Engine]:::core B --> C[Container]:::core B --> D[ThreadPool]:::core B --> E[Parser]:::core B --> F[DataStore]:::core B --> G[Pipeline]:::core C --> H[任务依赖图]:::component D --> I[Worker队列]:::component E --> J[结构解析]:::component F --> K[数据存储]:::component G --> L[任务处理中间件]:::component ``` ## 软件架构 ### 核心组件 1. **`unit` 模块**:定义了 `DispatchUnit` 接口,作为调度任务的基本单元,每个任务需实现 `Run` 方法。 2. **`container` 模块**:负责管理任务单元的注册、检查任务图是否存在环,以及获取任务的入度和依赖关系。 3. **`engine` 模块**:协调任务的调度过程,包括检查任务图、创建任务和提交任务到工作池。 4. **`threadpool` 模块**:实现工作池,管理任务队列和工作协程,支持任务的并行执行。 5. **`parser` 模块**:解析任务单元的输入输出字段,为任务调度提供依赖信息。 6. **`datastore` 模块**:管理任务执行过程中的数据存储和传递。 7. **`pipeline` 模块**:实现任务执行前后的处理流程,支持中间件机制。 ### 核心模块流程图 ```mermaid graph LR classDef process fill:#E5F6FF,stroke:#73A6FF,stroke-width:2px; classDef decision fill:#FFF6CC,stroke:#FFBC52,stroke-width:2px; A([初始化]):::process --> B(定义任务单元):::process B --> C(实现 DispatchUnit 接口):::process C --> D(注册任务到 Container):::process D --> E{检查任务图是否有环?}:::decision E -->|是| F(抛出错误):::process E -->|否| G(Engine 准备调度):::process G --> H(提交任务到 WorkPool):::process H --> I(WorkPool 分配任务到 Worker):::process I --> J(Worker 执行任务):::process J --> K{所有任务完成?}:::decision K -->|否| H(提交任务到 WorkPool):::process K -->|是| L([结束]):::process M(Container):::process --> N(维护任务依赖关系):::process O(Engine):::process --> P(协调调度流程):::process Q(WorkPool):::process --> R(管理任务队列):::process Q --> S(管理工作协程):::process ``` ## 安装 ```bash # 克隆项目 git clone https://gitee.com/hu-xuewei/micro-dispatch.git cd micro-dispatch # 或者直接通过go get安装 go get gitee.com/hu-xuewei/micro-dispatch ``` ## 快速开始 ### 基本用法 ```go package main import ( "fmt" "gitee.com/hu-xuewei/micro-dispatch/core" "gitee.com/hu-xuewei/micro-dispatch/unit" ) // 定义任务A - 没有依赖 type TaskA struct { // 输出字段 Output struct { Result string `output:"result_a"` } } func (t *TaskA) Run() error { t.Output.Result = "TaskA完成" fmt.Println("执行任务A") return nil } // 定义任务B - 依赖任务A的输出 type TaskB struct { // 输入字段 Input struct { ResultA string `input:"result_a"` } // 输出字段 Output struct { Result string `output:"result_b"` } } func (t *TaskB) Run() error { fmt.Println("执行任务B,使用A的结果:", t.Input.ResultA) t.Output.Result = "TaskB完成" return nil } func main() { // 创建引擎 engine := core.NewDefaultEngine() // 创建任务 taskA := &TaskA{} taskB := &TaskB{} // 注册任务 engine.RegisterUnits([]unit.DispatchUnit{taskA, taskB}) // 启动引擎 if err := engine.Run(); err != nil { panic(err) } } ``` ### 高级用法 #### 自定义线程池 ```go // 创建自定义线程池配置 threadPool := threadpool.NewWorkerPool( 10, // 最小工作线程数 20, // 最大工作线程数 threadpool.WithQueueSize(200), // 任务队列大小 threadpool.WithDefaultTimeout(60 * time.Second), // 任务超时时间 ) // 使用自定义线程池创建引擎 engine := core.NewDefaultEngine(core.WithThreadPool(threadPool)) ``` #### 添加管道处理器 ```go // 定义前置处理器 preHandler := func(ctx context.Context, u unit.DispatchUnit) error { fmt.Printf("执行任务前: %T\n", u) return nil } // 定义后置处理器 postHandler := func(ctx context.Context, u unit.DispatchUnit) error { fmt.Printf("执行任务后: %T\n", u) return nil } // 添加处理器到引擎 engine.UsePrePipeline(preHandler) engine.UsePostPipeline(postHandler) ``` #### 自定义数据存储 ```go // 创建自定义数据存储 customStore := datastore.NewShardedDataStore(20) // 20个分片 // 使用自定义数据存储创建引擎 engine := core.NewDefaultEngine(core.WithDataStore(customStore)) ``` ## 设计理念 `micro-dispatch`的设计遵循以下原则: 1. **简单性**:提供简洁明了的API,降低使用门槛 2. **扩展性**:核心组件均可扩展和自定义 3. **可靠性**:提供完善的错误处理和容错机制 4. **高效性**:优化任务调度和执行效率 ## 常见问题 ### 如何检测任务图中的循环依赖? `Container`组件会在执行前自动检查任务图是否存在循环依赖,如果存在则会返回错误。 ### 如何处理任务执行失败? 可以通过自定义`Pipeline`处理器来捕获和处理任务执行异常。 ### 如何设置任务超时? 可以通过`WithTimeout`选项设置整个引擎的默认任务超时时间,也可以针对特定任务在线程池中设置不同的超时策略。 ## 参与贡献 1. Fork 本仓库 2. 新建 `Feat_xxx` 分支 3. 提交代码 4. 新建 Pull Request ## 许可证 [MIT](LICENSE)