# async_task_demon **Repository Path**: qdmc/async_task_demon ## Basic Information - **Project Name**: async_task_demon - **Description**: No description available - **Primary Language**: Go - **License**: LGPL-2.1 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2023-11-17 - **Last Updated**: 2023-12-01 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 异步任务管理器 ## OneToManyTaskManagerInterface: mapReduce的简单实现 - oneToMany : 一个task执行多个handle,每一个oneToManyTask就是执行一个定长的AsyncTaskManager - 应用场景:数据汇总,消息分发 ## AsyncTaskManagerInterface: 事件回调模式的异步任务 - 可以定长任务 - 可以Stop - todo:从等待队列里删除 - todo:任务超时处理 ## 工作原理 - 在加入任务时或任务完成时都会触发"任务从等待队列转到工作队列"的动作,并有触发条件与互斥锁保证了任务不会丢失与不会重复 - 任务管理者(有默认实现) - 任务执行者interface - 任务(有默认实现) ## 注意 - 大并发操作DB是,一定注意链接数问题 - 常用的orm,经测试,只有xorm能锁定链接数 ## 测试异步写库 - orm: xorm,1000的链接数 - 总任务:100000 - 并行数:1000 - 任务执行者:40次写入 - 用时:261秒,写入4000000条记录 ~~~ go // AsyncTaskManagerInterface 异步任务管理器通用接口 type AsyncTaskManagerInterface interface { Start() // 开始 Stop() // 停止 SetWaitListMaxLength(l int) // 设置等待队列的最大长度,默认:1000 SetWorkPoolSize(s int) // 设置并行的数量,默认:100 AddAsyncTask(t AsyncTaskInterface, h AsyncTaskHandle, cb ...AsyncTaskCallBack) error // 增加一个异步任务 SetFinishCallBack(f func(*TaskManagerResult)) // 设置任务完成的回调 CheckTaskInWork(tag string) (bool, error) // 查验任务是否在执行 GetWaitLength() int // 获取当前等待队列长度 GetWorkerLength() int // 获取当前工作队列长度 GetManagerStatus() uint8 // 获取管理器状态 SetTaskMaxLength(s int) // 设置最大任务数 SetProgressCallBack(p int, cb ProgressCallBack) // 设置进度回调 } type AsyncTaskInterface interface { GetTag() *TaskTag // 任务全局唯一标识 SetStartUnix(end int64) // 设置任务开始时间 GetStartUnix() int64 // 获取任务开始时间 SetEndUnix(end int64) // 设置任务结束时间 GetEndUnix() int64 // 获取任务结束时间 SetStatus(s uint8) // 设置当前任务状态 GetStatus() uint8 // 获取当前任务状态 SetError(e error) GetError() error } type AsyncTaskHandle interface { Start(AsyncTaskInterface) // 开始执行任务 Stop() // 结束任务,然后拉起回调,返回enmu.UserStopError SetHandleCallBack(f AsyncTaskCallBack) // 配置执行的回调 } // AsyncTaskCallBack 异步任务回调 type AsyncTaskCallBack func(AsyncTaskInterface) // TaskManagerResult 任务管理器执行结果 type TaskManagerResult struct { StartTime int64 EndTime int64 Length uint64 TagsMap map[string]AsyncTaskInterface Status uint8 } // ProgressCallBack 进度回调 type ProgressCallBack func(*TaskProgress) // TaskProgress 进度信息 type TaskProgress struct { WorkNum int // 任务并行数 AllTaskNum int // 总任务数,如果没有SetTaskMaxLength(),返回0 Progress int // 当前完成个数 ParentTime int64 // 上次进度回调的时间 } // OneToManyTaskInterface 一对多异步任务通用接口 type OneToManyTaskInterface interface { AsyncTaskInterface SetTaskTag(tag *TaskTag) GetHandleTag() *TaskTag SetHandleTag(tag *TaskTag) GetHandleResult() interface{} SetHandleResult(res interface{}) Copy() OneToManyTaskInterface } // OneToManyTaskHandleInterface 一对多异步任务Handle通用接口 type OneToManyTaskHandleInterface interface { Start(AsyncTaskInterface) // 开始执行任务 Stop() // 结束任务,然后拉起回调,返回enmu.UserStopError SetHandleCallBack(f AsyncTaskCallBack) // 配置执行的回调 GetTarget() *TaskTag // 同一任务中唯一 } // OneToManyTaskManagerInterface 一对多异步任务管理通用接口 type OneToManyTaskManagerInterface interface { Start() // 开始 Stop() // 停止 SetWaitListMaxLength(l int) // 设置等待队列的最大长度,默认:1000 SetWorkPoolSize(s int) // 设置并行的数量,默认:100 AddAsyncTask(t OneToManyTaskInterface, cb func(res *OneToManyTaskResult), handles ...OneToManyTaskHandleInterface) error // 增加一个异步任务 CheckTaskInWork(tag string) (bool, error) // 查验任务是否在执行 GetWaitLength() int // 获取当前等待队列长度 GetWorkerLength() int // 获取当前工作队列长度 GetManagerStatus() uint8 // 获取管理器状态 } // OneToManyTaskResult 一对多异步任务 执行结果 type OneToManyTaskResult struct { StartTime int64 EndTime int64 Status uint8 Task OneToManyTaskInterface ResultMap map[string]interface{} } ~~~