代码拉取完成,页面将自动刷新
package httpserver
///Http服务
import (
"fmt"
"gitee.com/mysoft-free/cron-task/lib/authorize"
"gitee.com/mysoft-free/cron-task/lib/context"
"gitee.com/mysoft-free/cron-task/lib/job"
"gitee.com/mysoft-free/cron-task/lib/storage"
"gitee.com/mysoft-free/cron-task/lib/types"
"github.com/douglarek/zerodown"
"github.com/gin-gonic/gin"
"net/http"
"strconv"
"time"
)
const (
TaskRunning = "isRunning"
TaskFin = "isFinished"
TaskNotFound = "没有找到任务"
TaskAlreadyRunning = "已经有个任务在执行中,请等待!"
Res_Ok = "ok"
Res_Fail = "fail"
)
//Build for key
func BuildKey(taskId string) string {
return fmt.Sprintf("task_%s", taskId)
}
//授权校验
func CheckAuth(auth authorize.IAuthorize ,ctx *context.Context) gin.HandlerFunc {
return func(c *gin.Context) {
if ctx.Config.Authorize.Mode != "none"{
token := c.Query("token")
err := auth.VerifyToken(token)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"status": Res_Fail,
"errmsg": fmt.Sprintf("token 校验失败,原因是 %s", err.Error()),
})
c.Abort()
return
}
}
c.Next()
}
}
func Run(context *context.Context, storage storage.IStorage) {
var config = context.Config
var listenAddr = config.HttpServer.ListenAddr
log := context.Log
if listenAddr == "" {
context.Log.Info(types.HttpJobCategory, "listenAddr为空或不存在")
return
}
var r = gin.Default()
var tasks = context.Config.Tasks
auth := authorize.NewAuthorize(context) //初始化校验对象
//运行一次task,排他性
r.GET("/once/task", CheckAuth(auth,context), func(c *gin.Context) {
var taskId = c.Query("id") //string
var async = c.Query("async") //0 or nil 代表false
taskInfo, ok := tasks[taskId]
if !ok {
c.JSON(http.StatusOK, gin.H{
"status": Res_Fail,
"errmsg": TaskNotFound,
})
} else {
task := &types.Task{
TaskType: taskInfo.TaskType,
Name: taskInfo.Name,
Instances: taskInfo.Instances,
Exclusive: taskInfo.Exclusive,
RunEnv: taskInfo.RunEnv,
HttpMethod: taskInfo.HttpMethod,
Path: taskInfo.Path,
Args: taskInfo.Args,
Cron: taskInfo.Cron,
}
isAsync, err := strconv.Atoi(async)
if err == nil && isAsync == 1 {
log.Info(types.HttpJobCategory, "启用异步模式执行")
res, _ := storage.Get(BuildKey(taskId))
rs, ok := res.(types.RunTaskResult)
if ok && rs.Status == TaskRunning { //只允许一个运行的任务进行
c.JSON(http.StatusOK, gin.H{
"status": Res_Fail,
"errmsg": TaskAlreadyRunning,
})
return
} else {
if ok && rs.Status == TaskFin { //如果执行完毕则先清理
storage.Del(BuildKey(taskId))
}
//不存在则storage,这里是初始化任务
err := storage.Set(BuildKey(taskId), types.RunTaskResult{
Status: TaskRunning,
StartTime: time.Now(),
}, false)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"status": Res_Fail,
"errmsg": "任务状态初始化失败!",
})
return //写失败则立即返回
}
}
//Start async
go func(taskId string, task *types.Task) {
job.NewJob(task).Run()
res, _ := storage.Get(BuildKey(taskId))
result, _ := res.(types.RunTaskResult) //断言类型转化
result.Duration = time.Now().Sub(result.StartTime) //耗费时间
result.Status = TaskFin
//完成状态,强制写,这里确保原子性
storage.Set(BuildKey(taskId), result, true)
}(taskId, task)
} else {
log.Info(types.HttpJobCategory, "启用同步模式执行")
job.NewJob(task).Run()
}
c.JSON(http.StatusOK, gin.H{
"status": Res_Ok,
"taskId": tasks[taskId].Name,
"isAsync": isAsync,
})
}
})
//获取作业状态,如果是异步的话
r.GET("/task/status", CheckAuth(auth,context), func(c *gin.Context) {
var taskId = c.Query("id") //string
v, err := storage.Get(BuildKey(taskId))
if err != nil {
c.JSON(http.StatusOK, gin.H{
"status": Res_Fail,
"errmsg": TaskNotFound,
})
} else {
task := v.(types.RunTaskResult)
duration := time.Now().Sub(task.StartTime)
if task.Status == TaskFin {
duration = task.Duration
}
taskStatus := task.Status
c.JSON(http.StatusOK, gin.H{
"status": Res_Ok,
"taskStatus": taskStatus, //任务状态
"duration": duration.Seconds(), //任务持续的时间,单位秒
})
}
})
r.GET("/storage/release", CheckAuth(auth,context), func(c *gin.Context) {
var prefix = c.Query("p") //释放前缀
if prefix == "all" { //直接flush
storage.Release()
} else {
storage.Release(prefix)
}
c.JSON(http.StatusOK, gin.H{
"status": Res_Ok,
"message": "释放完毕",
})
})
//获取访问token,不需要授权
r.GET("/getToken", func(c *gin.Context) {
u := c.Query("u")
p := c.Query("p")
token, err := auth.GetToken(u, p)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"status": Res_Fail,
"errmsg": err.Error(),
})
} else {
c.JSON(http.StatusOK, gin.H{
"status": Res_Ok,
"token": token,
})
}
})
//r.GET("/task/all",func(c *gin.Context){
// c.JSON(http.StatusOK, gin.H{
// "status": Res_Ok,
// "lists": tasks,
// })
//})
//关闭退出服务
//r.GET("/stop/task", func(c *gin.Context) {
//
// c.JSON(http.StatusOK, gin.H{
// "status": Res_Ok,
// "message": "请求关闭服务成功",
// })
// context.CronDone <- true //send close signal
//})
fmt.Println(zerodown.ListenAndServe(listenAddr, r))
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。