1 Star 3 Fork 1

WFL / xmachinery

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
server.go 4.26 KB
一键复制 编辑 原始数据 按行查看 历史
WFL 提交于 2021-03-23 12:34 . feat: 初始化项目
package xmachinery
import (
"github.com/RichardKnop/machinery/v2"
backendsiface "github.com/RichardKnop/machinery/v2/backends/iface"
brokersiface "github.com/RichardKnop/machinery/v2/brokers/iface"
"github.com/RichardKnop/machinery/v2/config"
lockiface "github.com/RichardKnop/machinery/v2/locks/iface"
"github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/RichardKnop/machinery/v2/utils"
"github.com/robfig/cron/v3"
"sync"
"time"
"unsafe"
)
// ServerRef is a layout stand-in used by NewServer: NewServer takes
// unsafe.Offsetof of its lock field and applies that offset to a
// *machinery.Server pointer in order to read the server's lock.
//
// NOTE(review): the field order and types presumably mirror the unexported
// fields of machinery.Server for the machinery version in use - confirm
// against that version. Reordering, adding or removing fields changes the
// computed offset and therefore the memory that NewServer reads.
type ServerRef struct {
config *config.Config
registeredTasks *sync.Map
broker brokersiface.Broker
backend backendsiface.Backend
// lock is the only field whose offset NewServer actually uses.
lock lockiface.Lock
scheduler *cron.Cron
prePublishHandler func(*tasks.Signature)
}
// XServer wraps a machinery.Server and adds scheduled tasks that can be
// registered, reloaded and removed by id.
type XServer struct {
// Embedded mutex; the exported scheduled-task methods in this file lock it
// before touching the scheduler entries or the registry below.
sync.Mutex
// Wrapped native machinery server, supplied to NewServer.
*machinery.Server
// Cron scheduler created and started by NewServer.
scheduler *cron.Cron
// Lock value copied out of the wrapped server by NewServer; scheduled jobs
// acquire it before sending their task.
machineryLock lockiface.Lock
// Currently registered scheduled tasks, keyed by task Id.
registeredScheduledTasks map[string]*ScheduledTask
}
// NewServer builds an XServer around an existing native machinery server.
//
// The server's lock is not exported, so it is read through pointer
// arithmetic: the offset of ServerRef.lock is added to the address of server
// and the result is dereferenced as a lockiface.Lock. This relies on
// ServerRef matching the layout of machinery.Server.
//
// The returned XServer owns a cron scheduler configured with a seconds field,
// which is already started when NewServer returns.
func NewServer(server *machinery.Server) *XServer {
	// Keep the uintptr round trip inside one expression, as the unsafe
	// package requires for pointer arithmetic.
	lockAddr := unsafe.Pointer(uintptr(unsafe.Pointer(server)) + unsafe.Offsetof(ServerRef{}.lock))

	// Scheduler with second-level resolution.
	scheduler := cron.New(cron.WithSeconds())

	xs := &XServer{
		Server:                   server,
		scheduler:                scheduler,
		machineryLock:            *(*lockiface.Lock)(lockAddr),
		registeredScheduledTasks: make(map[string]*ScheduledTask),
	}
	xs.scheduler.Start()
	return xs
}
// RegisterScheduledTask registers one scheduled task, replacing any task
// already registered under the same Id. The server mutex is held for the
// duration of the call. It returns whatever registerScheduledTask returns.
func (server *XServer) RegisterScheduledTask(task *ScheduledTask) error {
	server.Lock()
	defer server.Unlock()

	// The helper works on its own copy of the task value.
	taskCopy := *task
	return server.registerScheduledTask(taskCopy)
}
// registerScheduledTask validates task.Spec, replaces any existing
// registration that has the same Id, and schedules a job that sends the task
// every time the spec fires.
//
// It does not lock the server itself; the exported callers in this file take
// the XServer mutex before calling it.
//
// It returns the parse error when task.Spec is rejected by secondsParser (in
// which case nothing has been changed), or the error reported by the
// scheduler when the job cannot be added. In the latter case the previous
// registration has already been removed and no new one is recorded.
func (server *XServer) registerScheduledTask(task ScheduledTask) error {
	// Validate the cron expression before touching an existing registration.
	schedule, err := secondsParser.Parse(task.Spec)
	if err != nil {
		return err
	}

	// Drop the previous registration with the same Id, if there is one.
	server.removeScheduledTask(task.Id)

	// Job run by the scheduler each time the spec fires.
	job := func() {
		// Acquire the lock named after this task code and spec. The second
		// argument is the next fire time in nanoseconds minus one.
		// NOTE(review): presumably this keeps several servers from sending
		// the same tick more than once - confirm against the lock
		// implementation in use. A failed acquisition skips this tick
		// without logging.
		lockName := utils.GetLockName(task.TaskCode, task.Spec)
		expiration := schedule.Next(time.Now()).UnixNano() - 1
		if err := server.machineryLock.LockWithRetries(lockName, expiration); err != nil {
			return
		}

		// Build the signature and send the task; failures are only logged
		// because the scheduler has nobody to return them to.
		signature := task.Signature()
		if _, err := server.SendTask(signature); err != nil {
			log.ERROR.Printf("scheduled task failed. task id is: %s. task name is: %s. error is %s", task.Id, task.TaskCode, err.Error())
		}
	}

	entryID, err := server.scheduler.AddFunc(task.Spec, job)
	if err != nil {
		// Never record a task the scheduler rejected: a registry entry with
		// a meaningless entry id would make the task look registered.
		return err
	}

	// Remember the task together with its scheduler entry so that it can be
	// removed or compared later.
	registered := NewScheduledTask(task.Id, task.TaskCode, task.Spec, task.TaskQueue, task.Args...)
	registered.entryId = entryID
	server.registeredScheduledTasks[task.Id] = registered
	return nil
}
// RegisterScheduledTasks registers several scheduled tasks under a single
// acquisition of the server mutex. Tasks are processed in slice order and the
// first error stops the loop and is returned; tasks registered before that
// point stay registered.
func (server *XServer) RegisterScheduledTasks(tasks []*ScheduledTask) error {
	server.Lock()
	defer server.Unlock()

	for i := range tasks {
		if err := server.registerScheduledTask(*tasks[i]); err != nil {
			return err
		}
	}
	return nil
}
// ReloadScheduledTasks makes the set of registered scheduled tasks match
// tasks: registrations whose Id is absent from tasks are removed, and tasks
// that are new or not Equal to the registered one are (re)registered.
// Registered tasks that are Equal to their counterpart are left untouched.
//
// When tasks contains the same Id more than once, the last occurrence wins.
// The first registration error is returned immediately; changes made before
// it are kept.
func (server *XServer) ReloadScheduledTasks(tasks []*ScheduledTask) error {
	// Index the requested tasks by Id before taking the lock.
	wanted := make(map[string]*ScheduledTask, len(tasks))
	for _, t := range tasks {
		wanted[t.Id] = t
	}

	server.Lock()
	defer server.Unlock()

	// Phase 1: remove registrations that are no longer requested. The
	// registrations are collected first and removed in a second pass.
	var stale []*ScheduledTask
	for _, registered := range server.registeredScheduledTasks {
		if _, keep := wanted[registered.Id]; keep {
			continue
		}
		stale = append(stale, registered)
	}
	for _, registered := range stale {
		server.removeScheduledTask(registered.Id)
	}

	// Phase 2: register everything that is new or has changed.
	var pending []*ScheduledTask
	for _, candidate := range wanted {
		current, exists := server.registeredScheduledTasks[candidate.Id]
		if exists && candidate.Equal(current) {
			continue
		}
		pending = append(pending, candidate)
	}
	for _, candidate := range pending {
		if err := server.registerScheduledTask(*candidate); err != nil {
			return err
		}
	}
	return nil
}
// RemoveScheduledTask unregisters the scheduled task stored under id while
// holding the server mutex. An id that is not registered is ignored.
func (server *XServer) RemoveScheduledTask(id string) {
	server.Lock()
	defer server.Unlock()

	server.removeScheduledTask(id)
}
// removeScheduledTask removes the scheduler entry of the task registered
// under id and deletes it from the registry. It does nothing when id is not
// registered. It does not lock; the exported callers in this file hold the
// server mutex when calling it.
func (server *XServer) removeScheduledTask(id string) {
	registered, found := server.registeredScheduledTasks[id]
	if !found {
		return
	}
	server.scheduler.Remove(registered.entryId)
	delete(server.registeredScheduledTasks, id)
}
// secondsParser is the cron parser used to validate task specs before they
// are scheduled. It is configured with the Second, Minute, Hour, Dom, Month
// and Dow fields plus the Descriptor option.
var secondsParser = cron.NewParser(
	cron.Second |
		cron.Minute |
		cron.Hour |
		cron.Dom |
		cron.Month |
		cron.Dow |
		cron.Descriptor,
)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/sqxwww/xmachinery.git
git@gitee.com:sqxwww/xmachinery.git
sqxwww
xmachinery
xmachinery
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891