代码拉取完成,页面将自动刷新
package scheduler
import (
"gitee.com/h79/goutils/common/queue"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/common/timer"
"sync"
"time"
)
var _ queue.IPriority = (*ScheduledJob)(nil)
type ScheduledJob struct {
Trigger Trigger
Job Task
JobPriority int64
NextRunTime int64
}
// GValue for implement queue.IPriority
func (it *ScheduledJob) GValue() interface{} {
return it
}
// PValue for implement queue.IPriority
func (it *ScheduledJob) PValue() int64 {
return it.JobPriority
}
type groupScheduled struct {
rm sync.Mutex
schedJobs *queue.SQueue
feeder chan *ScheduledJob
feederTaskCheck system.RunningCheck
execTaskCheck system.RunningCheck
interrupt chan struct{}
taskDuration time.Duration
}
const defTaskDuration = time.Microsecond * 500
func newGroupScheduled(capacity int, taskDuration time.Duration) *groupScheduled {
if taskDuration < defTaskDuration {
taskDuration = defTaskDuration
}
return &groupScheduled{
schedJobs: queue.NewSQueue(capacity),
feeder: make(chan *ScheduledJob, 2),
interrupt: make(chan struct{}),
taskDuration: taskDuration,
}
}
func (gs *groupScheduled) AddTask(task Task) {
_ = gs.AddTriggerTask(task, NewSimpleTrigger(time.Second*5))
}
func (gs *groupScheduled) AddTriggerTask(task Task, trigger Trigger) error {
nextRunTime, err := trigger.NextFireTime(timer.NowNano())
if err != nil {
return err
}
gs.rm.Lock()
defer gs.rm.Unlock()
err = gs.schedJobs.Add(&ScheduledJob{Trigger: trigger, Job: task, JobPriority: nextRunTime})
if err == nil {
gs.Run()
}
return err
}
func (gs *groupScheduled) UpdateTask(jobId string, update func(task Task) bool) bool {
gs.rm.Lock()
defer gs.rm.Unlock()
if task := gs.find(jobId); task != nil {
return update(task)
}
return false
}
func (gs *groupScheduled) StopTask(jobId string) {
gs.rm.Lock()
defer gs.rm.Unlock()
if task := gs.find(jobId); task != nil {
task.Cancel()
}
}
func (gs *groupScheduled) RemoveTask(jobId string) {
gs.rm.Lock()
defer gs.rm.Unlock()
if task := gs.find(jobId); task != nil {
task.Cancel()
}
}
func (gs *groupScheduled) HasTask(jobId string) bool {
gs.rm.Lock()
defer gs.rm.Unlock()
return gs.find(jobId) != nil
}
func (gs *groupScheduled) Run() {
gs.execTaskCheck.GoRunning(gs.execTask)
gs.feederTaskCheck.GoRunning(gs.runFeeder)
}
func (gs *groupScheduled) find(jobId string) Task {
job, idx := gs.schedJobs.Find(func(v interface{}, index int) bool {
if scheduledJob, ok := v.(*ScheduledJob); ok {
return scheduledJob.Job.GetId() == jobId
}
return false
})
if idx == -1 {
return nil
}
if scheduledJob, ok := job.(*ScheduledJob); ok {
return scheduledJob.Job
}
return nil
}
func (gs *groupScheduled) peek() *ScheduledJob {
gs.rm.Lock()
defer gs.rm.Unlock()
var v = gs.schedJobs.Peek()
if schedJob, ok := v.(*ScheduledJob); !ok {
return nil
} else {
return schedJob
}
}
func (gs *groupScheduled) calculateNextTick() time.Duration {
var parkTime = func(ts int64) time.Duration {
now := timer.NowNano()
if ts > now {
return time.Duration(ts - now)
}
return gs.taskDuration
}
schedJob := gs.peek()
if schedJob == nil {
return gs.taskDuration
}
return parkTime(schedJob.JobPriority)
}
func (gs *groupScheduled) execTask() {
var t = time.NewTimer(gs.taskDuration)
for {
t.Reset(gs.calculateNextTick())
select {
case <-system.Closed():
return
case <-t.C:
if gs.runTask() {
break
}
case <-gs.interrupt:
break
}
}
}
func (gs *groupScheduled) addFeeder(item *ScheduledJob) {
gs.rm.Lock()
defer gs.rm.Unlock()
_ = gs.schedJobs.Add(item)
}
func (gs *groupScheduled) runFeeder() {
for {
select {
case item := <-gs.feeder:
gs.addFeeder(item)
gs.interrupt <- struct{}{}
case <-system.Closed():
return
}
}
}
func (gs *groupScheduled) pop() (*ScheduledJob, bool) {
gs.rm.Lock()
defer gs.rm.Unlock()
var notExpired = func(t int64) bool {
return t > timer.NowNano()
}
var v = gs.schedJobs.Peek()
var schedJob, ok = v.(*ScheduledJob)
if !ok || notExpired(schedJob.JobPriority) {
return nil, true
}
gs.schedJobs.Pop()
return schedJob, false
}
func (gs *groupScheduled) runTask() bool {
//没到时间,不用执行
schedJob, ok := gs.pop()
if ok {
return true
}
_, err := schedJob.Job.Execute()
if schedJob.Job.GetState().IsQuit() {
return true
}
nextRunTime, err := schedJob.Trigger.NextFireTime(schedJob.JobPriority)
if err != nil {
return true
}
// 轮到下一个时间
schedJob.JobPriority = nextRunTime
gs.feeder <- schedJob
return false
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。