DelayQueue 是使用 Go 语言基于 Redis 实现的支持延时/定时投递的消息队列。
主要特性:
package main
import (
"strconv"
"time"
"github.com/go-redis/redis/v8"
"scrm.cg.xxx/uu-comm-pkg/queue/delayqueue"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
client := delayqueue.NewRedisV8Wrapper(redisCli)
queue := delayqueue.NewQueue0("example", client, func(payload string) bool {
// callback returns true to confirm successful consumption.
// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
println(payload)
return true
}).WithConcurrent(4)
// send delay message
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Second, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// send schedule message
for i := 0; i < 10; i++ {
err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Second))
if err != nil {
panic(err)
}
}
// start consume
done := queue.StartConsume()
<-done
}
如果您在使用其他的 redis 客户端, 可以将其包装到RedisCli接口中
如果您不想在初始化时设置callback, 您可以使用 WithCallback 函数
默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 delayqueue.NewPublisher
.
func consumer() {
queue := NewQueue0("test", redisCli, cb)
queue.StartConsume()
}
func producer() {
publisher := NewPublisher0("test", redisCli)
publisher.SendDelayMsg(strconv.Itoa(i), 0)
}
func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue
callback 函数负责接收并消费消息。callback 返回 true 确认已成功消费,返回 false 表示处理失败,需要重试。
如果没有设置 callback, 调用 StartConsume 时会 panic。
queue := NewQueue0("test", redisCli)
queue.WithCallback(func(payload string) bool {
return true
})
func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue
为 DelayQueue 设置 logger
func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue
设置消费者并发数
func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue
设置消费者从 Redis 拉取消息的时间间隔
func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue
设置最长消费时间。若拉取消息后超出 MaxConsumeDuration 时限仍未返回 ACK 则认为消费失败,DelayQueue 会重新投递此消息。
func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue
FetchLimit 限制消费者从 Redis 中拉取的消息数目,即单个消费者正在处理中的消息数不会超过 FetchLimit
UseHashTagKey()
UseHashTagKey() 会在 Redis Key 上添加 hash tag 确保同一个队列的所有 Key 分布在同一个哈希槽中。
如果您正在使用 Codis/阿里云/腾讯云等 Redis 集群,请在 NewQueue 时添加这个选项:NewQueue("test", redisCli, cb, UseHashTagKey())
。UseHashTagKey 选项在队列创建后禁止修改。
注意: 修改(添加或移除)此选项会导致无法访问 Redis 中已有的数据。
see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
WithDefaultRetryCount(count uint)
设置队列中消息的默认重试次数。
在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
我们提供了 Monitor
来监控运行数据:
monitor := delayqueue.NewMonitor0("example", redisCli)
我们可以使用 Monitor.ListenEvent
注册一个可以收到队列中所有事件的监听器, 从而实现自定义的事件上报和指标监控。
Monitor 可以受到所有 Worker 的事件, 包括运行在其它服务器上的 Worker.
type EventListener interface {
OnEvent(*Event)
}
// returns: close function, error
func (m *Monitor) ListenEvent(listener EventListener) (func(), error)
Event 的定义在 events.go.
此外,我们提供了一个 Demo,它会每分钟显示一次队列中产生和处理的消息数量。
Demo 完整代码在 example/monitor.
type MyProfiler struct {
List []*Metrics
Start int64
}
func (p *MyProfiler) OnEvent(event *delayqueue.Event) {
sinceUptime := event.Timestamp - p.Start
upMinutes := sinceUptime / 60
if len(p.List) <= int(upMinutes) {
p.List = append(p.List, &Metrics{})
}
current := p.List[upMinutes]
switch event.Code {
case delayqueue.NewMessageEvent:
current.ProduceCount += event.MsgCount
case delayqueue.DeliveredEvent:
current.DeliverCount += event.MsgCount
case delayqueue.AckEvent:
current.ConsumeCount += event.MsgCount
case delayqueue.RetryEvent:
current.RetryCount += event.MsgCount
case delayqueue.FinalFailedEvent:
current.FailCount += event.MsgCount
}
}
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
client := delayqueue.NewRedisV8Wrapper(redisCli)
queue := delayqueue.NewQueue0("example", client, func(payload string) bool {
return true
})
start := time.Now()
queue.EnableReport()
// setup monitor
monitor := delayqueue.NewMonitor0("example", client)
listener := &MyProfiler{
Start: start.Unix(),
}
monitor.ListenEvent(listener)
// print metrics every minute
tick := time.Tick(time.Minute)
go func() {
for range tick {
minutes := len(listener.List) - 1
fmt.Printf("%d: %#v", minutes, listener.List[minutes])
}
}()
// start test
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), 0, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
done := queue.StartConsume()
<-done
}
Monitor 使用 redis 的发布订阅功能来收集数据,使用 Monitor 前必须在所有 Worker 处调用 EnableReport
来启用上报。
如果你不想使用 redis pub/sub, 可以调用 DelayQueue.ListenEvent
来直接收集数据。请注意,DelayQueue.ListenEvent
只能收到当前 Worker 的事件, 而 Monitor 可以收到所有 Worker 的事件。
另外,DelayQueue.ListenEvent
会覆盖掉 Monitor 的监听器,再次调用 EnableReport
后 Monitor 才能恢复工作。
Monitor 也可以直接获得一些队列的状态信息。
func (m *Monitor) GetPendingCount() (int64, error)
返回未到投递时间的消息数。
func (m *Monitor) GetReadyCount() (int64, error)
返回已到投递时间但尚未发给 Worker 的消息数。
func (m *Monitor) GetProcessingCount() (int64, error)
返回 Worker 正在处理中的消息数。
如果需要在 Redis Cluster 上工作, 请使用 NewQueueOnCluster
:
redisCli := redis.NewUniversalClient(&redis.NewUniversalClient{
Addrs: []string{
"127.0.0.1:7000",
"127.0.0.1:7001",
"127.0.0.1:7002",
},
})
callback := func(s string) bool {
return true
}
queue := NewQueue0("test", redisCli, callback)
如果是阿里云,腾讯云的 Redis 集群版或 codis, twemproxy 这类透明式的集群, 使用 NewQueue
并启用 UseHashTagKey() 即可:
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
callback := func(s string) bool {
return true
}
queue := delayqueue.NewQueue0("example", redisCli, callback, UseHashTagKey())
完整流程如图所示:
整个消息队列中一共有 7 个 Redis 数据结构:
如上图所示整个消息队列中一共涉及 6 个操作:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。