1 Star 0 Fork 0

墨城/async_task_demon

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
default_async_task_manager.go 6.49 KB
一键复制 编辑 原始数据 按行查看 历史
墨城 提交于 2023-11-21 15:52 +08:00 . 写库时限链接数只能是xorm
package async_task_manager
import (
"container/list"
"errors"
"sync"
"time"
)
type taskItem struct {
Task AsyncTaskInterface
handle AsyncTaskHandle
CallBack AsyncTaskCallBack
}
func NewDefaultAsyncTaskManager() *DefaultAsyncTaskManager {
result := &TaskManagerResult{
StartTime: 0,
EndTime: 0,
Length: 0,
TagsMap: map[string]AsyncTaskInterface{},
}
return &DefaultAsyncTaskManager{
mu: sync.Mutex{},
taskList: list.New(),
workMap: map[string]*taskItem{},
waitMaxLength: DefaultWaitLength,
workMaxLength: DefaultWorkLength,
result: result,
finishCb: nil,
status: ManagerInit,
taskMaxLen: 0,
}
}
type DefaultAsyncTaskManager struct {
mu sync.Mutex // 任务锁,锁定workMap
taskList *list.List // 等待队列
workMap map[string]*taskItem // 工作列表
waitMaxLength int // 等待队列最大长度
workMaxLength int // 工作队列最大长度
result *TaskManagerResult // 执行结果
finishCb func(*TaskManagerResult) // 所有任务完成后的回调
status uint8 // 状态
taskMaxLen int // 最大任务数,默认为0无限:进度回调与finish回调将不起作用,
progress int // 进度
progressCb ProgressCallBack // 进度回调
parenProgressTime int64
}
func (m *DefaultAsyncTaskManager) SetProgressCallBack(p int, cb ProgressCallBack) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit && p > 0 && cb != nil {
m.progress = p
m.progressCb = cb
}
}
func (m *DefaultAsyncTaskManager) SetTaskMaxLength(s int) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit {
if s < 1 {
// 0 是非定长任务,进度回调与finish回调将不起作用
s = 0
}
m.taskMaxLen = s
}
}
func (m *DefaultAsyncTaskManager) GetManagerStatus() uint8 {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
return m.status
}
func (m *DefaultAsyncTaskManager) GetWaitLength() int {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
return m.taskList.Len()
}
func (m *DefaultAsyncTaskManager) Stop() {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
m.status = ManagerStop
m.result.Status = ManagerStop
if len(m.workMap) > 0 {
for _, item := range m.workMap {
item.handle.Stop()
}
}
if m.taskList.Len() > 0 {
for i := 0; i <= m.taskList.Len(); i++ {
e := m.taskList.Back()
item := e.Value.(*taskItem)
m.taskList.Remove(e)
//go m.doStopWorkWithWaiList(item)
item.Task.SetError(UserStopError)
if item.CallBack != nil {
go item.CallBack(item.Task)
}
if m.taskMaxLen > 0 && m.finishCb != nil {
m.result.TagsMap[item.Task.GetTag().String()] = item.Task
}
}
}
m.result.Status = m.status
m.result.EndTime = time.Now().Unix()
// 处理结束
if m.finishCb != nil {
go m.finishCb(m.result)
}
}
func (m *DefaultAsyncTaskManager) Start() {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
m.result.StartTime = time.Now().Unix()
m.status = ManagerStart
}
func (m *DefaultAsyncTaskManager) SetWaitListMaxLength(l int) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit && l >= 1 {
m.waitMaxLength = l
}
}
func (m *DefaultAsyncTaskManager) SetWorkPoolSize(s int) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit && s >= 0 {
m.workMaxLength = s
}
}
func (m *DefaultAsyncTaskManager) AddAsyncTask(t AsyncTaskInterface, h AsyncTaskHandle, cbs ...AsyncTaskCallBack) error {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status != ManagerStart {
return errors.New("manager Status is not Start")
}
if t == nil {
return errors.New("task is empty")
}
if t.GetTag() == nil {
return errors.New("task Target is empty")
}
if h == nil {
return errors.New("handle is empty")
}
var cb AsyncTaskCallBack
if cbs != nil && len(cbs) == 1 && cbs[0] != nil {
cb = cbs[0]
} else {
cb = nil
}
if m.taskList.Len() >= m.waitMaxLength {
return errors.New("wait list is full")
}
m.taskList.PushFront(&taskItem{
Task: t,
handle: h,
CallBack: cb,
})
go m.waitToWorker()
return nil
}
func (m *DefaultAsyncTaskManager) SetFinishCallBack(f func(*TaskManagerResult)) {
if f != nil {
m.finishCb = f
}
}
func (m *DefaultAsyncTaskManager) CheckTaskInWork(workTag string) (bool, error) {
if workTag == "" {
return false, errors.New("work Tag is empty")
}
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if _, ok := m.workMap[workTag]; ok {
return true, nil
}
return false, nil
}
func (m *DefaultAsyncTaskManager) doWork(item *taskItem) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
m.workMap[item.Task.GetTag().String()] = item
// 配置handle回调
item.handle.SetHandleCallBack(m.TaskResultCallBack)
// 执行handle
go item.handle.Start(item.Task)
}
func (m *DefaultAsyncTaskManager) GetWorkerLength() int {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
return len(m.workMap)
}
func (m *DefaultAsyncTaskManager) TaskResultCallBack(res AsyncTaskInterface) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
taskTag := res.GetTag().String()
if item, ok := m.workMap[taskTag]; ok {
if item.CallBack != nil {
go item.CallBack(res)
}
delete(m.workMap, taskTag)
res.SetEndUnix(time.Now().Unix())
// 只有定长任务并finishCallBack不为空,才会加入resultMap
if m.taskMaxLen > 0 && m.finishCb != nil {
m.result.TagsMap[taskTag] = res
}
// 处理进度回调
if m.status == ManagerStart && m.progressCb != nil && m.progress > 0 && len(m.result.TagsMap)%m.progress == 0 {
go m.progressCb(&TaskProgress{
WorkNum: m.workMaxLength,
AllTaskNum: m.taskMaxLen,
Progress: len(m.result.TagsMap),
ParentTime: m.parenProgressTime,
})
m.parenProgressTime = time.Now().Unix()
}
// 处理结束
if m.taskMaxLen > 0 && len(m.result.TagsMap) == m.taskMaxLen {
if m.status != ManagerStop {
m.status = ManagerFinish
}
m.result.Status = m.status
m.result.EndTime = time.Now().Unix()
if m.finishCb != nil {
go m.finishCb(m.result)
}
} else {
go m.waitToWorker()
}
}
}
func (m *DefaultAsyncTaskManager) waitToWorker() {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status != ManagerStart {
return
}
if len(m.workMap) < m.workMaxLength && m.taskList.Len() > 0 {
e := m.taskList.Back()
item := e.Value.(*taskItem)
m.taskList.Remove(e)
go m.doWork(item)
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/qdmc/async_task_demon.git
git@gitee.com:qdmc/async_task_demon.git
qdmc
async_task_demon
async_task_demon
v1.0.0

搜索帮助