// [hosting-page notice captured with the file; not part of the program] 代码拉取完成,页面将自动刷新

package gweb
import (
"fmt"
"time"
"gitee.com/makitdone/gx"
"gitee.com/makitdone/gx/slices"
"github.com/robfig/cron/v3"
)
// TaskState is the lifecycle state of a task as recorded in CronTaskInfo.State.
// The underlying string is what gets serialized under the "state" JSON key.
type TaskState string
// States assigned to a task by TaskManager.
const (
TaskStatePending TaskState = "pending" // waiting to run
TaskStateRunning TaskState = "running" // currently running
TaskStatePaused TaskState = "paused" // paused
TaskStateStopped TaskState = "stopped" // stopped
TaskStateError TaskState = "error" // run failed with an error
TaskStateDone TaskState = "done" // completed
)
// Values sent over TaskHandler.channel to wake a task that is blocked in Wait.
const (
	signalResume = 1 // sent by Resume
	signalStop   = 2 // sent by Stop
)

// TaskHandler lets a running task cooperate with pause, resume and stop
// requests. The controlling side calls Pause, Resume and Stop; the task itself
// calls Wait at points where it can safely be suspended or aborted.
//
// NOTE(review): paused, stopped and the callbacks are plain fields that the
// controlling side writes and Wait reads without a lock. When the two sides
// run on different goroutines this is a data race; guarding the fields with a
// sync.Mutex would remove it but needs a new import in this file.
type TaskHandler struct {
	paused         bool     // set by Pause, cleared by Wait once a resume arrives
	channel        chan int // carries signalResume / signalStop to a blocked Wait
	pauseCallback  func()
	resumeCallback func()
	stopCallback   func()
	stopped        bool // set by Stop; checked by Wait even when the task is not paused
}

// NewTaskHandler returns a handler whose signal channel buffers one pending
// request.
func NewTaskHandler() *TaskHandler {
	return &TaskHandler{
		channel: make(chan int, 1),
	}
}

// Pause asks the task to suspend itself at its next call to Wait.
func (t *TaskHandler) Pause() {
	t.paused = true
}

// Resume wakes a task that is blocked (or about to block) in Wait.
//
// It never blocks the caller: when a signal is already pending in the
// one-slot buffer the request is dropped, because the pending signal will
// wake the task anyway.
func (t *TaskHandler) Resume() {
	select {
	case t.channel <- signalResume:
	default:
	}
}

// Wait is the task's cooperation point.
//
// It returns immediately when neither a pause nor a stop has been requested,
// and on a nil handler (which is what a task receives when it supports neither
// pausing nor stopping). When a pause is pending it invokes the pause callback
// and blocks until Resume or Stop is called. When a stop has been requested,
// whether or not the task was paused, it invokes the stop callback and panics
// with the string "task stopped" to unwind the task.
func (t *TaskHandler) Wait() {
	if t == nil {
		return
	}
	// A stop requested while the task was running must not wait for a pause.
	if t.stopped {
		t.abort()
	}
	if !t.paused {
		return
	}
	if t.pauseCallback != nil {
		t.pauseCallback()
	}
	signal := <-t.channel
	// Stop drops its signal when the buffer already holds a resume, so the
	// flag is re-checked after every wake-up.
	if t.stopped || signal == signalStop {
		t.abort()
	}
	if signal == signalResume {
		if t.resumeCallback != nil {
			t.resumeCallback()
		}
		t.paused = false
	}
}

// abort runs the stop callback and unwinds the task by panicking.
func (t *TaskHandler) abort() {
	if t.stopCallback != nil {
		t.stopCallback()
	}
	panic("task stopped")
}

// Stop asks the task to abort at its next call to Wait, waking it if it is
// currently paused. It never blocks the caller.
//
// Like Resume it sends on the signal channel, so it must not be called after
// that channel has been closed.
func (t *TaskHandler) Stop() {
	t.stopped = true
	select {
	case t.channel <- signalStop:
	default:
		// A signal is already pending; Wait sees the stopped flag when it wakes.
	}
}

// OnPause registers a callback that Wait invokes just before it blocks.
func (t *TaskHandler) OnPause(callback func()) {
	t.pauseCallback = callback
}

// OnResume registers a callback that Wait invokes when a resume arrives.
func (t *TaskHandler) OnResume(callback func()) {
	t.resumeCallback = callback
}

// OnStop registers a callback that Wait invokes right before it panics to
// unwind a stopped task.
func (t *TaskHandler) OnStop(callback func()) {
	t.stopCallback = callback
}
// CronTaskInfo is the metadata and most recent run status of one task. It is
// what the task list endpoint serializes, and TaskManager updates it in place
// through the pointer returned by IAppTask.GetInfo.
type CronTaskInfo struct {
CronID cron.EntryID `json:"cron_id"` // entry id returned by the cron scheduler when the task was registered
Key string `json:"key"` // identifier used to look the task up and to index its handler
Name string `json:"name"`
CronExpression string `json:"cron_expression"` // schedule spec handed to the cron scheduler
Desc string `json:"desc"`
Enabled bool `json:"enabled"` // AddTask only schedules tasks that have this set
AllowPause bool `json:"allow_pause"` // a TaskHandler is created for the run when this or AllowStop is set
AllowStop bool `json:"allow_stop,omitempty"`
State TaskState `json:"state"`
TimeStart *time.Time `json:"time_start,omitempty"` // when the most recent run began
TimeEnd *time.Time `json:"time_end,omitempty"` // when the most recent run finished
TimeCost float64 `json:"time_cost,omitempty"` // run duration, in seconds
Error string `json:"error,omitempty"` // message of the error that failed the run
Traceback string `json:"traceback,omitempty"` // text recorded when a scheduled run panicked
}
// IsIdle reports whether the task is neither running nor paused, i.e. no run
// of it is currently in progress.
func (info *CronTaskInfo) IsIdle() bool {
	return info.State != TaskStateRunning && info.State != TaskStatePaused
}
// IAppTask is what a task must implement to be scheduled and run by
// TaskManager.
type IAppTask interface {
// GetInfo returns the task's metadata. TaskManager records run state by
// writing through the returned pointer.
GetInfo() *CronTaskInfo
// UpdateInfo receives a CronTaskInfo by value.
// NOTE(review): never called in this file; presumably it replaces the
// task's stored metadata — confirm against the implementations.
UpdateInfo(info CronTaskInfo)
// Run executes the task. handler is nil when the task's info sets neither
// AllowPause nor AllowStop. A returned error marks the run as failed.
Run(handler *TaskHandler) error
}
// TaskManager schedules IAppTask values on an embedded cron scheduler and
// tracks what is needed to pause, resume, stop and re-register them.
type TaskManager struct {
*cron.Cron // embedded scheduler; its methods are promoted onto the manager
tasks []IAppTask // tasks passed to AddTask, whether or not they are enabled
handlers map[string]*TaskHandler // handler created for a task's run, keyed by CronTaskInfo.Key
changedTasks []string // keys of tasks to re-register once their next scheduled run finishes
tasksToRemove []string // allocated by NewTaskManager; not otherwise used in this file
}
// NewTaskManager builds a manager around a fresh cron scheduler, with an empty
// handler registry and empty bookkeeping slices (capacity 10 each).
func NewTaskManager() *TaskManager {
	manager := new(TaskManager)
	manager.Cron = cron.New()
	manager.handlers = map[string]*TaskHandler{}
	manager.changedTasks = make([]string, 0, 10)
	manager.tasksToRemove = make([]string, 0, 10)
	return manager
}
// RunTask runs task synchronously on the calling goroutine and records the
// outcome in the task's CronTaskInfo.
//
// A TaskHandler is created, registered under the task's key and passed to
// task.Run when the task allows pausing or stopping; otherwise Run receives a
// nil handler. The handler's channel is closed when RunTask returns.
//
// Before the run, State is set to running and the previous run's TimeEnd,
// TimeCost, Error and Traceback are cleared. TimeEnd and TimeCost are then
// recorded however the run ends:
//   - Run returns nil: State becomes done and RunTask returns nil.
//   - Run returns an error: State becomes error, Error holds the message and
//     the error is returned unchanged.
//   - Run panics with "task stopped" (the TaskHandler.Wait stop unwind): State
//     becomes stopped and an error is returned.
//   - Run panics with anything else: State becomes error, Error and Traceback
//     are filled in and an error is returned.
//
// Panics are recovered here so that a stopped or crashing task cannot take
// down the goroutine (and with it the process) that called RunTask.
//
// NOTE(review): m.handlers is written without locking; callers on different
// goroutines need external synchronization.
func (m *TaskManager) RunTask(task IAppTask) (err error) {
	taskInfo := task.GetInfo()
	var handler *TaskHandler
	if taskInfo.AllowPause || taskInfo.AllowStop {
		handler = NewTaskHandler()
		defer close(handler.channel)
		m.handlers[taskInfo.Key] = handler
	}

	timeStart := time.Now()
	taskInfo.State = TaskStateRunning
	taskInfo.TimeStart = &timeStart
	taskInfo.TimeEnd = nil
	taskInfo.TimeCost = 0.0
	// Do not let a previous failure show up on this run.
	taskInfo.Error = ""
	taskInfo.Traceback = ""

	defer func() {
		recovered := recover()
		taskInfo.TimeEnd = gx.Ptr(time.Now())
		taskInfo.TimeCost = time.Since(timeStart).Seconds()
		switch {
		case recovered == "task stopped":
			// Deliberate unwind triggered by a stop request, not a failure.
			taskInfo.State = TaskStateStopped
			err = fmt.Errorf("task %q stopped", taskInfo.Key)
		case recovered != nil:
			taskInfo.State = TaskStateError
			taskInfo.Error = fmt.Sprint(recovered)
			taskInfo.Traceback = fmt.Sprintf("Traceback:\n%+v", recovered)
			err = fmt.Errorf("task %q panicked: %v", taskInfo.Key, recovered)
		case err != nil:
			taskInfo.State = TaskStateError
			taskInfo.Error = err.Error()
		default:
			taskInfo.State = TaskStateDone
		}
	}()

	return task.Run(handler)
}
// PauseTask requests a pause of task and marks it as paused.
//
// It does nothing unless the task is currently running, allows pausing and
// has a registered handler. The state is only changed once the request has
// actually been handed to a handler, so a task that nothing can resume is
// never left reported as paused.
func (m *TaskManager) PauseTask(task IAppTask) {
	taskInfo := task.GetInfo()
	if taskInfo.State != TaskStateRunning || !taskInfo.AllowPause {
		return
	}
	handler := m.handlers[taskInfo.Key]
	if handler == nil {
		return
	}
	handler.Pause()
	taskInfo.State = TaskStatePaused
}
// ResumeTask wakes a paused task and marks it as running again. It does
// nothing when the task is not paused or has no registered handler.
func (m *TaskManager) ResumeTask(task IAppTask) {
	info := task.GetInfo()
	if info.State != TaskStatePaused {
		return
	}
	h := m.handlers[info.Key]
	if h == nil {
		return
	}
	h.Resume()
	info.State = TaskStateRunning
}
// StopTask signals a running or paused task to stop and marks it as stopped.
// Tasks in any other state, or without a registered handler, are left
// untouched. The returned error is always nil.
func (m *TaskManager) StopTask(task IAppTask) error {
	info := task.GetInfo()
	switch info.State {
	case TaskStateRunning, TaskStatePaused:
		if h := m.handlers[info.Key]; h != nil {
			h.Stop()
			info.State = TaskStateStopped
		}
	}
	return nil
}
// AddTask records task in the manager and, when the task is enabled,
// schedules it on the cron scheduler using its CronExpression.
//
// The task is listed once per key: calling AddTask again for a key that is
// already listed (as happens when a task is re-enabled, reloaded or
// re-registered after a change) replaces the listed entry instead of
// appending a duplicate.
//
// For an enabled task the state becomes pending and CronID is set to the new
// scheduler entry. If the cron expression is rejected, the state becomes
// error, Error holds the reason and a wrapped error is returned. The first
// return value is always task.
//
// After each scheduled run, a task whose key is in changedTasks is removed
// from the scheduler and added again so that edited settings take effect; a
// task that has been disabled in the meantime is simply not rescheduled.
func (m *TaskManager) AddTask(task IAppTask) (IAppTask, error) {
	taskInfo := task.GetInfo()

	listed := false
	for i, existing := range m.tasks {
		if existing.GetInfo().Key == taskInfo.Key {
			m.tasks[i] = task
			listed = true
			break
		}
	}
	if !listed {
		m.tasks = append(m.tasks, task)
	}

	if !taskInfo.Enabled {
		return task, nil
	}

	taskInfo.State = TaskStatePending
	entryID, err := m.Cron.AddFunc(taskInfo.CronExpression, func() {
		defer func() {
			if e := recover(); e != nil {
				// The recovered value may be any type (TaskHandler.Wait panics
				// with a string), so format it instead of asserting error.
				cause := fmt.Errorf("%v", e)
				taskInfo.State = TaskStateError
				taskInfo.Error = cause.Error()
				taskInfo.Traceback = fmt.Sprintf("Traceback:\n%+v", cause)
			}
		}()
		// The outcome is recorded on taskInfo by RunTask itself.
		_ = m.RunTask(task)
		if slices.Contains(m.changedTasks, taskInfo.Key) {
			m.Cron.Remove(taskInfo.CronID)
			m.changedTasks = slices.DeleteFunc(m.changedTasks, func(el string) bool {
				return el == taskInfo.Key
			})
			// A failure to reschedule is recorded on taskInfo by AddTask.
			_, _ = m.AddTask(task)
		}
	})
	if err != nil {
		taskInfo.State = TaskStateError
		taskInfo.Error = err.Error()
		return task, fmt.Errorf("scheduling task %q with cron expression %q: %w",
			taskInfo.Key, taskInfo.CronExpression, err)
	}
	taskInfo.CronID = entryID
	return task, nil
}
// UpdateTask flags task as changed by queueing its key in changedTasks; the
// scheduled job re-registers flagged tasks after their next run.
func (m *TaskManager) UpdateTask(task IAppTask) {
	key := task.GetInfo().Key
	m.changedTasks = append(m.changedTasks, key)
}
// RegisterRoutes mounts the task-management HTTP endpoints on router.
//
// Every handler works on c.App.Tasks, the manager attached to the request's
// application, not on the receiver.
//
// Endpoints (all but the first take a JSON body with a "key" field):
//
//	GET  /system/tasks         list the info of every known task
//	POST /system/task/pause    pause a running task that allows pausing
//	POST /system/task/resume   resume a paused task
//	POST /system/task/stop     stop a running or paused task
//	POST /system/task/start    run a task now, in the background
//	POST /system/task/enable   enable a task and schedule it
//	POST /system/task/disable  disable a task (applied after its next run)
//	POST /system/task/reload   re-register one task, or all when key is empty
//	PUT  /system/task          edit name, cron expression, enabled flag, desc
//
// A key that matches no task is answered with a failure rather than a
// success. Binding errors are not inspected separately: a body that fails to
// bind leaves the key empty and is then handled like any other lookup.
func (m *TaskManager) RegisterRoutes(router *RouterGroup) {
	type keyRequest struct {
		Key string `json:"key"`
	}

	// lookup returns the known task with the given key. When there is none it
	// answers the request with a failure and reports false.
	lookup := func(c *Context, key string) (IAppTask, bool) {
		for _, task := range c.App.Tasks.tasks {
			if task.GetInfo().Key == key {
				return task, true
			}
		}
		c.Fail("任务不存在")
		return nil, false
	}

	// requested resolves the task named by the "key" field of the JSON body.
	requested := func(c *Context) (IAppTask, bool) {
		var req keyRequest
		c.ShouldBindJSON(&req)
		return lookup(c, req.Key)
	}

	router.Config("", func(router *RouterGroup) {
		router.GET("/system/tasks", func(c *Context) {
			tasks := []CronTaskInfo{}
			for _, task := range c.App.Tasks.tasks {
				tasks = append(tasks, *task.GetInfo())
			}
			c.SuccessWithData(tasks)
		})

		router.POST("/system/task/pause", func(c *Context) {
			task, ok := requested(c)
			if !ok {
				return
			}
			if !task.GetInfo().AllowPause {
				c.Fail("任务不支持暂停")
				return
			}
			c.App.Tasks.PauseTask(task)
			c.Success()
		})

		router.POST("/system/task/resume", func(c *Context) {
			task, ok := requested(c)
			if !ok {
				return
			}
			taskInfo := task.GetInfo()
			if !taskInfo.AllowPause {
				c.Fail("任务不支持暂停和恢复")
				return
			}
			if taskInfo.State != TaskStatePaused {
				c.Fail("任务并未暂停")
				return
			}
			c.App.Tasks.ResumeTask(task)
			c.Success()
		})

		router.POST("/system/task/stop", func(c *Context) {
			task, ok := requested(c)
			if !ok {
				return
			}
			c.App.Tasks.StopTask(task)
			c.Success()
		})

		router.POST("/system/task/start", func(c *Context) {
			task, ok := requested(c)
			if !ok {
				return
			}
			manager := c.App.Tasks
			go func() {
				// A panic escaping a bare goroutine terminates the process,
				// and stopping a task unwinds it with panic("task stopped").
				defer func() {
					e := recover()
					if e == nil {
						return
					}
					taskInfo := task.GetInfo()
					if e == "task stopped" {
						taskInfo.State = TaskStateStopped
						return
					}
					taskInfo.State = TaskStateError
					taskInfo.Error = fmt.Sprint(e)
				}()
				manager.RunTask(task)
			}()
			c.Success()
		})

		router.POST("/system/task/enable", func(c *Context) {
			task, ok := requested(c)
			if !ok {
				return
			}
			manager := c.App.Tasks
			taskInfo := task.GetInfo()
			taskInfo.Enabled = true
			// Drop the entry that may still be scheduled so that enabling an
			// already scheduled task does not make it run twice per tick.
			manager.Remove(taskInfo.CronID)
			manager.AddTask(task)
			c.Success()
		})

		router.POST("/system/task/disable", func(c *Context) {
			task, ok := requested(c)
			if !ok {
				return
			}
			manager := c.App.Tasks
			taskInfo := task.GetInfo()
			taskInfo.Enabled = false
			// The scheduled job unschedules the task after its next run.
			manager.changedTasks = append(manager.changedTasks, taskInfo.Key)
			c.Success()
		})

		// Mark tasks as changed so that they get re-registered.
		router.POST("/system/task/reload", func(c *Context) {
			var req keyRequest
			c.ShouldBindJSON(&req)
			manager := c.App.Tasks
			matched := false
			for _, task := range manager.tasks {
				taskInfo := task.GetInfo()
				if req.Key != "" && taskInfo.Key != req.Key {
					continue
				}
				matched = true
				if taskInfo.State == TaskStatePending || taskInfo.State == TaskStateDone {
					// Not in the middle of a run: re-register right away.
					manager.Remove(taskInfo.CronID)
					delete(manager.handlers, taskInfo.Key)
					manager.AddTask(task)
				} else {
					manager.changedTasks = append(manager.changedTasks, taskInfo.Key)
				}
				if req.Key != "" {
					break
				}
			}
			if req.Key != "" && !matched {
				c.Fail("任务不存在")
				return
			}
			c.Success()
		})

		router.PUT("/system/task", func(c *Context) {
			var req struct {
				Key            string `json:"key"`
				Name           string `json:"name"`
				CronExpression string `json:"cron_expression"`
				Enabled        bool   `json:"enabled"`
				Desc           string `json:"desc"`
			}
			c.ShouldBindJSON(&req)
			task, ok := lookup(c, req.Key)
			if !ok {
				return
			}
			taskInfo := task.GetInfo()
			taskInfo.Name = req.Name
			taskInfo.CronExpression = req.CronExpression
			taskInfo.Enabled = req.Enabled
			taskInfo.Desc = req.Desc
			c.App.Tasks.UpdateTask(task)
			c.Success()
		})
	})
}
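// [hosting-page notice captured with the file; not part of the program] 此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
// [hosting-page notice captured with the file; not part of the program] 如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。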