# xmachinery **Repository Path**: yixixi2020/xmachinery ## Basic Information - **Project Name**: xmachinery - **Description**: No description available - **Primary Language**: Go - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2024-11-06 - **Last Updated**: 2024-11-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # xmachinery ## 介绍 > xmachinery 是[github.com/RichardKnop/machinery](https://github.com/RichardKnop/machinery) 的扩展包,添加定时任务管理API,扩展支持广播任务 ## 核心代码 ### 定时任务管理 ```go func (server *XServer) registerScheduledTask(task ScheduledTask) error { //检查cron表达式 schedule, err := secondsParser.Parse(task.Spec) if err != nil { return err } //移除旧任务 server.removeScheduledTask(task.Id) //包装任务函数 f := func() { //抢占任务锁 err := server.machineryLock.LockWithRetries(utils.GetLockName(task.TaskCode, task.Spec), schedule.Next(time.Now()).UnixNano()-1) if err != nil { return } //创建任务签名 signature := task.Signature() //发送任务 _, err = server.SendTask(signature) if err != nil { log.ERROR.Printf("scheduled task failed. task id is: %s. task name is: %s. error is %s", task.Id, task.TaskCode, err.Error()) } } //添加定时任务 entryId, err := server.scheduler.AddFunc(task.Spec, f) newTask := NewScheduledTask(task.Id, task.TaskCode, task.Spec, task.TaskQueue, task.Args...) newTask.entryId = entryId //存储定时任务信息 server.registeredScheduledTasks[task.Id] = newTask return err } ``` ### 广播任务broker ```go func (b *BrokerBroadcast) nextBroadCastTask(queue string) (result []byte, err error) { //默认拉取消息间隔为1000ms pollPeriodMilliseconds := 1000 if b.GetConfig().Redis != nil { configuredPollPeriod := b.GetConfig().Redis.NormalTasksPollPeriod if configuredPollPeriod > 0 { pollPeriodMilliseconds = configuredPollPeriod } } pollPeriod := time.Duration(pollPeriodMilliseconds) * time.Millisecond if b.lastBroadcastMsgId == "" { //若客户端记录的最新消息id为空,则从流中最新的消息id msgs, err := b.rclient.XRevRangeN(context.Background(), queue, "+", "-", 1).Result() if err != nil { return []byte{}, err } if len(msgs) == 0 { b.lastBroadcastMsgId = "0" return []byte{}, redis.Nil } b.lastBroadcastMsgId = msgs[0].ID } //消费广播消息,一次一条 streams, err := b.rclient.XRead(context.Background(), &redis.XReadArgs{ Streams: []string{queue, b.lastBroadcastMsgId}, Count: 1, Block: pollPeriod, }).Result() if err != nil { return []byte{}, err } if len(streams) == 0 || len(streams[0].Messages) == 0 { return []byte{}, redis.Nil } msg := streams[0].Messages[0] b.lastBroadcastMsgId = msg.ID signatureV := msg.Values[defaultStreamBroadcastMsgKey] signatureStr, ok := signatureV.(string) if ok { return []byte(signatureStr), nil } return []byte{}, fmt.Errorf("not support msg type") } ``` ```go func (b *BrokerBroadcast) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) { ... // 监听广播任务的协程 // 任务到达时直接投递给消费者 b.broadcastWG.Add(1) go func() { defer b.broadcastWG.Done() for { select { // 监听消费者是否结束 case <-b.GetStopChan(): return default: task, _ := b.nextBroadCastTask(b.redisBroadcastTasksKey) if len(task) > 0 { deliveries <- task } } } }() ... } ``` ## 示例 ### 定时任务管理api ```go package main import ( "fmt" "gitee.com/sqxwww/xmachinery" "github.com/RichardKnop/machinery/v2" redisbackend "github.com/RichardKnop/machinery/v2/backends/redis" redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis" "github.com/RichardKnop/machinery/v2/config" eagerlock "github.com/RichardKnop/machinery/v2/locks/eager" "github.com/RichardKnop/machinery/v2/tasks" ) var server *xmachinery.XServer func init() { server, _ = startServer() } func main() { server.RegisterScheduledTask(&xmachinery.ScheduledTask{ Id: "countDoneScheduler", TaskCode: "countDown", Spec: "0/2 * * * * ?", Args: []tasks.Arg{{Type: "int", Value: 5}}, }) worker() } func countDown(count int) error { if count <= 0 { fmt.Println("removing countDoneScheduler") //移除定时任务 server.RemoveScheduledTask("countDoneScheduler") return nil } fmt.Println("current count is ", count) count-- //替换定时任务 server.RegisterScheduledTask(&xmachinery.ScheduledTask{ Id: "countDoneScheduler", TaskCode: "countDown", Spec: "0/2 * * * * ?", Args: []tasks.Arg{{Type: "int", Value: count}}, }) return nil } func startServer() (*xmachinery.XServer, error) { cnf := &config.Config{ DefaultQueue: "machinery_tasks", ResultsExpireIn: 3600, Redis: &config.RedisConfig{ MaxIdle: 3, IdleTimeout: 240, ReadTimeout: 15, WriteTimeout: 15, ConnectTimeout: 15, NormalTasksPollPeriod: 1000, DelayedTasksPollPeriod: 500, }, } broker := redisbroker.New(cnf, "localhost:6379", "", "", 0) backend := redisbackend.New(cnf, "localhost:6379", "", "", 0) lock := eagerlock.New() tmp := machinery.NewServer(cnf, broker, backend, lock) server := xmachinery.NewServer(tmp) tasks := map[string]interface{}{ "countDown": countDown, } return server, server.RegisterTasks(tasks) } func worker() error { consumerTag := "machinery_worker" worker := server.NewWorker(consumerTag, 0) return worker.Launch() } ``` ### 广播任务broker ```go package main import ( "fmt" broadcastbroker "gitee.com/sqxwww/xmachinery/brokers/redis" "github.com/RichardKnop/machinery/v2" redisbackend "github.com/RichardKnop/machinery/v2/backends/redis" "github.com/RichardKnop/machinery/v2/config" eagerlock "github.com/RichardKnop/machinery/v2/locks/eager" "github.com/RichardKnop/machinery/v2/tasks" "sync" "time" ) var server *machinery.Server func init() { server, _ = startServer() } func main() { var wg sync.WaitGroup for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() worker() }() } time.Sleep(time.Second) singature, _ := tasks.NewSignature("hello", nil) //设置广播任务头 singature.Headers = map[string]interface{}{"broadcastTask": ""} server.SendTask(singature) wg.Wait() } func startServer() (*machinery.Server, error) { cnf := &config.Config{ DefaultQueue: "machinery_tasks", ResultsExpireIn: 3600, Redis: &config.RedisConfig{ MaxIdle: 3, IdleTimeout: 240, ReadTimeout: 15, WriteTimeout: 15, ConnectTimeout: 15, NormalTasksPollPeriod: 1000, DelayedTasksPollPeriod: 500, }, } //使用支持广播的broker broker := broadcastbroker.New(cnf, []string{"localhost:6379"}, 0) backend := redisbackend.New(cnf, "localhost:6379", "", "", 0) lock := eagerlock.New() server := machinery.NewServer(cnf, broker, backend, lock) tasks := map[string]interface{}{ "hello": func() error { fmt.Println("hello broadcast broker") return nil }, } return server, server.RegisterTasks(tasks) } func worker() error { consumerTag := "machinery_worker" worker := server.NewWorker(consumerTag, 0) errorsChan := make(chan error) worker.LaunchAsync(errorsChan) return <-errorsChan } ```