2 Star 1 Fork 2

go-mao/mao

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
scheduler.go 8.08 KB
一键复制 编辑 原始数据 按行查看 历史
package crontab
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
)
// The crontab spec parser used by this package was copied from robfig/cron
// into cronlib's cron_parser.go. Upstream: "github.com/robfig/cron"
// Named boolean constants for an on/off switch.
const (
// OnMode is the boolean value true.
OnMode = true
// OffMode is the boolean value false.
OffMode = false
)
// Sentinel errors returned by the scheduler and by job validation.
var (
// ErrNotFoundJob is returned by UnRegister and GetServiceCron when no job
// is registered under the requested service name.
ErrNotFoundJob = errors.New("not found job")
// ErrAlreadyRegister is returned when a job is registered under a name that
// is already taken and replacing is not allowed (see Register).
ErrAlreadyRegister = errors.New("the job already in pool")
// ErrJobDOFuncNil is returned by job validation when the callback is nil.
ErrJobDOFuncNil = errors.New("callback func is nil")
// ErrCronSpecInvalid describes an invalid crontab spec.
// NOTE(review): nothing in this file returns it; spec errors currently
// surface as whatever Parse returns — confirm whether it is still needed.
ErrCronSpecInvalid = errors.New("crontab spec is invalid")
)
// defualtLogger is the package-wide log sink. Until SetLogger installs a real
// one it discards every message. (The misspelled identifier is kept because
// the rest of the package refers to it by this name.)
var defualtLogger = func(level, s string) {}

// loggerType is the signature of a log sink: a level string (this package
// uses "info" and "error") followed by the already-formatted message.
type loggerType func(level, s string)

// SetLogger installs logger as the package-wide log sink.
//
// Passing nil restores the discard-everything sink instead of storing nil:
// the scheduler calls the sink unconditionally, so a nil func would panic on
// the next log call.
func SetLogger(logger loggerType) {
	if logger == nil {
		logger = func(level, s string) {}
	}
	defualtLogger = logger
}
// panicCaller is the package-wide hook invoked with the service name and the
// formatted panic value when a job callback panics in try-catch mode. The
// default hook does nothing.
var panicCaller = func(srv, err string) {
}

// panicType is the signature of the panic hook: the service name followed by
// the panic value rendered as a string.
type panicType func(srv, err string)

