1 Star 0 Fork 0

h79 / goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
group.go 4.52 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2024-03-13 19:40 . task duration
package scheduler
import (
"gitee.com/h79/goutils/common/queue"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/common/timer"
"sync"
"time"
)
// Compile-time check that *ScheduledJob satisfies queue.IPriority.
var _ queue.IPriority = (*ScheduledJob)(nil)
// ScheduledJob is the queue entry kept by groupScheduled: a task together
// with the trigger that computes when it should fire next.
type ScheduledJob struct {
// Trigger computes the job's fire times via NextFireTime.
Trigger Trigger
// Job is the task that is executed when the entry comes due.
Job Task
// JobPriority holds the next fire time returned by Trigger.NextFireTime.
// It is compared against timer.NowNano() and is the value reported by PValue.
JobPriority int64
// NextRunTime is never assigned in this file.
// NOTE(review): confirm whether anything outside this file still reads it.
NextRunTime int64
}
// GValue implements queue.IPriority and returns the job itself as the
// queued value.
func (it *ScheduledJob) GValue() interface{} {
return it
}
// PValue implements queue.IPriority and returns JobPriority, which holds
// the job's next fire time.
func (it *ScheduledJob) PValue() int64 {
return it.JobPriority
}
// groupScheduled keeps a queue of ScheduledJob entries and runs them from
// two goroutines: execTask, which waits for the head entry to come due and
// executes it, and runFeeder, which puts rescheduled entries back into the
// queue.
type groupScheduled struct {
// rm guards every access to schedJobs.
rm sync.Mutex
// schedJobs holds the pending *ScheduledJob entries.
// NOTE(review): presumably ordered by PValue (earliest fire time first) — confirm in queue.SQueue.
schedJobs *queue.SQueue
// feeder carries entries that runTask has rescheduled over to runFeeder.
feeder chan *ScheduledJob
// feederTaskCheck is used by Run to start runFeeder.
// NOTE(review): presumably prevents a second concurrent instance — confirm in system.RunningCheck.
feederTaskCheck system.RunningCheck
// execTaskCheck is used by Run to start execTask (same assumption as above).
execTaskCheck system.RunningCheck
// interrupt is signalled by runFeeder after it re-adds an entry, so that
// execTask recomputes how long to wait.
interrupt chan struct{}
// taskDuration is the wait used by calculateNextTick when the queue is
// empty or the head entry is already due.
taskDuration time.Duration
}
// defTaskDuration (500 microseconds) is the lower bound that
// newGroupScheduled applies to its taskDuration argument.
const defTaskDuration = time.Microsecond * 500
// newGroupScheduled builds a scheduler group whose job queue is created
// with the given capacity. A taskDuration below defTaskDuration is raised
// to defTaskDuration.
func newGroupScheduled(capacity int, taskDuration time.Duration) *groupScheduled {
	wait := taskDuration
	if wait < defTaskDuration {
		wait = defTaskDuration
	}

	gs := new(groupScheduled)
	gs.schedJobs = queue.NewSQueue(capacity)
	// Small buffer so runTask can hand over a rescheduled job without
	// waiting for runFeeder on every send.
	gs.feeder = make(chan *ScheduledJob, 2)
	gs.interrupt = make(chan struct{})
	gs.taskDuration = wait
	return gs
}
// AddTask schedules task with a simple trigger built from a five-second
// duration. Any error reported by AddTriggerTask is discarded.
func (gs *groupScheduled) AddTask(task Task) {
	trigger := NewSimpleTrigger(5 * time.Second)
	_ = gs.AddTriggerTask(task, trigger)
}
// AddTriggerTask schedules task to fire according to trigger.
//
// The first fire time is computed from the current time; if the trigger
// cannot produce one, its error is returned and nothing is queued. An error
// from the queue's Add is returned as well. On success the worker
// goroutines are started through Run and the executor is nudged so that it
// re-evaluates its wait.
func (gs *groupScheduled) AddTriggerTask(task Task, trigger Trigger) error {
	nextRunTime, err := trigger.NextFireTime(timer.NowNano())
	if err != nil {
		return err
	}

	gs.rm.Lock()
	defer gs.rm.Unlock()

	job := &ScheduledJob{Trigger: trigger, Job: task, JobPriority: nextRunTime}
	if err = gs.schedJobs.Add(job); err != nil {
		return err
	}
	gs.Run()

	// execTask may be parked until the previous head of the queue is due.
	// Without a wake-up, a job that is due earlier than that head would run
	// late. The send is non-blocking: if execTask is not currently waiting
	// in its select, it is either about to peek the queue (and will see the
	// new job) or — in a narrow window between its peek and its select —
	// the nudge is lost and the job is picked up at the next wake-up, which
	// is the pre-existing behaviour.
	select {
	case gs.interrupt <- struct{}{}:
	default:
	}
	return nil
}
// UpdateTask looks up the queued task whose id equals jobId and passes it
// to update while the queue lock is held. It returns update's result, or
// false when no such task is queued.
func (gs *groupScheduled) UpdateTask(jobId string, update func(task Task) bool) bool {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	target := gs.find(jobId)
	if target == nil {
		return false
	}
	return update(target)
}
// StopTask calls Cancel on the queued task whose id equals jobId. It does
// nothing when no such task is queued.
func (gs *groupScheduled) StopTask(jobId string) {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	target := gs.find(jobId)
	if target == nil {
		return
	}
	target.Cancel()
}
// RemoveTask calls Cancel on the queued task whose id equals jobId. It does
// nothing when no such task is queued.
//
// NOTE(review): the entry itself is not taken out of the queue here; this
// currently does exactly what StopTask does. Confirm whether cancelling is
// enough for the entry to be dropped later.
func (gs *groupScheduled) RemoveTask(jobId string) {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	victim := gs.find(jobId)
	if victim != nil {
		victim.Cancel()
	}
}
// HasTask reports whether a task whose id equals jobId is currently queued.
func (gs *groupScheduled) HasTask(jobId string) bool {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	queued := gs.find(jobId)
	return queued != nil
}
// Run hands execTask and runFeeder to their RunningCheck guards to be
// started. AddTriggerTask calls it after every successful add.
// NOTE(review): presumably GoRunning starts the function in a goroutine only
// if it is not already running — confirm in system.RunningCheck.
func (gs *groupScheduled) Run() {
gs.execTaskCheck.GoRunning(gs.execTask)
gs.feederTaskCheck.GoRunning(gs.runFeeder)
}
// find returns the Task of the queued entry whose task id equals jobId, or
// nil when there is none. It does not lock; callers must hold gs.rm.
func (gs *groupScheduled) find(jobId string) Task {
	hasID := func(v interface{}, _ int) bool {
		entry, isJob := v.(*ScheduledJob)
		return isJob && entry.Job.GetId() == jobId
	}

	hit, pos := gs.schedJobs.Find(hasID)
	if pos == -1 {
		return nil
	}
	entry, isJob := hit.(*ScheduledJob)
	if !isJob {
		return nil
	}
	return entry.Job
}
// peek returns the entry at the head of the queue without removing it, or
// nil when the head is not a *ScheduledJob (for example because the queue
// has nothing to offer). The queue lock is taken for the duration.
func (gs *groupScheduled) peek() *ScheduledJob {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	// A failed assertion leaves head as the nil pointer.
	head, _ := gs.schedJobs.Peek().(*ScheduledJob)
	return head
}
// calculateNextTick returns how long execTask should wait before its next
// attempt: the time remaining until the head entry's fire time, or
// gs.taskDuration when the queue has no head entry or the head is already
// due.
func (gs *groupScheduled) calculateNextTick() time.Duration {
	head := gs.peek()
	if head == nil {
		return gs.taskDuration
	}

	due, now := head.JobPriority, timer.NowNano()
	if due > now {
		return time.Duration(due - now)
	}
	return gs.taskDuration
}
// execTask is the executor loop. On every iteration it re-arms a timer for
// the duration reported by calculateNextTick and then waits for one of:
//   - system.Closed() becoming readable: the loop returns;
//   - the timer firing: runTask is given a chance to run the head job;
//   - a signal on gs.interrupt: the queue changed, so the wait is simply
//     recomputed on the next iteration.
func (gs *groupScheduled) execTask() {
	t := time.NewTimer(gs.taskDuration)
	// Release the timer when the loop exits.
	defer t.Stop()

	for {
		// Stop and drain before Reset so that a tick left over from the
		// previous arming (possible when the loop was woken by interrupt)
		// cannot be mistaken for the new deadline. Required before Go 1.23,
		// harmless afterwards.
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(gs.calculateNextTick())

		select {
		case <-system.Closed():
			return
		case <-t.C:
			// The result only says whether a job was rescheduled; the loop
			// continues either way.
			gs.runTask()
		case <-gs.interrupt:
			// Nothing to do here: looping recomputes the wait.
		}
	}
}
// addFeeder puts item back into the job queue while holding the queue lock.
// NOTE(review): the error returned by Add is discarded, so an item the queue
// rejects is dropped silently — confirm that this is intended.
func (gs *groupScheduled) addFeeder(item *ScheduledJob) {
gs.rm.Lock()
defer gs.rm.Unlock()
_ = gs.schedJobs.Add(item)
}
// runFeeder is the feeder loop. It receives rescheduled jobs from
// gs.feeder, puts each one back into the queue and then signals
// gs.interrupt so that execTask recomputes its wait. The loop returns when
// system.Closed() becomes readable.
func (gs *groupScheduled) runFeeder() {
	for {
		select {
		case <-system.Closed():
			return
		case item := <-gs.feeder:
			gs.addFeeder(item)
			// execTask is the only receiver on interrupt and it returns on
			// system.Closed(); without this escape the send would block
			// forever once execTask is gone.
			select {
			case gs.interrupt <- struct{}{}:
			case <-system.Closed():
				return
			}
		}
	}
}
// pop removes and returns the head entry of the queue if its fire time has
// been reached. The boolean is true when there is nothing to run — the head
// is not a *ScheduledJob or its fire time is still in the future — in which
// case the returned job is nil. The queue lock is held throughout.
func (gs *groupScheduled) pop() (*ScheduledJob, bool) {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	head, isJob := gs.schedJobs.Peek().(*ScheduledJob)
	if !isJob {
		return nil, true
	}
	if head.JobPriority > timer.NowNano() {
		// Not due yet; leave it in the queue.
		return nil, true
	}
	gs.schedJobs.Pop()
	return head, false
}
// runTask executes the head job if it is due and reschedules it.
//
// It returns false only when a job was executed and handed to the feeder
// for its next run. It returns true in every other case: no job was due,
// the job's state reports quit after execution, the trigger could not
// produce a next fire time, or system.Closed() became readable while
// handing the job over.
func (gs *groupScheduled) runTask() bool {
	// Not yet time: nothing to execute.
	schedJob, idle := gs.pop()
	if idle {
		return true
	}

	// The result and error of Execute are deliberately not acted upon here;
	// whether the job continues is decided from its state below.
	_, _ = schedJob.Job.Execute()
	if schedJob.Job.GetState().IsQuit() {
		return true
	}

	// The next fire time is derived from the fire time that was just served.
	nextRunTime, err := schedJob.Trigger.NextFireTime(schedJob.JobPriority)
	if err != nil {
		return true
	}

	// Move on to the next fire time and hand the job back to the feeder.
	schedJob.JobPriority = nextRunTime
	select {
	case gs.feeder <- schedJob:
		return false
	case <-system.Closed():
		// runFeeder returns on system.Closed(); do not block on a channel
		// nobody will drain.
		return true
	}
}
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.57

搜索帮助

53164aa7 5694891 3bd8fe86 5694891