2 Star 0 Fork 0

hero/momo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
schedule.go 5.13 KB
一键复制 编辑 原始数据 按行查看 历史
hero 提交于 2024-10-11 17:50 . upd:schedule
package service
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"sync"
"time"
"gitee.com/linqwen/momo/app/apitool"
"gitee.com/linqwen/momo/sys/job/model"
"gitee.com/linqwen/momo/app"
)
type scheduleService struct {
app.BaseService[model.ScheduleEntity]
taskStatusLock sync.RWMutex // 使用读写锁来优化并发控制
taskStatus map[int64]string // 任务ID与状态的映射
taskCancelFuncsLock sync.Mutex
taskCancelFuncs map[int64]context.CancelFunc
}
func NewScheduleService() *scheduleService {
return &scheduleService{
BaseService: *app.NewBaseService(JobDb, model.ScheduleEntity{}),
taskStatus: make(map[int64]string),
taskCancelFuncs: make(map[int64]context.CancelFunc),
}
}
func (bs *scheduleService) GetRunningTasks() map[int64]string {
runningTasks := make(map[int64]string)
bs.taskStatusLock.RLock()
defer bs.taskStatusLock.RUnlock()
for id, status := range bs.taskStatus {
runningTasks[id] = status
}
return runningTasks
}
func (bs *scheduleService) EndTask(id string) error {
taskId, err := strconv.ParseInt(id, 10, 64)
if err != nil {
log.Printf("ID conversion error: %v", err)
return fmt.Errorf("endTask error: invalid task ID")
}
bs.taskCancelFuncsLock.Lock()
defer bs.taskCancelFuncsLock.Unlock()
if cancel, ok := bs.taskCancelFuncs[taskId]; ok {
cancel()
delete(bs.taskCancelFuncs, taskId)
bs.taskStatusLock.Lock()
delete(bs.taskStatus, taskId)
bs.taskStatusLock.Unlock()
return nil
}
return fmt.Errorf("endTask error: task not found")
}
func (bs *scheduleService) RunTask(id string) error {
var schedule model.ScheduleEntity
if err := JobDb.First(&schedule, id).Error; err != nil {
log.Printf("Database error: %v", err)
return fmt.Errorf("runTask error: task not found")
}
// 创建初始日志记录,表明任务开始运行
logEntry := model.ScheduleLogEntity{
ScheduleId: schedule.Id,
Message: "Task started",
StatusCode: 0,
StartAt: app.ISO8601Time{Time: time.Now()},
RunStatus: "running",
}
if err := JobDb.Create(&logEntry).Error; err != nil {
log.Printf("Log creation error: %v", err)
return fmt.Errorf("runTask error: failed to create log")
}
// 尝试运行任务
bs.taskStatusLock.Lock()
bs.taskStatus[schedule.Id] = "running"
bs.taskStatusLock.Unlock()
defer func() {
bs.taskStatusLock.Lock()
delete(bs.taskStatus, schedule.Id)
bs.taskStatusLock.Unlock()
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bs.taskCancelFuncsLock.Lock()
bs.taskCancelFuncs[schedule.Id] = cancel
bs.taskCancelFuncsLock.Unlock()
var paramsMap map[string]interface{}
var headersMap map[string]string
// 解析任务参数
if schedule.Params != "" {
if err := json.Unmarshal([]byte(schedule.Params), &paramsMap); err != nil {
logEntry.Message = "Error decoding Params: " + err.Error()
logEntry.StatusCode = http.StatusBadRequest
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
}
return fmt.Errorf("runTask error: invalid params")
}
}
// 解析任务头信息
if schedule.Headers != "" {
if err := json.Unmarshal([]byte(schedule.Headers), &headersMap); err != nil {
logEntry.Message = "Error decoding Headers: " + err.Error()
logEntry.StatusCode = http.StatusBadRequest
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
}
return fmt.Errorf("runTask error: invalid headers")
}
}
// 创建HTTP请求
httpBuilder, err := apitool.NewRequestBuilder[interface{}](schedule.Method, schedule.Url)
if err != nil {
logEntry.Message = "Failed to create request builder: " + err.Error()
logEntry.StatusCode = http.StatusInternalServerError
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
}
return fmt.Errorf("runTask error: request builder failure")
}
httpBuilder.WithJSONBody(paramsMap)
// 添加请求头
for key, value := range headersMap {
httpBuilder.WithHeader(key, value)
}
// 发送请求并传递上下文
statusCode, res, err := httpBuilder.Send(&ctx)
if err != nil {
logEntry.Message = "Task failed: " + err.Error()
logEntry.StatusCode = statusCode
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
}
return fmt.Errorf("runTask error: task execution failure")
}
// 序列化响应为JSON字符串
resJson, err := json.Marshal(res)
if err != nil {
logEntry.Message = "Error marshaling response: " + err.Error()
logEntry.StatusCode = http.StatusInternalServerError
logEntry.RunStatus = "failed"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
}
return fmt.Errorf("runTask error: response marshaling failure")
}
// 如果任务成功执行,更新日志状态为完成
logEntry.Message = string(resJson)
logEntry.StatusCode = statusCode
logEntry.RunStatus = "scheduled"
if err := JobDb.Save(&logEntry).Error; err != nil {
log.Printf("Log update error: %v", err)
}
return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/linqwen/momo.git
git@gitee.com:linqwen/momo.git
linqwen
momo
momo
v1.7.8

搜索帮助

23e8dbc6 1850385 7e0993f3 1850385