// SetPanicCaller installs p as the package-wide panic hook.
//
// Passing nil restores the do-nothing hook instead of storing nil: the hook
// is called from inside a recover handler, where a nil func would raise a
// second, unrecovered panic.
func SetPanicCaller(p panicType) {
	if p == nil {
		p = func(srv, err string) {}
	}
	panicCaller = p
}
// New returns an empty scheduler: no jobs registered, a fresh cancellable
// root context, and the WaitGroup/Once used by Start and Wait.
func New() *CronSchduler {
	scheduler := &CronSchduler{
		tasks: make(map[string]*JobModel),
		wg:    new(sync.WaitGroup),
		once:  new(sync.Once),
	}
	scheduler.ctx, scheduler.cancel = context.WithCancel(context.Background())
	return scheduler
}
// CronSchduler owns a set of named cron jobs and controls their lifecycle.
type CronSchduler struct {
// tasks maps a service name to the job currently registered under it.
tasks map[string]*JobModel
// ctx is the scheduler's root context; each registered job gets a child of it.
ctx context.Context
// cancel cancels ctx; Stop calls it after killing every job.
cancel context.CancelFunc
// wg counts started job workers; Wait blocks on it.
wg *sync.WaitGroup
// once ensures Start only launches the registered jobs one time.
once *sync.Once
// The embedded lock guards tasks.
sync.RWMutex
}
// Register stores model under srv without starting it; the job is launched
// later by Start. It fails with ErrAlreadyRegister if srv is already taken.
func (c *CronSchduler) Register(srv string, model *JobModel) error {
	const (
		denyReplace = true
		autoStart   = false
	)
	return c.reset(srv, model, denyReplace, autoStart)
}
// UpdateJobModel replaces the job registered under srv with model: any
// existing job for srv is killed and the new one is started immediately.
func (c *CronSchduler) UpdateJobModel(srv string, model *JobModel) error {
	const (
		denyReplace = false
		autoStart   = true
	)
	return c.reset(srv, model, denyReplace, autoStart)
}
// DynamicRegister adds a job to a scheduler that is already running and
// starts it right away. If srv is already registered, the existing job is
// killed and replaced.
func (c *CronSchduler) DynamicRegister(srv string, model *JobModel) error {
	const (
		denyReplace = false
		autoStart   = true
	)
	return c.reset(srv, model, denyReplace, autoStart)
}
// reset installs model as the job registered under srv.
//
// denyReplace makes the call fail with ErrAlreadyRegister when srv is already
// taken; otherwise an existing job for srv is killed and replaced. autoStart
// launches the job's worker immediately and accounts for it in the
// scheduler's WaitGroup.
//
// It returns an error when model is nil, when model fails validation (nil
// callback or unparsable spec), or when the name is taken and denyReplace is
// set. On any error neither the scheduler nor model is modified.
func (c *CronSchduler) reset(srv string, model *JobModel, denyReplace, autoStart bool) error {
	if model == nil {
		return errors.New("job model is nil")
	}

	// Validate before taking the lock: validation parses the spec and sleeps
	// briefly inside getNextDue, and it only reads the model itself.
	if err := model.validate(); err != nil {
		return err
	}

	c.Lock()
	defer c.Unlock()

	oldModel, ok := c.tasks[srv]
	if ok && denyReplace {
		// Reject before deriving a child context, so a refused registration
		// leaves no uncancelled context attached to c.ctx and does not touch
		// the caller's model.
		return ErrAlreadyRegister
	}

	cctx, cancel := context.WithCancel(c.ctx)
	model.ctx = cctx
	model.cancel = cancel
	model.srv = srv

	if ok {
		oldModel.kill()
	}
	c.tasks[srv] = model

	if autoStart {
		c.wg.Add(1)
		// runLoop already spawns the worker goroutine and returns at once.
		model.runLoop(c.wg)
	}
	return nil
}
// UnRegister kills the job registered under srv and removes it from the
// scheduler. It returns ErrNotFoundJob when srv is unknown.
func (c *CronSchduler) UnRegister(srv string) error {
	c.Lock()
	defer c.Unlock()

	if existing, found := c.tasks[srv]; found {
		existing.kill()
		delete(c.tasks, srv)
		return nil
	}
	return ErrNotFoundJob
}
// Stop kills and removes every registered job, then cancels the scheduler's
// root context.
func (c *CronSchduler) Stop() {
	c.Lock()
	defer c.Unlock()

	for name := range c.tasks {
		c.tasks[name].kill()
		delete(c.tasks, name)
	}
	c.cancel()
}
// StopService kills and removes the job registered under srv. Unknown names
// are ignored silently.
func (c *CronSchduler) StopService(srv string) {
	c.Lock()
	defer c.Unlock()

	if target, found := c.tasks[srv]; found {
		target.kill()
		delete(c.tasks, srv)
	}
}
// StopServicePrefix kills and removes every job whose service name starts
// with regex. Despite the parameter name, the match is a plain string prefix
// (strings.HasPrefix), not a regular expression: "risk.scan" stops both
// risk.scan.total and risk.scan.user.
func (c *CronSchduler) StopServicePrefix(regex string) {
	c.Lock()
	defer c.Unlock()

	for name, task := range c.tasks {
		if strings.HasPrefix(name, regex) {
			task.kill()
			delete(c.tasks, name)
		}
	}
}
// validateSpec reports whether spec is accepted by Parse.
func validateSpec(spec string) bool {
	_, parseErr := Parse(spec)
	return parseErr == nil
}
// getNextDue parses spec and returns the schedule's next activation time
// after "now". If spec cannot be parsed it returns the current time together
// with the parse error.
func getNextDue(spec string) (time.Time, error) {
	schedule, parseErr := Parse(spec)
	if parseErr != nil {
		return time.Now(), parseErr
	}

	// Short pause before sampling the clock.
	// NOTE(review): presumably this keeps a call made right on a schedule
	// boundary from landing on the instant that just fired — confirm.
	time.Sleep(10 * time.Millisecond)

	return schedule.Next(time.Now()), nil
}
// getNextDueSafe returns the next activation time for spec, guaranteed to be
// different from last. While getNextDue keeps yielding last, it sleeps 100ms
// and retries, so the same activation is never handed out twice. A parse
// error is returned immediately along with getNextDue's time value.
func getNextDueSafe(spec string, last time.Time) (time.Time, error) {
	for {
		next, err := getNextDue(spec)
		if err != nil {
			return next, err
		}
		if !last.Equal(next) {
			return next, nil
		}
		// Same instant as the previous run: wait and ask again.
		time.Sleep(100 * time.Millisecond)
	}
}
// Start launches the worker of every job registered so far. Only the first
// call has any effect; later calls are no-ops. It returns as soon as the
// workers are spawned — use Wait or WaitStop to block.
func (c *CronSchduler) Start() {
	c.once.Do(func() {
		// Hold the read lock while ranging over tasks: registration and
		// removal mutate the map under the write lock, and iterating a map
		// during a concurrent write is a data race.
		c.RLock()
		defer c.RUnlock()

		for _, job := range c.tasks {
			c.wg.Add(1)
			job.runLoop(c.wg)
		}
	})
}
// Wait blocks until the scheduler's WaitGroup drops to zero, i.e. until every
// job worker that was started has exited.
func (c *CronSchduler) Wait() {
c.wg.Wait()
}
// WaitStop blocks until the scheduler's root context is cancelled, which is
// what Stop does.
func (c *CronSchduler) WaitStop() {
	<-c.ctx.Done()
}
// GetServiceCron returns the job currently registered under srv, or
// ErrNotFoundJob when there is none.
func (c *CronSchduler) GetServiceCron(srv string) (*JobModel, error) {
	c.RLock()
	defer c.RUnlock()

	if job, found := c.tasks[srv]; found {
		return job, nil
	}
	return nil, ErrNotFoundJob
}
// NewJobModel builds a job that runs f on the schedule described by spec.
// By default the callback runs synchronously inside the job's worker loop;
// pass options such as AsyncMode or TryCatchMode to change that. Nil options
// are skipped.
//
// If an option fails, its error is returned with a nil job. If validation
// fails (nil callback or unparsable spec), the error is returned together
// with the constructed job.
func NewJobModel(spec string, f func(), options ...JobOption) (*JobModel, error) {
	model := &JobModel{
		spec:       spec,
		do:         f,
		async:      false,
		running:    true,
		notifyChan: make(chan int, 1), // buffered so a notification never blocks
	}

	for _, apply := range options {
		if apply == nil {
			continue
		}
		if optErr := apply(model); optErr != nil {
			return nil, optErr
		}
	}

	if validateErr := model.validate(); validateErr != nil {
		return model, validateErr
	}
	return model, nil
}
// JobOption configures a JobModel during NewJobModel; a non-nil error aborts
// construction.
type JobOption func(*JobModel) error
// AsyncMode returns an option that makes the job run its callback in a new
// goroutine on every activation instead of inside the worker loop.
func AsyncMode() JobOption {
	enableAsync := func(job *JobModel) error {
		job.async = true
		return nil
	}
	return enableAsync
}
// TryCatchMode returns an option that makes the job recover from panics
// raised by its callback instead of letting them propagate.
func TryCatchMode() JobOption {
	enableTryCatch := func(job *JobModel) error {
		job.tryCatch = true
		return nil
	}
	return enableTryCatch
}
// JobModel describes one scheduled job: its callback, its crontab spec and
// the state used to control its worker goroutine.
type JobModel struct {
// srv is the service name the job is registered under; set at registration.
srv string
// do is the callback executed on every activation.
do func()
// async, when true, runs do in its own goroutine on each activation:
// go func() { do() }
async bool
// tryCatch, when true, recovers panics raised by do.
tryCatch bool
// spec is the crontab expression that drives the schedule.
spec string
// ctx/cancel control the worker's lifetime; both are assigned at
// registration, and kill calls cancel.
ctx context.Context
cancel context.CancelFunc
// notifyChan wakes the worker loop; notifySig sends on it without blocking.
notifyChan chan int
// running is the worker loop condition; kill sets it to false.
running bool
// exited is set to true by the worker when it returns.
exited bool
// NOTE(review): the embedded lock is not taken by any method in this file —
// confirm whether it is meant to guard running/exited.
sync.RWMutex
}
// SetTryCatch turns panic recovery around the job's callback on or off.
// The field is written without locking.
func (j *JobModel) SetTryCatch(b bool) {
j.tryCatch = b
}
// SetAsyncMode selects whether the callback runs in its own goroutine (true)
// or inside the worker loop (false). The field is written without locking.
func (j *JobModel) SetAsyncMode(b bool) {
j.async = b
}
// validate checks that the job can be scheduled: the callback must be
// non-nil (ErrJobDOFuncNil otherwise) and the spec must parse (the parse
// error is returned as is).
func (j *JobModel) validate() error {
	if j.do == nil {
		return ErrJobDOFuncNil
	}
	_, specErr := getNextDue(j.spec)
	return specErr
}
// runLoop starts the job's worker (run) in a new goroutine and returns
// immediately. wg is handed to the worker, which calls wg.Done when it exits,
// so the caller must have called wg.Add(1) beforehand.
func (j *JobModel) runLoop(wg *sync.WaitGroup) {
go j.run(wg)
}
// run is the job's worker loop. It arms a timer for the next activation of
// the job's spec, executes the callback each time the timer fires (in a new
// goroutine when async is set, otherwise inline), and re-arms the timer for
// the following activation. It returns when the job's context is cancelled,
// when running becomes false, or when the spec can no longer be evaluated.
//
// wg.Done is called exactly once on every exit path; the caller must have
// called wg.Add(1) before starting the worker.
func (j *JobModel) run(wg *sync.WaitGroup) {
	var timer *time.Timer

	// Registered before anything can fail so that every exit path releases
	// the WaitGroup slot. exited is set before wg.Done so that a caller
	// returning from Wait already observes the flag.
	defer func() {
		if timer != nil {
			timer.Stop()
		}
		j.exited = true
		wg.Done()
	}()

	// execute runs the callback once and logs how long it took.
	execute := func() {
		startTS := time.Now()
		defualtLogger("info",
			fmt.Sprintf("scheduler service: %s begin run", j.srv),
		)
		if j.tryCatch {
			tryCatch(j)
		} else {
			j.do()
		}
		defualtLogger("info",
			fmt.Sprintf("scheduler service: %s has been finished, time cost: %s, spec: %s",
				j.srv,
				time.Since(startTS).String(),
				j.spec,
			),
		)
	}

	// Compute the first activation. The spec is validated at registration, so
	// a failure here is unexpected; log it and stop this worker instead of
	// panicking inside a goroutine, which would take the whole process down.
	due, err := getNextDue(j.spec)
	if err != nil {
		defualtLogger("error",
			fmt.Sprintf("scheduler service: %s cannot parse spec %q: %v", j.srv, j.spec, err),
		)
		return
	}
	lastNextTime := due
	interval := time.Until(due)
	defualtLogger("info",
		fmt.Sprintf("scheduler service: %s next time is %s, sub: %s",
			j.srv,
			due.String(),
			interval.String(),
		),
	)

	timer = time.NewTimer(interval)

	// NOTE(review): running is written by kill without synchronization; the
	// context cancellation below is what reliably stops a blocked worker.
	for j.running {
		select {
		case <-timer.C:
			// The timer fired ahead of the due time: wait out the remainder
			// plus a small margin rather than running early.
			if time.Now().Before(due) {
				timer.Reset(time.Until(due) + 50*time.Millisecond)
				continue
			}

			// Advance to the next activation. The result is assigned to the
			// outer due (not shadowed) so the early-fire guard above always
			// compares against the activation the timer is armed for.
			next, err := getNextDueSafe(j.spec, lastNextTime)
			if err != nil {
				// Without a valid next time the timer would be re-armed for
				// "now" and the job would spin; stop the worker instead.
				defualtLogger("error",
					fmt.Sprintf("scheduler service: %s cannot compute next run for spec %q: %v", j.srv, j.spec, err),
				)
				return
			}
			due = next
			lastNextTime = next
			interval = time.Until(next)
			timer.Reset(interval)

			if j.async {
				go execute() // one goroutine per activation
			} else {
				execute()
			}

			defualtLogger("info",
				fmt.Sprintf("scheduler service: %s next time is %s, sub: %s",
					j.srv,
					due.String(),
					interval.String(),
				),
			)
		case <-j.notifyChan:
			// Woken by notifySig: loop around and re-check the loop condition.
			continue
		case <-j.ctx.Done():
			return
		}
	}
}
// kill asks the job's worker to stop: it clears the loop flag and then
// cancels the job's context. It does not wait for the worker to exit.
func (j *JobModel) kill() {
	defer j.cancel()
	j.running = false
}
// workerExited reports whether the job's worker has returned (the exited
// flag is set by the worker on its way out). The flag is read without
// locking.
func (j *JobModel) workerExited() bool {
return j.exited
}
// notifySig wakes the job's worker loop without ever blocking the caller.
func (j *JobModel) notifySig() {
	select {
	case j.notifyChan <- 1:
		// Notification queued.
	default:
		// The channel cannot accept another value right now; drop this
		// notification rather than block.
	}
}
// tryCatch runs the job's callback and recovers from any panic it raises.
// A recovered panic is passed to the panic hook and then logged at "error"
// level; it does not propagate to the caller.
func tryCatch(job *JobModel) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		panicCaller(job.srv, fmt.Sprintf("%v", recovered))
		defualtLogger("error", fmt.Sprintf("srv: %s, trycatch panicing %v", job.srv, recovered))
	}()

	job.do()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/go-mao/mao.git
git@gitee.com:go-mao/mao.git
go-mao
mao
mao
v1.0.20

搜索帮助