1 Star 3 Fork 1

WFL / xmachinery

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
README.md 6.93 KB
一键复制 编辑 原始数据 按行查看 历史
WFL 提交于 2021-03-23 12:48 . feat: 更新readme

xmachinery

介绍

xmachinery 是 github.com/RichardKnop/machinery 的扩展包,添加了定时任务管理 API,并扩展支持广播任务。

核心代码

定时任务管理

// registerScheduledTask validates the task's cron spec, drops any task already
// registered under the same Id, and schedules a wrapper that sends the task
// each time the spec fires.
//
// It returns the parse error when secondsParser rejects task.Spec (in that
// case nothing has been touched), or the error reported by the scheduler's
// AddFunc. When AddFunc fails, removeScheduledTask has already been called for
// task.Id, and no new record is stored in registeredScheduledTasks.
func (server *XServer) registerScheduledTask(task ScheduledTask) error {
	// Validate the cron expression before changing any state.
	schedule, err := secondsParser.Parse(task.Spec)
	if err != nil {
		return err
	}
	// Remove the previous task registered under this Id, if any.
	server.removeScheduledTask(task.Id)
	// Wrap the task so that each firing first competes for the task lock.
	f := func() {
		// The value handed to the lock is one nanosecond before the next
		// scheduled run. NOTE(review): presumably this is the lock expiry, so
		// that only one holder sends the task per tick — confirm against the
		// lock implementation.
		err := server.machineryLock.LockWithRetries(utils.GetLockName(task.TaskCode, task.Spec), schedule.Next(time.Now()).UnixNano()-1)
		if err != nil {
			// Lock not acquired: skip this run without reporting anything.
			return
		}
		// Build the task signature and send it.
		signature := task.Signature()
		_, err = server.SendTask(signature)
		if err != nil {
			log.ERROR.Printf("scheduled task failed. task id is: %s. task name is: %s. error is %s", task.Id, task.TaskCode, err.Error())
		}
	}
	// Hand the wrapper to the scheduler. Bail out on failure so that no record
	// with a meaningless entryId is stored.
	entryId, err := server.scheduler.AddFunc(task.Spec, f)
	if err != nil {
		return err
	}
	// Remember the task together with its scheduler entry id.
	newTask := NewScheduledTask(task.Id, task.TaskCode, task.Spec, task.TaskQueue, task.Args...)
	newTask.entryId = entryId
	server.registeredScheduledTasks[task.Id] = newTask
	return nil
}

广播任务broker

