代码拉取完成,页面将自动刷新
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"reflect"
"strconv"
"sync"
"time"
"gitee.com/linqwen/momo/app/job/model"
"gitee.com/linqwen/momo/base"
"gitee.com/linqwen/momo/utils"
"github.com/robfig/cron/v3"
"gorm.io/gorm"
)
// 全局变量
var (
c *cron.Cron
registeredJobs = make(map[int64]int) // 使用 map 来追踪已注册的任务
mu sync.Mutex // 保护 map 的并发访问
)
// 初始化 cron 调度器
func init() {
c = cron.New()
c.Start()
LoadSchedules()
}
type scheduleService struct {
base.BaseService[model.ScheduleEntity]
taskStatusLock sync.RWMutex // 使用读写锁来优化并发控制
taskStatus map[int64]string // 任务ID与状态的映射
taskCancelFuncsLock sync.Mutex
taskCancelFuncs map[int64]context.CancelFunc
}
func NewScheduleService() *scheduleService {
return &scheduleService{
BaseService: *base.NewBaseService(JobDb, model.ScheduleEntity{}),
taskStatus: make(map[int64]string),
taskCancelFuncs: make(map[int64]context.CancelFunc),
}
}
func (bs *scheduleService) EndTask(id string) error {
taskId, err := strconv.ParseInt(id, 10, 64)
if err != nil {
log.Printf("ID conversion error: %v", err)
return fmt.Errorf("endTask error: invalid task ID")
}
bs.taskCancelFuncsLock.Lock()
defer bs.taskCancelFuncsLock.Unlock()
if cancel, ok := bs.taskCancelFuncs[taskId]; ok {
cancel()
delete(bs.taskCancelFuncs, taskId)
bs.taskStatusLock.Lock()
delete(bs.taskStatus, taskId)
bs.taskStatusLock.Unlock()
return nil
}
return fmt.Errorf("endTask error: task not found")
}
// 加载所有的调度任务
func LoadSchedules() error {
var schedules []model.ScheduleEntity
if err := JobDb.Where("status = ?", 1).Find(&schedules).Order("id DESC").Error; err != nil {
fmt.Printf("Failed to load schedules: %v\n", err)
return err
}
// 更新所有 status != 1 的调度任务的 CronId 为 -1,并清除 NextRun
if err := JobDb.Model(&model.ScheduleEntity{}).
Where("status != ?", 1).
Select("cron_id", "next_run").
Updates(map[string]interface{}{"cron_id": -1, "next_run": ""}).
Error; err != nil {
fmt.Printf("Failed to update schedules where status != 1: %v\n", err)
return err
}
if len(schedules) == 0 {
return fmt.Errorf("no schedule inited")
}
// 清空现有的 cron 任务和已注册的任务
mu.Lock()
defer mu.Unlock()
c.Stop()
c = cron.New()
c.Start()
// 每分钟调用一次 LoadSchedules
_, err := c.AddFunc("@every 1m", func() {
if err := LoadSchedules(); err != nil {
fmt.Printf("Error loading schedules: %v\n", err)
}
})
if err != nil {
fmt.Printf("Failed to add schedule for LoadSchedules: %v\n", err)
}
registeredJobs = make(map[int64]int) // 清空已注册的任务
for _, schedule := range schedules {
// 验证 cron 表达式
_, err := cron.ParseStandard(schedule.Cron)
if err == nil {
go addCronJob(&schedule)
}
schedule.CronId = -1
schedule.NextRun = ""
JobDb.Save(&schedule)
}
return nil
}
// 添加 cron 任务
func addCronJob(schedule *model.ScheduleEntity) {
mu.Lock()
defer mu.Unlock()
if _, exists := registeredJobs[schedule.Id]; exists {
c.Remove(cron.EntryID(int(schedule.Id)))
}
id, err := c.AddFunc(schedule.Cron, func() {
err := RunTask(schedule.Id)
if err != nil {
fmt.Printf("Failed to add cron job: %v\n", err)
}
})
if err != nil {
fmt.Printf("Failed to add cron job: %v\n", err)
return
}
registeredJobs[schedule.Id] = int(id)
// cache.SetCache("schedule", registeredJobs, time.Now().Add(365*24*time.Hour))
schedule.CronId = int(id)
schedule.NextRun = c.Entry(cron.EntryID(registeredJobs[schedule.Id])).Next.Format(time.RFC3339)
JobDb.Save(&schedule)
}
func RunTask(id int64) error {
var schedule model.ScheduleEntity
if err := JobDb.First(&schedule, id).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
log.Printf("Database NOT record: %v", err)
return err
}
log.Printf("Database error: %v", err)
return err
}
entry := c.Entry(cron.EntryID(registeredJobs[schedule.Id]))
if entry.ID > 0 {
schedule.NextRun = entry.Next.Format(time.RFC3339)
}
schedule.LastRun = time.Now().Format(time.RFC3339)
JobDb.Save(&schedule)
var logEntry model.ScheduleLogEntity
logEntry.ScheduleId = schedule.Id
JobDb.Create(&logEntry)
logEntry.RunStatus = "started"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
return err
}
var paramsMap map[string]interface{}
var headersMap map[string]string
// 解析任务参数
if schedule.Params != "" {
if err := json.Unmarshal([]byte(schedule.Params), ¶msMap); err != nil {
logEntry.Message = "Error decoding Params: " + err.Error()
logEntry.StatusCode = http.StatusBadRequest
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
return err
}
}
}
// 解析任务头信息
if schedule.Headers != "" {
if err := json.Unmarshal([]byte(schedule.Headers), &headersMap); err != nil {
logEntry.Message = "Error decoding Headers: " + err.Error()
logEntry.StatusCode = http.StatusBadRequest
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
return err
}
}
}
// 创建HTTP请求
httpBuilder, err := utils.NewRequestBuilder[interface{}](schedule.Method, schedule.Url)
if err != nil {
logEntry.Message = "Failed to create request builder: " + err.Error()
logEntry.StatusCode = http.StatusInternalServerError
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
return err
}
}
if schedule.Timeout != 0 {
httpBuilder.SetTimeout(time.Duration(schedule.Timeout))
}
httpBuilder.WithJSONBody(paramsMap)
// 添加请求头
httpBuilder.WithHeader("ScheduleId", fmt.Sprintf("%d", schedule.Id))
httpBuilder.WithHeader("ScheduleLogId", fmt.Sprintf("%d", logEntry.Id))
for key, value := range headersMap {
httpBuilder.WithHeader(key, value)
}
// 发送请求并传递上下文
statusCode, res, err := httpBuilder.Send(nil)
if err != nil {
logEntry.Message = "Task failed: " + err.Error()
logEntry.StatusCode = statusCode
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
return err
}
}
// 序列化响应为JSON字符串
resJson, err := json.Marshal(res)
if err != nil {
logEntry.Message = "Error marshaling response: " + err.Error()
logEntry.StatusCode = http.StatusInternalServerError
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
return err
}
}
// 如果任务成功执行,更新日志状态为完成
logEntry.Message = string(resJson)
logEntry.StatusCode = statusCode
logEntry.RunStatus = "scheduled"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Failed to update LastRun and NextRun: %v\n", err)
}
return nil
}
type ScheduleVO struct {
Id int `json:"id"`
LastRunAt string `json:"lastRunAt"`
NextRun string `json:"nextRun"`
FuncName string `json:"funcName"`
}
func GetSchedulesWithCron() ([]any, error) {
entries := c.Entries() // 获取当前所有注册的 cron 任务
var scheduleVOs []any
for _, entry := range entries {
scheduleVO := ScheduleVO{
Id: int(entry.ID),
LastRunAt: entry.Prev.Format(time.RFC3339),
NextRun: entry.Next.Format(time.RFC3339),
FuncName: reflect.ValueOf(entry.Job).Type().Name(),
}
scheduleVOs = append(scheduleVOs, scheduleVO)
}
return scheduleVOs, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。