1 Star 3 Fork 0

gm/mingo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
redismq.go 4.89 KB
一键复制 编辑 原始数据 按行查看 历史
Leon 提交于 2024-11-07 15:47 +08:00 . feat:Map
package redismq
import (
"context"
"encoding/json"
"fmt"
"reflect"
"sort"
"gitee.com/liangguoming/mingo"
"github.com/redis/go-redis/v9"
)
type processItem struct {
dataType reflect.Type
cbs []MqCallback
}
// 回调函数
type MqCallback func(id string, data any)
type RedisMq struct {
rdb *redis.Client
actionMap map[string]processItem
Prefix string
Group string
Streams []string
Consumer string
}
// data 会被 json 序列化
func (t *RedisMq) Add(channel string, dataPtr any) error {
buff, err := json.Marshal(dataPtr)
if err != nil {
return err
}
t.rdb.XAdd(context.Background(), &redis.XAddArgs{
Stream: t.Prefix + channel,
ID: "*",
Values: []string{"val", string(buff)},
})
return nil
}
func (t *RedisMq) On(channel string, cb MqCallback, dataPtr any) error {
streamName := t.Prefix + channel
ctx := context.Background()
// 检查 stream 是否存在
exist, err := t.rdb.Exists(context.Background(), streamName).Result()
if err != nil {
return err
}
if exist == 0 {
// 未存在 stream
t.rdb.XGroupCreateMkStream(ctx, streamName, t.Group, "0")
} else {
// 已存在 stream,检查消费组未存在就创建
infos, err := t.rdb.XInfoGroups(ctx, streamName).Result()
if err != nil {
return err
}
if !mingo.Includes(infos, func(item *redis.XInfoGroup) bool {
return item.Name == t.Group
}) {
t.rdb.XGroupCreate(ctx, streamName, t.Group, "0")
}
}
tmp := t.actionMap[streamName]
tmp.dataType = reflect.TypeOf(dataPtr).Elem()
if tmp.cbs == nil {
tmp.cbs = make([]MqCallback, 0)
}
tmp.cbs = append(tmp.cbs, cb)
t.actionMap[streamName] = tmp
return nil
}
// 消费消息
func (t *RedisMq) consumMsg(channel string, id string, data string) {
action, exist := t.actionMap[channel]
if !exist {
return
}
buff := reflect.New(action.dataType)
err := json.Unmarshal([]byte(data), buff.Interface())
if err != nil {
fmt.Println(err)
return
}
for _, item := range action.cbs {
go item(id, buff.Elem().Interface())
// item(id, buff.Interface())
}
}
// 清理已消费的消息
// master 节点定期调用
func (t *RedisMq) Clean() error {
ctx := context.Background()
for _, stream := range t.Streams {
ids := []string{}
infos, err := t.rdb.XInfoGroups(ctx, stream).Result()
fmt.Println("group info ", infos)
if err != nil {
fmt.Println(err)
return err
}
for _, group := range infos {
fmt.Println(group.Pending, group.LastDeliveredID)
if group.Pending > 0 {
pendingRes, err := t.rdb.XPending(ctx, stream, group.Name).Result()
if err != nil {
fmt.Println("gen pedding info fail:", err)
return err
}
if pendingRes != nil {
ids = append(ids, pendingRes.Lower)
}
} else {
ids = append(ids, group.LastDeliveredID)
}
}
if len(ids) > 0 {
sort.Strings(ids)
lastId := ids[0]
t.rdb.XTrimMinID(ctx, stream, lastId)
}
}
return nil
}
func streamParams(streams []string, id string) []string {
lenght := len(streams)
res := make([]string, lenght*2)
for i := 0; i < lenght; i++ {
res[i] = streams[i]
}
for i := lenght; i < 2*lenght; i++ {
res[i] = id
}
return res
}
// 启动服务
func (t *RedisMq) Start() {
for key := range t.actionMap {
t.Streams = append(t.Streams, key)
}
if len(t.Streams) == 0 {
return
}
ctx := context.Background()
go func() {
// 故障恢复后,先拉取 pending 数据
pendingStreams := streamParams(t.Streams, "0")
for {
infos, err := t.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: t.Group, Consumer: t.Consumer, Streams: pendingStreams, Count: 1}).Result()
if err != nil {
break
}
if len(infos) == 0 {
break
}
processMsg := 0
for _, item := range infos {
for _, msg := range item.Messages {
t.consumMsg(item.Stream, msg.ID, msg.Values["val"].(string))
processMsg++
}
t.rdb.XAck(ctx, item.Stream, t.Group, mingo.ForEach(item.Messages, func(item *redis.XMessage) string {
return item.ID
})...)
}
// 一条消息都没处理,证明 pedding 队列已经清空
if processMsg == 0 {
break
}
}
// 监听新消息
newStreams := streamParams(t.Streams, ">")
for {
infos, err := t.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: t.Group, Consumer: t.Consumer, Streams: newStreams, Count: 1, Block: 0}).Result()
if err != nil {
fmt.Println(err)
continue
}
for _, item := range infos {
for _, msg := range item.Messages {
t.consumMsg(item.Stream, msg.ID, msg.Values["val"].(string))
}
t.rdb.XAck(ctx, item.Stream, t.Group, mingo.ForEach(item.Messages, func(item *redis.XMessage) string {
return item.ID
})...)
}
}
}()
}
// 创建新消息队列客户端
func NewRedisMq(redisOption *redis.Options, prefix string, group string) *RedisMq {
rdb := redis.NewClient(redisOption)
return &RedisMq{rdb: rdb, actionMap: make(map[string]processItem), Streams: make([]string, 0), Prefix: prefix, Consumer: "default", Group: group}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/liangguoming/mingo.git
git@gitee.com:liangguoming/mingo.git
liangguoming
mingo
mingo
v0.13.2

搜索帮助