1 Star 0 Fork 0

fkil555/gin-extend

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
cron.go 4.69 KB
一键复制 编辑 原始数据 按行查看 历史
fkil555 提交于 2023-10-26 09:57 . up
package cron
import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"gitee.com/fkil555/gin-extend/client"
"gitee.com/fkil555/gin-extend/conf"
cronContext "gitee.com/fkil555/gin-extend/cron/context"
"gitee.com/fkil555/gin-extend/cron/middlewares"
"gitee.com/fkil555/gin-extend/gcontext"
v3cron "github.com/robfig/cron/v3"
)
// Cron框架实例
type GECron struct {
cron *v3cron.Cron
middlewares []cronContext.HandleFunc // 中间件
}
// 创建Cron
func New() (geCron *GECron, err error) {
// 加载配置
if err = conf.InitConf(conf.MODE_CRON); err != nil {
return
}
geCron = &GECron{
middlewares: make([]cronContext.HandleFunc, 0),
cron: v3cron.New(),
}
if err = client.InitClients(); err != nil {
return
}
// 注册中间件
geCron.registerMiddleware()
return
}
// 启动APP
func (cron *GECron) Run() (err error) {
//启动前,先注入配置中的cron
cron.injectCronConf()
cron.cron.Start()
cron.waitGraceExit()
cron.close()
return
}
// 清理关闭一些文件或关闭部分服务
func (cron *GECron) close() {
client.Close()
}
// 注册中间件
func (cron *GECron) Use(handleFunc cronContext.HandleFunc) {
cron.middlewares = append(cron.middlewares, handleFunc)
}
// 注册定时任务
func (cron *GECron) AddJob(name string, spec string, funcs ...cronContext.HandleFunc) (err error) {
params := make([]string, 0)
err = cron.AddJobWithParams(name, spec, params, funcs...)
return
}
// AddJobWithParams 添加带参数的
func (cron *GECron) AddJobWithParams(name string, spec string, params []string, funcs ...cronContext.HandleFunc) (err error) {
// 中间件数组
middlewares := make([]cronContext.HandleFunc, len(cron.middlewares)+len(funcs))
// 全局中间件
copy(middlewares, cron.middlewares)
// 任务级中间件
copy(middlewares[len(cron.middlewares):], funcs)
f := func() {
// 每次任务执行前, 生成context
ctx := cron.newContext(name, spec, params, middlewares)
// 执行chain
ctx.Next()
}
// 跳过尚未结束的任务
job := v3cron.SkipIfStillRunning(v3cron.DefaultLogger)(newJob(f))
_, err = cron.cron.AddJob(spec, job)
return
}
// 生成任务执行的上下文
func (cron *GECron) newContext(name string, spec string, params []string, middlewares []cronContext.HandleFunc) (ctx *cronContext.Context) {
ctx = &cronContext.Context{
Name: name,
Spec: spec,
Context: context.TODO(),
Chain: middlewares,
KeyValues: make(map[string]interface{}, 0),
Params: params,
}
return
}
// 闭包GE上下文
func (cron *GECron) WithGEContext(geHandle gcontext.GEHandleFunc) cronContext.HandleFunc {
return func(ctx *cronContext.Context) {
// GE上下文
geCtx := gcontext.GEContext{
Context: ctx,
Cron: ctx,
}
// 回调用户
geHandle(&geCtx)
}
}
// 等待优雅退出
func (cron *GECron) waitGraceExit() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
s := <-c
switch s {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
fmt.Fprintf(os.Stdout, "收到信号: %s, 退出倒计时%d毫秒,服务正在退出... \n", s.String(), conf.GEConf.CronConfig.WaitGraceExit)
select {
case <-time.NewTimer(time.Duration(conf.GEConf.CronConfig.WaitGraceExit) * time.Millisecond).C: // 最多等待N秒
case <-cron.cron.Stop().Done(): // 等待所有任务结束
}
return
case syscall.SIGHUP:
default:
}
}
}
// 注册中间件
func (cron *GECron) registerMiddleware() {
// recovery
cron.Use(middlewares.Recovery())
}
// 注入Dot中配置的Cron指令
func (cron *GECron) injectCronConf() {
cronList := conf.ParseCronContent()
for _, cronConf := range cronList {
fields := strings.Fields(cronConf)
if len(fields) < conf.MIN_FIELDS_COUNT {
panic(cronConf + " cron格式错误")
}
spec := fmt.Sprintf("%s %s %s %s %s",
fields[conf.MinuteIndex], fields[conf.HourIndex], fields[conf.DomIndex], fields[conf.MonthIndex], fields[conf.DowIndex])
name := fields[conf.NameIndex]
params := fields[conf.DescIndex:]
cronFunc := getRouterByName(name)
if cronFunc == nil {
_, _ = fmt.Fprintf(os.Stdout, "cron配置中缺少name:%s相关的配置\n", name)
continue
}
_ = cron.AddJobWithParams(name, spec, params, cronFunc...)
}
}
// 注册路由规则
func (cron *GECron) RegisterRouter(name string, funcs ...cronContext.HandleFunc) {
r.lock.Lock()
defer r.lock.Unlock()
if _, ok := r.routerMap[name]; ok {
panicText := fmt.Sprintf("RegisterRouter重复注册,name:%s", name)
panic(panicText)
}
r.routerMap[name] = &cronConf{
name: name,
execFunc: funcs,
}
}
// 初始化运行环境
func init() {
runtime.GOMAXPROCS(runtime.NumCPU()) // 用满所有核心
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/fkil555/gin-extend.git
git@gitee.com:fkil555/gin-extend.git
fkil555
gin-extend
gin-extend
v0.0.5

搜索帮助