代码拉取完成,页面将自动刷新
package async_task_manager
import (
"container/list"
"errors"
"sync"
"time"
)
type taskItem struct {
Task AsyncTaskInterface
handle AsyncTaskHandle
CallBack AsyncTaskCallBack
}
func NewDefaultAsyncTaskManager() *DefaultAsyncTaskManager {
result := &TaskManagerResult{
StartTime: 0,
EndTime: 0,
Length: 0,
TagsMap: map[string]AsyncTaskInterface{},
}
return &DefaultAsyncTaskManager{
mu: sync.Mutex{},
taskList: list.New(),
workMap: map[string]*taskItem{},
waitMaxLength: DefaultWaitLength,
workMaxLength: DefaultWorkLength,
result: result,
finishCb: nil,
status: ManagerInit,
taskMaxLen: 0,
}
}
type DefaultAsyncTaskManager struct {
mu sync.Mutex // 任务锁,锁定workMap
taskList *list.List // 等待队列
workMap map[string]*taskItem // 工作列表
waitMaxLength int // 等待队列最大长度
workMaxLength int // 工作队列最大长度
result *TaskManagerResult // 执行结果
finishCb func(*TaskManagerResult) // 所有任务完成后的回调
status uint8 // 状态
taskMaxLen int // 最大任务数,默认为0无限:进度回调与finish回调将不起作用,
progress int // 进度
progressCb ProgressCallBack // 进度回调
parenProgressTime int64
}
func (m *DefaultAsyncTaskManager) SetProgressCallBack(p int, cb ProgressCallBack) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit && p > 0 && cb != nil {
m.progress = p
m.progressCb = cb
}
}
func (m *DefaultAsyncTaskManager) SetTaskMaxLength(s int) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit {
if s < 1 {
// 0 是非定长任务,进度回调与finish回调将不起作用
s = 0
}
m.taskMaxLen = s
}
}
func (m *DefaultAsyncTaskManager) GetManagerStatus() uint8 {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
return m.status
}
func (m *DefaultAsyncTaskManager) GetWaitLength() int {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
return m.taskList.Len()
}
func (m *DefaultAsyncTaskManager) Stop() {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
m.status = ManagerStop
m.result.Status = ManagerStop
if len(m.workMap) > 0 {
for _, item := range m.workMap {
item.handle.Stop()
}
}
if m.taskList.Len() > 0 {
for i := 0; i <= m.taskList.Len(); i++ {
e := m.taskList.Back()
item := e.Value.(*taskItem)
m.taskList.Remove(e)
//go m.doStopWorkWithWaiList(item)
item.Task.SetError(UserStopError)
if item.CallBack != nil {
go item.CallBack(item.Task)
}
if m.taskMaxLen > 0 && m.finishCb != nil {
m.result.TagsMap[item.Task.GetTag().String()] = item.Task
}
}
}
m.result.Status = m.status
m.result.EndTime = time.Now().Unix()
// 处理结束
if m.finishCb != nil {
go m.finishCb(m.result)
}
}
func (m *DefaultAsyncTaskManager) Start() {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
m.result.StartTime = time.Now().Unix()
m.status = ManagerStart
}
func (m *DefaultAsyncTaskManager) SetWaitListMaxLength(l int) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit && l >= 1 {
m.waitMaxLength = l
}
}
func (m *DefaultAsyncTaskManager) SetWorkPoolSize(s int) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status == ManagerInit && s >= 0 {
m.workMaxLength = s
}
}
func (m *DefaultAsyncTaskManager) AddAsyncTask(t AsyncTaskInterface, h AsyncTaskHandle, cbs ...AsyncTaskCallBack) error {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status != ManagerStart {
return errors.New("manager Status is not Start")
}
if t == nil {
return errors.New("task is empty")
}
if t.GetTag() == nil {
return errors.New("task Target is empty")
}
if h == nil {
return errors.New("handle is empty")
}
var cb AsyncTaskCallBack
if cbs != nil && len(cbs) == 1 && cbs[0] != nil {
cb = cbs[0]
} else {
cb = nil
}
if m.taskList.Len() >= m.waitMaxLength {
return errors.New("wait list is full")
}
m.taskList.PushFront(&taskItem{
Task: t,
handle: h,
CallBack: cb,
})
go m.waitToWorker()
return nil
}
func (m *DefaultAsyncTaskManager) SetFinishCallBack(f func(*TaskManagerResult)) {
if f != nil {
m.finishCb = f
}
}
func (m *DefaultAsyncTaskManager) CheckTaskInWork(workTag string) (bool, error) {
if workTag == "" {
return false, errors.New("work Tag is empty")
}
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if _, ok := m.workMap[workTag]; ok {
return true, nil
}
return false, nil
}
func (m *DefaultAsyncTaskManager) doWork(item *taskItem) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
m.workMap[item.Task.GetTag().String()] = item
// 配置handle回调
item.handle.SetHandleCallBack(m.TaskResultCallBack)
// 执行handle
go item.handle.Start(item.Task)
}
func (m *DefaultAsyncTaskManager) GetWorkerLength() int {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
return len(m.workMap)
}
func (m *DefaultAsyncTaskManager) TaskResultCallBack(res AsyncTaskInterface) {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
taskTag := res.GetTag().String()
if item, ok := m.workMap[taskTag]; ok {
if item.CallBack != nil {
go item.CallBack(res)
}
delete(m.workMap, taskTag)
res.SetEndUnix(time.Now().Unix())
// 只有定长任务并finishCallBack不为空,才会加入resultMap
if m.taskMaxLen > 0 && m.finishCb != nil {
m.result.TagsMap[taskTag] = res
}
// 处理进度回调
if m.status == ManagerStart && m.progressCb != nil && m.progress > 0 && len(m.result.TagsMap)%m.progress == 0 {
go m.progressCb(&TaskProgress{
WorkNum: m.workMaxLength,
AllTaskNum: m.taskMaxLen,
Progress: len(m.result.TagsMap),
ParentTime: m.parenProgressTime,
})
m.parenProgressTime = time.Now().Unix()
}
// 处理结束
if m.taskMaxLen > 0 && len(m.result.TagsMap) == m.taskMaxLen {
if m.status != ManagerStop {
m.status = ManagerFinish
}
m.result.Status = m.status
m.result.EndTime = time.Now().Unix()
if m.finishCb != nil {
go m.finishCb(m.result)
}
} else {
go m.waitToWorker()
}
}
}
func (m *DefaultAsyncTaskManager) waitToWorker() {
m.mu.Lock()
defer func() {
m.mu.Unlock()
}()
if m.status != ManagerStart {
return
}
if len(m.workMap) < m.workMaxLength && m.taskList.Len() > 0 {
e := m.taskList.Back()
item := e.Value.(*taskItem)
m.taskList.Remove(e)
go m.doWork(item)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。