代码拉取完成,页面将自动刷新
package queue
import (
"errors"
"gitee.com/zhucheer/orange/cfg"
"sync"
)
type MqProducer interface {
SendMsg(topic string, body string) (mqMsg MqMsg, err error)
SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
}
type MqConsumer interface {
ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)
}
const (
_ = iota
SendMsg
ReceiveMsg
)
type MqMsg struct {
RunType int `json:"run_type"`
Topic string `json:"topic"`
MsgId string `json:"msg_id"`
Body []byte `json:"body"`
}
var mqProducerInstanceMap map[string]MqProducer
var mqConsumerInstanceMap map[string]MqConsumer
var mutex sync.Mutex
func init() {
mqProducerInstanceMap = make(map[string]MqProducer)
mqConsumerInstanceMap = make(map[string]MqConsumer)
}
// NewProducer 新建一个生产者实例
func NewProducer(groupName string) (mqClient MqProducer, err error) {
if item, ok := mqProducerInstanceMap[groupName]; ok {
return item, nil
}
driver := cfg.GetString("queue.driver", "")
endpoints := cfg.GetSliceString("queue.endpoints", []string{})
if len(endpoints) == 0 {
return mqClient, errors.New("endpoints is not found.")
}
retry := cfg.GetInt("queue.retry", 2)
if groupName == "" {
return mqClient, errors.New("mq groupName is empty.")
}
switch driver {
case "rocketmq":
mqClient = RegisterRocketProducerMust(endpoints, groupName, retry)
case "redis":
passwd := cfg.GetString("queue.passwd", "")
dbnum := cfg.GetInt("queue.redisdb", 0)
timeout := cfg.GetInt("queue.timeout", 3600)
mqClient = RegisterRedisMqProducerMust(RedisOption{
Addr: endpoints[0],
Passwd: passwd,
DBnum: dbnum,
Timeout: timeout,
}, PoolOption{
5, 50, 5,
}, groupName, retry)
default:
panic("queue driver is not support")
}
mutex.Lock()
defer mutex.Unlock()
mqProducerInstanceMap[groupName] = mqClient
return mqClient, nil
}
// NewConsumer 新建一个消费者实例
func NewConsumer(groupName string) (mqClient MqConsumer, err error) {
if item, ok := mqConsumerInstanceMap[groupName]; ok {
return item, nil
}
endpoints := cfg.GetSliceString("queue.endpoints", []string{})
driver := cfg.GetString("queue.driver", "")
if len(endpoints) == 0 {
return mqClient, errors.New("endpoints is not found.")
}
if groupName == "" {
return mqClient, errors.New("mq groupName is empty.")
}
switch driver {
case "rocketmq":
mqClient = RegisterRocketConsumerMust(endpoints, groupName)
case "redis":
passwd := cfg.GetString("queue.passwd", "")
dbnum := cfg.GetInt("queue.redisdb", 0)
timeout := cfg.GetInt("queue.timeout", 3600)
mqClient = RegisterRedisMqConsumerMust(RedisOption{
Addr: endpoints[0],
Passwd: passwd,
DBnum: dbnum,
Timeout: timeout,
}, PoolOption{
5, 50, 5,
}, groupName)
default:
panic("queue driver is not support")
}
mutex.Lock()
defer mutex.Unlock()
mqConsumerInstanceMap[groupName] = mqClient
return mqClient, nil
}
func (m *MqMsg) BodyString() string {
return string(m.Body)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。