1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
alarm
api
auth
build
common
algorithm
app
archive
attributes
banner
bus
coder
compress
config
data
debug
file
flow/flow
git
http
images
json
list
logger
mapper
option
queue
random
result
scheduler
cron.go
cron_test.go
group.go
pool.go
scheduler.go
scheduler_test.go
task.go
trigger.go
worker.go
secret
server
ssh
stringutil
svc
system
tag
template
timer
tls
trie
version
watcher
xml
yaml
bit_flag.go
common.go
integer.go
dao
discovery
loader
mq
perf
plugins
request
rpc
sensitive
thrift
.gitignore
LICENSE
Makefile
README.md
doc.go
error.md
go.mod
go.sum
goutils.go
goutils_test.go
version.go
克隆/下载
group.go 4.52 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 11个月前 . task duration
package scheduler
import (
"gitee.com/h79/goutils/common/queue"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/common/timer"
"sync"
"time"
)
// Compile-time assertion that *ScheduledJob satisfies queue.IPriority.
var _ queue.IPriority = (*ScheduledJob)(nil)
// ScheduledJob pairs a Task with the Trigger that decides when it fires next.
// It implements queue.IPriority (see GValue and PValue) so that it can be
// stored in the group scheduler's queue.
type ScheduledJob struct {
// Trigger computes the job's next fire time via NextFireTime.
Trigger Trigger
// Job is the task that is executed when the job comes due.
Job Task
// JobPriority is the value returned by PValue. The group scheduler stores
// the job's next fire time here and compares it against timer.NowNano().
JobPriority int64
// NextRunTime is neither read nor written in the visible part of this file.
// NOTE(review): possibly vestigial — confirm against other users of the type.
NextRunTime int64
}
// GValue implements queue.IPriority. The queued value is the job itself.
func (it *ScheduledJob) GValue() interface{} {
return it
}
// PValue implements queue.IPriority. The priority is the job's JobPriority
// field, which the group scheduler sets to the job's next fire time.
func (it *ScheduledJob) PValue() int64 {
return it.JobPriority
}
// groupScheduled is a scheduler for a group of jobs. Jobs wait in schedJobs;
// execTask runs the due ones and runFeeder puts re-scheduled jobs back.
type groupScheduled struct {
// rm guards every access to schedJobs.
rm sync.Mutex
// schedJobs holds the pending *ScheduledJob values.
// NOTE(review): presumably ordered by PValue (the next fire time) — confirm
// against queue.SQueue.
schedJobs *queue.SQueue
// feeder carries jobs that runTask has re-scheduled over to runFeeder.
feeder chan *ScheduledJob
// feederTaskCheck and execTaskCheck are used by Run to start runFeeder and
// execTask through GoRunning.
feederTaskCheck system.RunningCheck
execTaskCheck system.RunningCheck
// interrupt is signalled by runFeeder after it re-queues a job so that
// execTask recomputes how long to wait.
interrupt chan struct{}
// taskDuration is the wait used by calculateNextTick when the queue is empty
// or its head is already due.
taskDuration time.Duration
}
// defTaskDuration is the smallest task duration newGroupScheduled accepts;
// shorter values are raised to it.
const defTaskDuration = 500 * time.Microsecond
// newGroupScheduled builds a group scheduler whose job queue is created with
// the given capacity. taskDuration is the fallback wait of the executor loop;
// values below defTaskDuration are replaced by defTaskDuration.
func newGroupScheduled(capacity int, taskDuration time.Duration) *groupScheduled {
	gs := &groupScheduled{
		schedJobs:    queue.NewSQueue(capacity),
		feeder:       make(chan *ScheduledJob, 2),
		interrupt:    make(chan struct{}),
		taskDuration: defTaskDuration,
	}
	// Only a duration above the floor overrides the default.
	if taskDuration > defTaskDuration {
		gs.taskDuration = taskDuration
	}
	return gs
}
// AddTask schedules task with a trigger built by NewSimpleTrigger from a
// five-second duration.
// NOTE(review): presumably a fixed 5s repeat interval — confirm against
// NewSimpleTrigger.
func (gs *groupScheduled) AddTask(task Task) {
	defaultTrigger := NewSimpleTrigger(time.Second * 5)
	// AddTask has no error result, so a scheduling failure is dropped here;
	// callers that need the error should use AddTriggerTask directly.
	_ = gs.AddTriggerTask(task, defaultTrigger)
}
// AddTriggerTask queues task to fire at the time trigger reports as next after
// the current timer.NowNano() value, then makes sure the scheduler loops are
// started via Run. It returns the error from trigger.NextFireTime or from the
// queue's Add; in both cases nothing is started.
func (gs *groupScheduled) AddTriggerTask(task Task, trigger Trigger) error {
	firstFire, err := trigger.NextFireTime(timer.NowNano())
	if err != nil {
		return err
	}
	job := &ScheduledJob{Trigger: trigger, Job: task, JobPriority: firstFire}

	gs.rm.Lock()
	defer gs.rm.Unlock()
	if err = gs.schedJobs.Add(job); err != nil {
		return err
	}
	// Run is invoked while gs.rm is still held.
	gs.Run()
	return nil
}
// UpdateTask looks up the queued task whose id equals jobId and passes it to
// update, returning update's result. It returns false when no such task is
// queued.
//
// update runs while gs.rm is held, so it must not call back into methods of
// this scheduler that take the same lock.
func (gs *groupScheduled) UpdateTask(jobId string, update func(task Task) bool) bool {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	task := gs.find(jobId)
	if task == nil {
		return false
	}
	return update(task)
}
// StopTask calls Cancel on the queued task whose id equals jobId. It does
// nothing when no such task is queued.
func (gs *groupScheduled) StopTask(jobId string) {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	task := gs.find(jobId)
	if task == nil {
		return
	}
	task.Cancel()
}
// RemoveTask cancels the queued task whose id equals jobId; it behaves exactly
// like StopTask and is a no-op when no such task is queued.
//
// NOTE(review): the job is only cancelled here, not taken out of the queue.
// Presumably runTask drops it once its state reports IsQuit — confirm that
// this is the intended "remove" semantics.
func (gs *groupScheduled) RemoveTask(jobId string) {
	gs.StopTask(jobId)
}
// HasTask reports whether a task whose id equals jobId is currently queued.
func (gs *groupScheduled) HasTask(jobId string) bool {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	if gs.find(jobId) == nil {
		return false
	}
	return true
}
// Run starts the executor loop (execTask) and then the feeder loop (runFeeder)
// through their RunningCheck guards.
// NOTE(review): GoRunning presumably launches the function in a goroutine only
// when it is not already running — AddTriggerTask relies on Run being safe to
// call repeatedly; confirm against system.RunningCheck.
func (gs *groupScheduled) Run() {
	executor, feeder := gs.execTask, gs.runFeeder
	gs.execTaskCheck.GoRunning(executor)
	gs.feederTaskCheck.GoRunning(feeder)
}
// find returns the Task of the queued job whose task id equals jobId, or nil
// when the queue holds no such job. Every caller in this file holds gs.rm
// while calling it; find itself does not lock.
func (gs *groupScheduled) find(jobId string) Task {
	hasID := func(v interface{}, _ int) bool {
		candidate, isJob := v.(*ScheduledJob)
		return isJob && candidate.Job.GetId() == jobId
	}

	found, pos := gs.schedJobs.Find(hasID)
	if pos == -1 {
		return nil
	}
	match, isJob := found.(*ScheduledJob)
	if !isJob {
		return nil
	}
	return match.Job
}
// peek returns the job at the head of the queue without removing it, or nil
// when the head is not a *ScheduledJob (for instance when Peek yields nothing).
func (gs *groupScheduled) peek() *ScheduledJob {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	// A failed assertion leaves head as the nil *ScheduledJob.
	head, _ := gs.schedJobs.Peek().(*ScheduledJob)
	return head
}
// calculateNextTick returns how long the executor loop should wait before its
// next wake-up: the time remaining until the head job's JobPriority when that
// lies in the future, otherwise gs.taskDuration (empty queue or head already
// due).
func (gs *groupScheduled) calculateNextTick() time.Duration {
	head := gs.peek()
	if head == nil {
		return gs.taskDuration
	}
	if now := timer.NowNano(); head.JobPriority > now {
		return time.Duration(head.JobPriority - now)
	}
	return gs.taskDuration
}
// execTask is the executor loop started by Run. It waits for the duration
// reported by calculateNextTick, runs at most one due job per timer tick via
// runTask, and recomputes the wait whenever runFeeder signals on gs.interrupt.
// It returns once the channel returned by system.Closed() becomes readable.
func (gs *groupScheduled) execTask() {
	t := time.NewTimer(gs.calculateNextTick())
	// Release the timer when the loop exits instead of leaving it armed.
	defer t.Stop()

	for {
		select {
		case <-system.Closed():
			return
		case <-t.C:
			// runTask's result only says whether a job was re-queued; the
			// timer is re-armed below in either case.
			gs.runTask()
		case <-gs.interrupt:
			// The queue changed while the timer was still pending. Stop it
			// and drain a tick that may have fired in the meantime, so the
			// Reset below never leaves a stale value in t.C.
			if !t.Stop() {
				select {
				case <-t.C:
				default:
				}
			}
		}
		t.Reset(gs.calculateNextTick())
	}
}
// addFeeder puts a re-scheduled job back into the queue while holding gs.rm.
func (gs *groupScheduled) addFeeder(item *ScheduledJob) {
gs.rm.Lock()
defer gs.rm.Unlock()
// The Add error is discarded: this path has no caller to report it to.
// NOTE(review): if Add can fail (e.g. a full queue) the job is silently
// lost — confirm against queue.SQueue.
_ = gs.schedJobs.Add(item)
}
// runFeeder is the feeder loop started by Run. It receives re-scheduled jobs
// from gs.feeder, puts them back into the queue, and then signals
// gs.interrupt so that execTask recomputes its wait. It returns once the
// channel returned by system.Closed() becomes readable.
func (gs *groupScheduled) runFeeder() {
	for {
		select {
		case item := <-gs.feeder:
			gs.addFeeder(item)
			// gs.interrupt is unbuffered and execTask stops receiving from it
			// after shutdown, so the wake-up must also watch system.Closed()
			// or this goroutine would block forever.
			select {
			case gs.interrupt <- struct{}{}:
			case <-system.Closed():
				return
			}
		case <-system.Closed():
			return
		}
	}
}
// pop removes and returns the head of the queue when it is due. The boolean is
// true when there is nothing to run — the head is not a *ScheduledJob (for
// instance an empty queue) or its JobPriority is still later than
// timer.NowNano() — in which case the returned job is nil and the queue is
// left untouched.
func (gs *groupScheduled) pop() (*ScheduledJob, bool) {
	gs.rm.Lock()
	defer gs.rm.Unlock()

	head, isJob := gs.schedJobs.Peek().(*ScheduledJob)
	if !isJob {
		return nil, true
	}
	if head.JobPriority > timer.NowNano() {
		return nil, true
	}
	gs.schedJobs.Pop()
	return head, false
}
// runTask executes the job at the head of the queue if it is due and hands it
// to the feeder for its next fire time.
//
// It returns false when the job was executed and re-queued through gs.feeder.
// It returns true in every other case: no job is due, the job's state reports
// IsQuit after execution, the trigger cannot produce a next fire time, or the
// scheduler is shutting down.
func (gs *groupScheduled) runTask() bool {
	// Not due yet: nothing to execute.
	schedJob, nothingDue := gs.pop()
	if nothingDue {
		return true
	}

	// The result and error of Execute are deliberately discarded: this loop
	// has nowhere to report them, and whether the job continues is decided by
	// its state below.
	_, _ = schedJob.Job.Execute()
	if schedJob.Job.GetState().IsQuit() {
		return true
	}

	nextRunTime, err := schedJob.Trigger.NextFireTime(schedJob.JobPriority)
	if err != nil {
		return true
	}

	// Advance to the next fire time and hand the job back to the feeder. The
	// send also watches system.Closed() so that a full feeder buffer cannot
	// block this goroutine forever during shutdown.
	schedJob.JobPriority = nextRunTime
	select {
	case gs.feeder <- schedJob:
		return false
	case <-system.Closed():
		return true
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.91

搜索帮助