1 Star 0 Fork 0

sven / gqueue

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
redis.go 1.73 KB
一键复制 编辑 原始数据 按行查看 历史
sven 提交于 2023-11-28 16:13 . test
package gqueue
import (
"context"
"gitee.com/zjlsliupei/ghelp"
"github.com/beego/beego/v2/core/logs"
"github.com/go-redis/redis/v8"
"github.com/tidwall/gjson"
"strings"
"time"
)
type QueueRedis struct {
option string
rdb *redis.Client
}
func NewQueueRedis(option string) (*QueueRedis, error) {
q := QueueRedis{}
q.option = option
err := q.Init()
return &q, err
}
func (r *QueueRedis) Init() error {
option := gjson.Parse(r.option)
r.rdb = redis.NewClient(&redis.Options{
Addr: option.Get("addr").String(),
Password: option.Get("password").String(), // no password set
DB: int(option.Get("db").Int()), // use default DB
})
return nil
}
func (r *QueueRedis) Publish(queueName, msg string) (string, error) {
cmd := r.do("LPUSH", r.getRealQueueName(queueName), msg)
return "don`t need id", cmd.Err()
}
// getRealQueueName 获取实际queueName,如果有prefix会自动拼装上去
func (r *QueueRedis) getRealQueueName(queueName string) string {
return gjson.Get(r.option, "prefix").String() + queueName
}
func (r *QueueRedis) Subscribe(queueName string, cb func(msg string) bool) {
go func() {
for {
cmd := r.do("BLPOP", r.getRealQueueName(queueName), 5000)
content, err := cmd.Slice()
if err != nil {
// 判断如果是i/o timeout,马上发起请求
if strings.Index(err.Error(), "i/o timeout") >= 0 {
continue
} else {
logs.Error("BLPOP err", err)
time.Sleep(3 * time.Second)
continue
}
}
j := gjson.Parse(ghelp.JsonEncode(content))
msg := j.Get("1").String()
if cb(msg) {
// 执行相关ack操作
} else {
// 重试等机制
}
}
}()
}
func (r *QueueRedis) do(args ...interface{}) *redis.Cmd {
return r.rdb.Do(context.Background(), args...)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zjlsliupei/gqueue.git
git@gitee.com:zjlsliupei/gqueue.git
zjlsliupei
gqueue
gqueue
master

搜索帮助