1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
group.go 3.97 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2022-10-25 20:57 . 优化
package scheduler
import (
"gitee.com/h79/goutils/common"
"gitee.com/h79/goutils/common/queue"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/common/timer"
"sync"
"time"
)
var _ queue.IPriorityItem = (*ScheduledJob)(nil)
type ScheduledJob struct {
Trigger Trigger
Job Task
JobPriority int64
NextRunTime int64
}
// GValue for implement queue.IPriorityItem
func (it *ScheduledJob) GValue() interface{} {
return it
}
// PValue for implement queue.IPriorityItem
func (it *ScheduledJob) PValue() int64 {
return it.JobPriority
}
type groupScheduled struct {
rm sync.Mutex
schedJobs *queue.SQueue
feeder chan *ScheduledJob
interrupt chan struct{}
}
func newGroupScheduled() *groupScheduled {
return &groupScheduled{
schedJobs: queue.NewSQueue(5000),
feeder: make(chan *ScheduledJob, 2),
interrupt: make(chan struct{}),
}
}
func (job *groupScheduled) AddTask(task Task) {
_ = job.AddTriggerTask(task, NewSimpleTrigger(time.Second*5))
}
func (job *groupScheduled) AddTriggerTask(task Task, trigger Trigger) error {
nextRunTime, err := trigger.NextFireTime(timer.NowNano())
if err != nil {
return err
}
job.rm.Lock()
err = job.schedJobs.Add(&ScheduledJob{Trigger: trigger, Job: task, JobPriority: nextRunTime})
job.rm.Unlock()
return err
}
func (job *groupScheduled) StopTask(jobId string) {
job.rm.Lock()
job.schedJobs.Find(func(v interface{}, index int) bool {
scheduledJob, ok := v.(*ScheduledJob)
if !ok {
return false
}
if scheduledJob.Job.GetKey() == jobId {
scheduledJob.Job.Cancel()
return true
}
return false
})
job.rm.Unlock()
}
func (job *groupScheduled) RemoveTask(jobId string) {
job.rm.Lock()
_, _ = job.schedJobs.Find(func(v interface{}, index int) bool {
scheduledJob, ok := v.(*ScheduledJob)
if !ok {
return false
}
if scheduledJob.Job.GetKey() == jobId {
scheduledJob.Job.Cancel()
return true
}
return false
})
job.rm.Unlock()
}
func (job *groupScheduled) HasTask(jobId string) bool {
job.rm.Lock()
_, idx := job.schedJobs.Find(func(v interface{}, index int) bool {
scheduledJob, ok := v.(*ScheduledJob)
if !ok {
return false
}
return scheduledJob.Job.GetKey() == jobId
})
job.rm.Unlock()
return idx != -1
}
func (job *groupScheduled) Run() {
go func() {
defer common.Recover()
job.execTask()
}()
go func() {
defer common.Recover()
job.runFeeder()
}()
}
func (job *groupScheduled) calculateNextTick() time.Duration {
var (
parkTime = func(ts int64) int64 {
now := timer.NowNano()
if ts > now {
return ts - now
}
return 0
}
v interface{}
)
job.rm.Lock()
v = job.schedJobs.Peek()
job.rm.Unlock()
schedJob, ok := v.(*ScheduledJob)
if !ok {
return 0
}
return time.Duration(parkTime(schedJob.JobPriority))
}
func (job *groupScheduled) execTask() {
var (
exit = system.Exit()
t = time.NewTimer(0)
)
for {
t.Reset(job.calculateNextTick())
select {
case <-exit.S:
return
case <-t.C:
if job.runTask() {
break
}
case <-job.interrupt:
break
}
}
}
func (job *groupScheduled) runFeeder() {
var exit = system.Exit()
for {
select {
case item := <-job.feeder:
job.rm.Lock()
_ = job.schedJobs.Add(item)
job.rm.Unlock()
job.interrupt <- struct{}{}
case <-exit.S:
return
}
}
}
func (job *groupScheduled) runTask() bool {
//没到时间,不用执行
var (
notExpired = func(t int64) bool {
return t > timer.NowNano()
}
ok = false
v interface{}
schedJob *ScheduledJob
)
job.rm.Lock()
v = job.schedJobs.Peek()
schedJob, ok = v.(*ScheduledJob)
if !ok || notExpired(schedJob.JobPriority) {
job.rm.Unlock()
time.Sleep(time.Millisecond * 20)
return true
}
job.schedJobs.Pop()
job.rm.Unlock()
state := schedJob.Job.Execute()
if state.IsQuit() {
return true
}
nextRunTime, err := schedJob.Trigger.NextFireTime(schedJob.JobPriority)
if err != nil {
return true
}
// 轮到下一个时间
schedJob.JobPriority = nextRunTime
job.feeder <- schedJob
return false
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.2.11

搜索帮助

344bd9b3 5694891 D2dac590 5694891