20 Star 165 Fork 27

qiqi/orange

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mq.go 2.94 KB
一键复制 编辑 原始数据 按行查看 历史
qiqi 提交于 2020-06-18 16:43 . 队列消费redis驱动支持
package queue
import (
"errors"
"gitee.com/zhucheer/orange/cfg"
"sync"
)
type MqProducer interface {
SendMsg(topic string, body string) (mqMsg MqMsg, err error)
SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error)
}
type MqConsumer interface {
ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error)
}
const (
_ = iota
SendMsg
ReceiveMsg
)
type MqMsg struct {
RunType int `json:"run_type"`
Topic string `json:"topic"`
MsgId string `json:"msg_id"`
Body []byte `json:"body"`
}
var mqProducerInstanceMap map[string]MqProducer
var mqConsumerInstanceMap map[string]MqConsumer
var mutex sync.Mutex
func init() {
mqProducerInstanceMap = make(map[string]MqProducer)
mqConsumerInstanceMap = make(map[string]MqConsumer)
}
// NewProducer 新建一个生产者实例
func NewProducer(groupName string) (mqClient MqProducer, err error) {
if item, ok := mqProducerInstanceMap[groupName]; ok {
return item, nil
}
driver := cfg.GetString("queue.driver", "")
endpoints := cfg.GetSliceString("queue.endpoints", []string{})
if len(endpoints) == 0 {
return mqClient, errors.New("endpoints is not found.")
}
retry := cfg.GetInt("queue.retry", 2)
if groupName == "" {
return mqClient, errors.New("mq groupName is empty.")
}
switch driver {
case "rocketmq":
mqClient = RegisterRocketProducerMust(endpoints, groupName, retry)
case "redis":
passwd := cfg.GetString("queue.passwd", "")
dbnum := cfg.GetInt("queue.redisdb", 0)
timeout := cfg.GetInt("queue.timeout", 3600)
mqClient = RegisterRedisMqProducerMust(RedisOption{
Addr: endpoints[0],
Passwd: passwd,
DBnum: dbnum,
Timeout: timeout,
}, PoolOption{
5, 50, 5,
}, groupName, retry)
default:
panic("queue driver is not support")
}
mutex.Lock()
defer mutex.Unlock()
mqProducerInstanceMap[groupName] = mqClient
return mqClient, nil
}
// NewConsumer 新建一个消费者实例
func NewConsumer(groupName string) (mqClient MqConsumer, err error) {
if item, ok := mqConsumerInstanceMap[groupName]; ok {
return item, nil
}
endpoints := cfg.GetSliceString("queue.endpoints", []string{})
driver := cfg.GetString("queue.driver", "")
if len(endpoints) == 0 {
return mqClient, errors.New("endpoints is not found.")
}
if groupName == "" {
return mqClient, errors.New("mq groupName is empty.")
}
switch driver {
case "rocketmq":
mqClient = RegisterRocketConsumerMust(endpoints, groupName)
case "redis":
passwd := cfg.GetString("queue.passwd", "")
dbnum := cfg.GetInt("queue.redisdb", 0)
timeout := cfg.GetInt("queue.timeout", 3600)
mqClient = RegisterRedisMqConsumerMust(RedisOption{
Addr: endpoints[0],
Passwd: passwd,
DBnum: dbnum,
Timeout: timeout,
}, PoolOption{
5, 50, 5,
}, groupName)
default:
panic("queue driver is not support")
}
mutex.Lock()
defer mutex.Unlock()
mqConsumerInstanceMap[groupName] = mqClient
return mqClient, nil
}
func (m *MqMsg) BodyString() string {
return string(m.Body)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/zhucheer/orange.git
git@gitee.com:zhucheer/orange.git
zhucheer
orange
orange
v0.2.17

搜索帮助