代码拉取完成,页面将自动刷新
package cCommand
import (
"fmt"
"reflect"
"time"
"github.com/robfig/cron/v3"
"gitee.com/csingo/cLog"
)
func dispatch() {
var err error
var id cron.EntryID
syncCommands := make([]*CommandConf_Command, 0)
asyncCommands := make([]*CommandConf_Command, 0)
for _, command := range command_config.Commands {
if err != nil {
cLog.WithContext(nil, map[string]any{
"source": "cCommand.dispatch",
"command": command,
"err": err.Error(),
}, []cLog.Option{
cLog.IncludePaths{},
}).Panic("任务ID保存失败")
continue
}
switch command.Mode {
case CommandMode_Sync:
syncCommands = append(syncCommands, command)
case CommandMode_Single:
fallthrough
case CommandMode_Multi:
// 判断是否需要立即执行
if command.Instant {
syncCommands = append(syncCommands, command)
}
asyncCommands = append(asyncCommands, command)
default:
}
}
// 优先执行同步串行任务
for _, command := range syncCommands {
if command.Mode == CommandMode_Sync || command.Instant {
err = run(command)
if err != nil {
cLog.WithContext(nil, map[string]any{
"source": "cCommand.dispatch",
"command": command,
"err": err.Error(),
}, []cLog.Option{
cLog.IncludePaths{},
}).Panic("同步串行任务执行失败")
}
}
}
// 执行异步串行定时任务
for _, command := range asyncCommands {
f := handler(command)
id, err = container.cron.AddFunc(command.Cron, f)
if err != nil {
cLog.WithContext(nil, map[string]any{
"source": "cCommand.dispatch",
"command": command,
"err": err.Error(),
}, []cLog.Option{
cLog.IncludePaths{},
}).Error("异步任务添加失败")
continue
}
command.SetID(id)
}
container.cron.Start()
}
// handler wraps command in the func() that is registered with cron.
//
// Each invocation calls run(command). When the invocation ends, the command's
// Runnable() is checked; if it reports false, the command's entry ID is sent
// on container.stop.
func handler(command *CommandConf_Command) func() {
	// retire hands the entry ID to container.stop once the command reports
	// that it is no longer runnable.
	retire := func() {
		if command.Runnable() {
			return
		}
		container.stop <- command.id
	}

	return func() {
		// Deferred so the check also happens if run panics.
		defer retire()

		// run logs its own failures, so the returned error is dropped here.
		_ = run(command)
	}
}
// run executes one configured command via reflection.
//
// Steps, in order:
//  1. In CommandMode_Single the command's lock is tried; if it cannot be
//     taken, run returns nil without doing anything else.
//  2. If item.Runnable() reports false, run returns nil; otherwise
//     item.Incr() is called.
//  3. The instance is fetched from the container under the name built from
//     instance_name_format, item.App and item.Command, and item.Method is
//     resolved on it.
//  4. The method must return nothing or exactly one value implementing error.
//  5. item.Options are passed positionally, one per method parameter; missing
//     trailing options are passed as "". item.Options itself is not modified.
//  6. If item.Wait is positive, run sleeps that many seconds before the call.
//
// The returned error is the method's own error, a lookup/validation error, or
// a recovered panic converted to an error. Every non-nil error is also logged
// here, and a lock taken in step 1 is always released before returning.
func run(item *CommandConf_Command) (err error) {
	var locked bool

	defer func() {
		if locked {
			item.Unlock()
		}
		// Convert a panic from the command (or from reflection) into an error.
		if r := recover(); r != nil {
			err = fmt.Errorf("%v", r)
		}
		if err != nil {
			cLog.WithContext(nil, map[string]any{
				"err":     err.Error(),
				"command": item,
			}).Error("任务执行失败")
		}
	}()

	// Single mode: skip this run when the lock could not be taken.
	if item.Mode == CommandMode_Single {
		locked = item.TryLock()
		if !locked {
			return nil
		}
	}

	// Stop here once the execution-count limit has been reached.
	if !item.Runnable() {
		return nil
	}
	item.Incr()

	index := fmt.Sprintf(instance_name_format, item.App, item.Command)
	instance := container.Get(index)
	if instance == nil {
		return fmt.Errorf("command not found")
	}

	// MethodByName yields the zero Value for an unknown method; calling Type
	// on it would panic, so report the misconfiguration explicitly instead.
	instanceMethod := reflect.ValueOf(instance).MethodByName(item.Method)
	if !instanceMethod.IsValid() {
		return fmt.Errorf("method %q not found on command %q", item.Method, index)
	}

	methodType := instanceMethod.Type()
	in, out := methodType.NumIn(), methodType.NumOut()
	errorType := reflect.TypeOf((*error)(nil)).Elem()
	if out > 1 || (out == 1 && !methodType.Out(0).Implements(errorType)) {
		return fmt.Errorf("任务异常, 必须无返回值, 或只返回 error")
	}

	// Clip the capacity so that padding appends to a private copy rather than
	// to the shared item.Options or its backing array.
	options := item.Options[:len(item.Options):len(item.Options)]
	params := make([]reflect.Value, 0, in)
	for i := 0; i < in; i++ {
		if len(options) <= i {
			options = append(options, "")
		}
		params = append(params, reflect.ValueOf(options[i]))
	}

	if item.Wait > 0 {
		cLog.WithContext(nil, map[string]any{
			"source": "cCommand.run",
			"item":   item,
		}, []cLog.Option{
			cLog.IncludePaths{},
		}).Trace("任务执行前等待")
		time.Sleep(time.Duration(item.Wait) * time.Second)
	}

	cLog.WithContext(nil, map[string]any{
		"source": "cCommand.run",
		"item":   item,
	}, []cLog.Option{
		cLog.IncludePaths{},
	}).Trace("执行任务")

	result := instanceMethod.Call(params)
	if out == 1 && !result[0].IsNil() {
		err = result[0].Interface().(error)
	}
	return err
}
// clears removes cron entries whose IDs arrive on container.stop.
//
// It blocks receiving entry IDs from container.stop (handler sends one when a
// command is no longer runnable) and removes each from container.cron. The
// loop ends, and clears returns, only if container.stop is closed; ranging
// over the channel avoids spinning on zero values in that case.
func clears() {
	for id := range container.stop {
		container.cron.Remove(id)
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。