// nextBroadCastTask reads at most one broadcast message from the Redis stream
// named queue and returns its payload as bytes.
//
// When no last-seen message id is recorded yet, the id of the newest message
// in the stream is looked up first and used as the starting point; if the
// stream is empty the id is set to "0" and redis.Nil is returned. The read
// then blocks for the configured NormalTasksPollPeriod (milliseconds), or
// 1000ms when that is not configured. redis.Nil is returned when the read
// yields no message, and an error is returned when the payload stored under
// defaultStreamBroadcastMsgKey is not a string.
func (b *BrokerBroadcast) nextBroadCastTask(queue string) (result []byte, err error) {
	// Default blocking interval is 1000ms unless a positive period is configured.
	blockFor := 1000 * time.Millisecond
	if redisCfg := b.GetConfig().Redis; redisCfg != nil && redisCfg.NormalTasksPollPeriod > 0 {
		blockFor = time.Duration(redisCfg.NormalTasksPollPeriod) * time.Millisecond
	}

	ctx := context.Background()

	if b.lastBroadcastMsgId == "" {
		// No id recorded yet: start from the newest message in the stream.
		latest, revErr := b.rclient.XRevRangeN(ctx, queue, "+", "-", 1).Result()
		switch {
		case revErr != nil:
			return []byte{}, revErr
		case len(latest) == 0:
			b.lastBroadcastMsgId = "0"
			return []byte{}, redis.Nil
		}
		b.lastBroadcastMsgId = latest[0].ID
	}

	// Consume broadcast messages one at a time.
	readArgs := &redis.XReadArgs{
		Streams: []string{queue, b.lastBroadcastMsgId},
		Count:   1,
		Block:   blockFor,
	}
	streams, readErr := b.rclient.XRead(ctx, readArgs).Result()
	if readErr != nil {
		return []byte{}, readErr
	}
	if len(streams) == 0 || len(streams[0].Messages) == 0 {
		return []byte{}, redis.Nil
	}

	next := streams[0].Messages[0]
	// The id is advanced before the payload is inspected, so a message with an
	// unsupported payload is not read again.
	b.lastBroadcastMsgId = next.ID
	payload, isString := next.Values[defaultStreamBroadcastMsgKey].(string)
	if !isString {
		return []byte{}, fmt.Errorf("not support msg type")
	}
	return []byte(payload), nil
}
// StartConsuming is shown here as an excerpt; the elided parts are marked
// with "...". The visible part starts a goroutine that repeatedly fetches the
// next broadcast task and forwards every non-empty payload to the deliveries
// channel, returning once the stop channel is readable.
func (b *BrokerBroadcast) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
	...

	// Goroutine that listens for broadcast tasks.
	// A task is handed straight to the consumer as soon as it arrives.
	b.broadcastWG.Add(1)
	go func() {
		defer b.broadcastWG.Done()

		for {
			select {
			// Stop when the consumer has been told to shut down.
			case <-b.GetStopChan():
				return
			default:
				// NOTE(review): the error from nextBroadCastTask is discarded,
				// so a failure that returns immediately makes this loop retry
				// without any delay — confirm whether a back-off is wanted.
				task, _ := b.nextBroadCastTask(b.redisBroadcastTasksKey)
				if len(task) > 0 {
					deliveries <- task
				}
			}
		}
	}()

	...
}

示例

定时任务管理api

package main

import (
	"fmt"
	"gitee.com/sqxwww/xmachinery"
	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
)

// server is the shared XServer: it is assigned in init and used by main,
// countDown and worker.
var server *xmachinery.XServer

// init builds the package-level server before main runs. It panics when
// startServer reports an error, instead of silently continuing with a server
// whose task registration failed.
func init() {
	s, err := startServer()
	if err != nil {
		panic(fmt.Errorf("starting server: %w", err))
	}
	server = s
}

// main registers the "countDown" scheduled task with an initial argument of 5
// and then launches the worker, printing the error the worker returns, if any.
func main() {
	// NOTE(review): whatever RegisterScheduledTask returns is not inspected
	// here — confirm its signature and handle a returned error if there is one.
	server.RegisterScheduledTask(&xmachinery.ScheduledTask{
		Id:       "countDoneScheduler",
		TaskCode: "countDown",
		Spec:     "0/2 * * * * ?",
		Args:     []tasks.Arg{{Type: "int", Value: 5}},
	})
	if err := worker(); err != nil {
		fmt.Println("worker exited with error:", err)
	}
}

// countDown is the task body behind the "countDown" task code. While count is
// positive it prints the current value and re-registers the scheduled task
// under the same Id with count-1 as the argument; once count is zero or
// negative it removes the scheduled task instead. It always returns nil.
func countDown(count int) error {
	if count > 0 {
		fmt.Println("current count is ", count)
		// Replace the scheduled task with one carrying the decremented count.
		server.RegisterScheduledTask(&xmachinery.ScheduledTask{
			Id:       "countDoneScheduler",
			TaskCode: "countDown",
			Spec:     "0/2 * * * * ?",
			Args:     []tasks.Arg{{Type: "int", Value: count - 1}},
		})
		return nil
	}

	fmt.Println("removing countDoneScheduler")
	// Countdown finished: remove the scheduled task.
	server.RemoveScheduledTask("countDoneScheduler")
	return nil
}

