Fetch the repository succeeded.
package idlrpc
import (
"github.com/go-co-op/gocron"
"sync"
"sync/atomic"
"time"
)
const (
invalidTimerHandler = 0
)
type (
timerFunc func() error
// timerInfo 定时器信息
timerInfo struct {
handler timerHandler // timer 标识
job *gocron.Job // cron 定时任务
status int32 // 任务状态
}
// TimerHandler 定时器句柄
timerHandler uint64
// timerMap 定时器缓存map
timerMap map[timerHandler]*timerInfo
// timerManager 封装的timer管理器
timerManager struct {
rwLock sync.RWMutex // 读写锁
scheduler *gocron.Scheduler // 定时器调度器
timers timerMap // 定时器缓存
timestamp int64 // 当前秒数
count int64 // 当前秒数内的调用次数
}
)
func newTimerManager() *timerManager {
t := &timerManager{
rwLock: sync.RWMutex{},
}
return t
}
// Init 初始化定时器管理器
func (t *timerManager) init(location *time.Location, maxConcurrentJobs int) error {
// 初始化资源
t.rwLock = sync.RWMutex{}
t.timers = make(timerMap, 1024)
// 初始化定时器句柄生成器
t.timestamp = time.Now().Unix()
t.count = 1
// 初始化调度器
t.scheduler = gocron.NewScheduler(location)
// 设置做大运行协程数
t.scheduler.SetMaxConcurrentJobs(maxConcurrentJobs, gocron.RescheduleMode)
// 设置添加任务后,不可以立即执行
t.scheduler.WaitForScheduleAll()
return nil
}
func (t *timerManager) start() {
// 启动调度器
t.scheduler.StartAsync()
}
// Tick 处理定时器主循环
func (t *timerManager) tick() {
// 检测timer handler 更新
t.doTickTimeHandle()
}
func (t *timerManager) addTimer(exec timerFunc, duration time.Duration) (timerHandler, error) {
// 生成定时器句柄
var err error
ti := timerInfo{
handler: t.next(),
job: nil,
status: 0,
}
// 加锁,scheduler 并非线程安全
t.rwLock.Lock()
defer t.rwLock.Unlock()
ti.job, err = t.scheduler.Every(duration).Do(exec)
// 添加失败
if err != nil {
return 0, err
}
t.timers[ti.handler] = &ti
return ti.handler, nil
}
func (t *timerManager) addTimerOnce(exec timerFunc, duration time.Duration) (timerHandler, error) {
// 生成定时器句柄
var err error
ti := timerInfo{
handler: t.next(),
job: nil,
status: 0,
}
// 添加定时器,该函数只执行一次
execWrapper := func() {
_ = exec()
// 删除定时器
_ = t.cancelTimer(ti.handler)
}
// 添加到调度器并执行, 这里需要对scheduler 枷锁,他并非线程安全
t.rwLock.Lock()
defer t.rwLock.Unlock()
ti.job, err = t.scheduler.Every(duration).Do(execWrapper)
// 添加失败
if err != nil {
return 0, err
}
// 成功则存储
t.timers[ti.handler] = &ti
return ti.handler, nil
}
func (t *timerManager) cancelTimer(handle timerHandler) error {
if handle == invalidTimerHandler {
return nil
}
// 查找 timer
t.rwLock.RLock()
ti, ok := t.timers[handle]
t.rwLock.RUnlock()
// 已经不存在了,什么都不用做
if !ok {
return nil
}
// 删除定时器, 同样这里也是需要上锁的
t.rwLock.Lock()
defer t.rwLock.Unlock()
t.scheduler.RemoveByReference(ti.job)
delete(t.timers, handle)
return nil
}
func (t *timerManager) doTickTimeHandle() {
// 每秒重置,timer handler 生成器调用次数
now := time.Now().Unix()
last := atomic.LoadInt64(&t.timestamp)
if now > last {
atomic.StoreInt64(&t.timestamp, now)
atomic.StoreInt64(&t.count, 1)
}
}
// next 产生一个timer handle 句柄
func (t *timerManager) next() timerHandler {
tm := atomic.LoadInt64(&t.timestamp)
count := atomic.AddInt64(&t.count, 1)
return timerHandler(tm<<32 + count)
}
func (t *timerManager) stop() {
// 停止调度器
t.rwLock.Lock()
for _, ti := range t.timers {
t.scheduler.RemoveByReference(ti.job)
}
t.timers = nil
t.rwLock.Unlock()
// 关闭调度器
t.scheduler.Clear()
t.scheduler.Stop()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。