代码拉取完成,页面将自动刷新
package gqueue
import (
"context"
"gitee.com/zjlsliupei/ghelp"
"github.com/beego/beego/v2/core/logs"
"github.com/go-redis/redis/v8"
"github.com/tidwall/gjson"
"strings"
"time"
)
type QueueRedis struct {
option string
rdb *redis.Client
}
func NewQueueRedis(option string) (*QueueRedis, error) {
q := QueueRedis{}
q.option = option
err := q.Init()
return &q, err
}
func (r *QueueRedis) Init() error {
option := gjson.Parse(r.option)
r.rdb = redis.NewClient(&redis.Options{
Addr: option.Get("addr").String(),
Password: option.Get("password").String(), // no password set
DB: int(option.Get("db").Int()), // use default DB
})
return nil
}
func (r *QueueRedis) Publish(queueName, msg string) (string, error) {
cmd := r.do("LPUSH", r.getRealQueueName(queueName), msg)
return "don`t need id", cmd.Err()
}
// getRealQueueName 获取实际queueName,如果有prefix会自动拼装上去
func (r *QueueRedis) getRealQueueName(queueName string) string {
return gjson.Get(r.option, "prefix").String() + queueName
}
func (r *QueueRedis) Subscribe(queueName string, cb func(msg string) bool) {
go func() {
for {
cmd := r.do("BLPOP", r.getRealQueueName(queueName), 5000)
content, err := cmd.Slice()
if err != nil {
// 判断如果是i/o timeout,马上发起请求
if strings.Index(err.Error(), "i/o timeout") >= 0 {
continue
} else {
logs.Error("BLPOP err", err)
time.Sleep(3 * time.Second)
continue
}
}
j := gjson.Parse(ghelp.JsonEncode(content))
msg := j.Get("1").String()
if cb(msg) {
// 执行相关ack操作
} else {
// 重试等机制
}
}
}()
}
func (r *QueueRedis) do(args ...interface{}) *redis.Cmd {
return r.rdb.Do(context.Background(), args...)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。