// startServer builds a machinery server backed by Redis at localhost:6379
// (broker and result backend) with an eager lock, wraps it in an XServer, and
// registers the "countDown" task. It returns the XServer together with the
// error from RegisterTasks.
func startServer() (*xmachinery.XServer, error) {
	const redisAddr = "localhost:6379"

	redisCnf := &config.RedisConfig{
		MaxIdle:                3,
		IdleTimeout:            240,
		ReadTimeout:            15,
		WriteTimeout:           15,
		ConnectTimeout:         15,
		NormalTasksPollPeriod:  1000,
		DelayedTasksPollPeriod: 500,
	}
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis:           redisCnf,
	}

	base := machinery.NewServer(
		cnf,
		redisbroker.New(cnf, redisAddr, "", "", 0),
		redisbackend.New(cnf, redisAddr, "", "", 0),
		eagerlock.New(),
	)
	xs := xmachinery.NewServer(base)

	// Task code -> task function.
	taskFuncs := map[string]interface{}{
		"countDown": countDown,
	}
	err := xs.RegisterTasks(taskFuncs)
	return xs, err
}

// worker creates a worker tagged "machinery_worker" on the shared server,
// with a concurrency argument of 0, and returns the result of launching it.
func worker() error {
	return server.NewWorker("machinery_worker", 0).Launch()
}

广播任务broker

package main

import (
	"fmt"
	broadcastbroker "gitee.com/sqxwww/xmachinery/brokers/redis"
	"github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
	"sync"
	"time"
)

// server is the shared machinery server: it is assigned in init and used by
// main and worker.
var server *machinery.Server

// init builds the package-level server before main runs. It panics when
// startServer reports an error, instead of silently continuing with a server
// whose task registration failed.
func init() {
	s, err := startServer()
	if err != nil {
		panic(fmt.Errorf("starting server: %w", err))
	}
	server = s
}

// main starts two workers in background goroutines, waits one second, sends a
// single "hello" task flagged with the "broadcastTask" header, and then waits
// for both workers to return. Errors from creating the signature, sending the
// task, or running a worker are printed rather than ignored.
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := worker(); err != nil {
				fmt.Println("worker exited with error:", err)
			}
		}()
	}
	// NOTE(review): presumably this pause lets both workers begin consuming
	// before the broadcast is sent — confirm.
	time.Sleep(time.Second)

	signature, err := tasks.NewSignature("hello", nil)
	if err != nil {
		fmt.Println("creating signature failed:", err)
		return
	}
	// Set the broadcast-task header on the signature.
	signature.Headers = map[string]interface{}{"broadcastTask": ""}
	if _, err := server.SendTask(signature); err != nil {
		fmt.Println("sending broadcast task failed:", err)
	}
	wg.Wait()
}

// startServer builds a machinery server that uses the broadcast-capable broker
// and a Redis result backend, both pointed at localhost:6379, plus an eager
// lock, and registers the "hello" task. It returns the server together with
// the error from RegisterTasks.
func startServer() (*machinery.Server, error) {
	const redisAddr = "localhost:6379"

	redisCnf := &config.RedisConfig{
		MaxIdle:                3,
		IdleTimeout:            240,
		ReadTimeout:            15,
		WriteTimeout:           15,
		ConnectTimeout:         15,
		NormalTasksPollPeriod:  1000,
		DelayedTasksPollPeriod: 500,
	}
	cnf := &config.Config{
		DefaultQueue:    "machinery_tasks",
		ResultsExpireIn: 3600,
		Redis:           redisCnf,
	}

	srv := machinery.NewServer(
		cnf,
		// Broker with broadcast support.
		broadcastbroker.New(cnf, []string{redisAddr}, 0),
		redisbackend.New(cnf, redisAddr, "", "", 0),
		eagerlock.New(),
	)

	// Task code -> task function.
	taskFuncs := map[string]interface{}{
		"hello": func() error {
			fmt.Println("hello broadcast broker")
			return nil
		},
	}
	err := srv.RegisterTasks(taskFuncs)
	return srv, err
}

// worker creates a worker tagged "machinery_worker" on the shared server,
// launches it asynchronously, and blocks until a value arrives on the errors
// channel, which it then returns.
func worker() error {
	w := server.NewWorker("machinery_worker", 0)

	errs := make(chan error)
	w.LaunchAsync(errs)

	return <-errs
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/sqxwww/xmachinery.git
git@gitee.com:sqxwww/xmachinery.git
sqxwww
xmachinery
xmachinery
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891