20 Star 165 Fork 27

qiqi/orange

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
redismq.go 5.43 KB
一键复制 编辑 原始数据 按行查看 历史
qiqi 提交于 2020-06-18 16:43 . 队列消费redis驱动支持
package queue
import (
"encoding/json"
"errors"
"fmt"
"gitee.com/zhucheer/orange/utils"
"github.com/gomodule/redigo/redis"
"github.com/zhuCheer/pool"
"math/rand"
"time"
)
// RedisMq is the Redis-backed queue client. The Register* helpers in this file
// hand the same type out as both an MqProducer and an MqConsumer.
type RedisMq struct {
	// poolName is the key of this client's connection pool in redisPoolMap;
	// groupName is embedded in every queue key the client reads or writes.
	poolName, groupName string
	// retry is the number of extra attempts made when fetching a pooled
	// connection; timeout is the TTL, in seconds, sent with EXPIRE for the
	// queue key after a message is pushed.
	retry, timeout int
}
// PoolOption holds the sizing and idle settings used when a Redis connection
// pool is created.
type PoolOption struct {
	// InitCap and MaxCap are forwarded to the pool as its initial and maximum
	// capacity.
	InitCap, MaxCap int
	// IdleTimeout is the pool's idle timeout expressed in seconds.
	IdleTimeout int
}
// RedisOption describes how to reach the Redis server backing a queue.
type RedisOption struct {
	// Addr is the TCP address dialled for each connection; Passwd, when
	// non-empty, is sent with AUTH right after connecting.
	Addr, Passwd string
	// DBnum, when greater than zero, is selected with SELECT on every new
	// connection; Timeout is the TTL in seconds applied to queue keys.
	DBnum, Timeout int
}
// redisPoolMap holds every registered connection pool, keyed by the pool name
// produced in registerRedis. It is allocated at package initialisation so it
// is always safe to index and assign.
var redisPoolMap = make(map[string]pool.Pool)
// SendMsg publishes a string payload to topic. It converts body to bytes and
// delegates to SendByteMsg, returning whatever that call returns.
func (r *RedisMq) SendMsg(topic string, body string) (mqMsg MqMsg, err error) {
	payload := []byte(body)
	mqMsg, err = r.SendByteMsg(topic, payload)
	return mqMsg, err
}
// SendByteMsg publishes body to topic by LPUSH-ing a JSON-encoded MqMsg onto
// the Redis list "ORANGE-REDISQUEUE:<groupName>-<topic>".
//
// An error is returned when the client has no pool registered, when topic is
// empty, when no pooled connection can be obtained, or when encoding the
// message or the LPUSH itself fails; in all of these cases the returned MqMsg
// is the zero value. On success the returned MqMsg is the message that was
// queued, including its generated MsgId.
func (r *RedisMq) SendByteMsg(topic string, body []byte) (mqMsg MqMsg, err error) {
	if r.poolName == "" {
		return MqMsg{}, errors.New("RedisMq producer not register")
	}
	if topic == "" {
		return MqMsg{}, errors.New("RedisMq topic is empty")
	}

	rdx, put, err := getRedis(r.poolName, r.retry)
	if err != nil {
		return MqMsg{}, err
	}
	defer put()

	msg := MqMsg{
		RunType: SendMsg,
		Topic:   topic,
		MsgId:   getRandMsgId(),
		Body:    body,
	}
	payload, err := json.Marshal(msg)
	if err != nil {
		return MqMsg{}, err
	}

	queueName := fmt.Sprintf("ORANGE-REDISQUEUE:%s-%s", r.groupName, topic)
	if _, err = redis.Int64(rdx.Do("LPUSH", queueName, payload)); err != nil {
		return MqMsg{}, err
	}

	// Redis deletes a key when EXPIRE is given a non-positive timeout, so an
	// unset timeout must not be forwarded or the queue would be wiped right
	// after the push.
	if r.timeout > 0 {
		// Best effort: the message is already queued, so a failure to refresh
		// the TTL is deliberately not reported as a send failure.
		_, _ = rdx.Do("EXPIRE", queueName, r.timeout)
	}
	return msg, nil
}
// ListenReceiveMsgDo starts a background goroutine that, once per second,
// drains the Redis list for topic and calls receiveDo for each decoded
// message, in the order they were popped.
//
// It returns an error, without starting the goroutine, when the client has no
// pool registered or when topic is empty. The goroutine has no stop mechanism;
// it runs for as long as the process does.
func (r *RedisMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg MqMsg)) (err error) {
	switch {
	case r.poolName == "":
		return errors.New("RedisMq producer not register")
	case topic == "":
		return errors.New("RedisMq topic is empty")
	}

	queueName := fmt.Sprintf("ORANGE-REDISQUEUE:%s-%s", r.groupName, topic)

	poll := func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for range ticker.C {
			for _, msg := range r.loopReadQueue(queueName) {
				receiveDo(msg)
			}
		}
	}
	go poll()

	return nil
}
// loopReadQueue pops messages from the tail of queueName with RPOP until the
// list is empty or a Redis error occurs, and returns the decoded messages in
// pop order. It returns nil when no pooled connection can be obtained.
//
// Payloads that fail to decode as JSON, or that decode without a MsgId, are
// dropped rather than delivered.
func (r *RedisMq) loopReadQueue(queueName string) (mqMsgList []MqMsg) {
	rdx, put, err := getRedis(r.poolName, r.retry)
	if err != nil {
		return nil
	}
	defer put()

	for {
		raw, popErr := redis.Bytes(rdx.Do("RPOP", queueName))
		if popErr != nil || len(raw) == 0 {
			// An empty list surfaces as an error from redis.Bytes, so this
			// is also the normal end-of-queue exit.
			break
		}

		var msg MqMsg
		if decodeErr := json.Unmarshal(raw, &msg); decodeErr != nil {
			// json.Unmarshal may leave msg partially filled on error; skip
			// it so consumers never see a half-decoded message.
			continue
		}
		if msg.MsgId != "" {
			mqMsgList = append(mqMsgList, msg)
		}
	}
	return mqMsgList
}
// RegisterRedisMqProducerMust registers a Redis queue client and returns it as
// an MqProducer. It panics with the registration error if RegisterRedisMq
// fails.
func RegisterRedisMqProducerMust(connOpt RedisOption, poolOpt PoolOption, groupName string, retry int) (client MqProducer) {
	mq, err := RegisterRedisMq(connOpt, poolOpt, groupName, retry)
	if err != nil {
		panic(err)
	}
	return mq
}
// RegisterRedisMqConsumerMust registers a Redis queue client with a retry
// count of zero and returns it as an MqConsumer. It panics with the
// registration error if RegisterRedisMq fails.
func RegisterRedisMqConsumerMust(connOpt RedisOption, poolOpt PoolOption, groupName string) (client MqConsumer) {
	mq, err := RegisterRedisMq(connOpt, poolOpt, groupName, 0)
	if err != nil {
		panic(err)
	}
	return mq
}
// RegisterRedisMq registers (or reuses) the connection pool described by
// connOpt and poolOpt and returns a RedisMq bound to it.
//
// groupName becomes part of every queue key, a negative retry is treated as
// zero, and connOpt.Timeout is stored as the queue key TTL. If the pool cannot
// be registered the error is returned with a nil client.
func RegisterRedisMq(connOpt RedisOption, poolOpt PoolOption, groupName string, retry int) (mqIns *RedisMq, err error) {
	name, regErr := registerRedis(connOpt.Addr, connOpt.Passwd, connOpt.DBnum, poolOpt)
	if regErr != nil {
		return nil, regErr
	}

	if retry < 0 {
		retry = 0
	}

	return &RedisMq{
		poolName:  name,
		groupName: groupName,
		retry:     retry,
		timeout:   connOpt.Timeout,
	}, nil
}
// registerRedis makes sure a connection pool exists for the given Redis
// endpoint and returns the name under which it is stored in redisPoolMap.
//
// The pool name is the MD5 of "host-passwd-dbnum", so registering the same
// endpoint again returns the existing pool instead of creating a new one.
// The package mutex is held for the whole lookup-and-create sequence so that
// concurrent registrations of the same endpoint cannot race on the map or
// build duplicate pools.
//
// On failure to create the pool, the computed pool name is returned together
// with the error.
func registerRedis(host, passwd string, dbnum int, opt PoolOption) (poolName string, err error) {
	poolName = utils.Md5ToString(fmt.Sprintf("%s-%s-%d", host, passwd, dbnum))

	mutex.Lock()
	defer mutex.Unlock()

	if _, ok := redisPoolMap[poolName]; ok {
		return poolName, nil
	}

	// connRedis dials a new connection and applies AUTH / SELECT as
	// configured. The connection is closed if either command fails so a
	// half-initialised socket is never leaked.
	connRedis := func() (interface{}, error) {
		conn, err := redis.Dial("tcp", host)
		if err != nil {
			return nil, err
		}
		if passwd != "" {
			if _, err := conn.Do("AUTH", passwd); err != nil {
				conn.Close()
				return nil, err
			}
		}
		if dbnum > 0 {
			if _, err := conn.Do("SELECT", dbnum); err != nil {
				conn.Close()
				return nil, err
			}
		}
		return conn, nil
	}

	// closeRedis closes a pooled connection.
	closeRedis := func(v interface{}) error {
		return v.(redis.Conn).Close()
	}

	// pingRedis checks that a pooled connection is still alive.
	pingRedis := func(v interface{}) error {
		conn := v.(redis.Conn)
		val, err := redis.String(conn.Do("PING"))
		if err != nil {
			return err
		}
		if val != "PONG" {
			return errors.New("queue redis ping is error ping => " + val)
		}
		return nil
	}

	p, err := pool.NewChannelPool(&pool.Config{
		InitialCap:  opt.InitCap,
		MaxCap:      opt.MaxCap,
		Factory:     connRedis,
		Close:       closeRedis,
		Ping:        pingRedis,
		IdleTimeout: time.Duration(opt.IdleTimeout) * time.Second,
	})
	if err != nil {
		return poolName, err
	}

	redisPoolMap[poolName] = p
	return poolName, nil
}
// getRedis fetches a connection from the pool registered under poolName.
//
// If the first attempt fails it waits one second and tries again, up to retry
// additional times, stopping as soon as an attempt succeeds. The returned put
// function hands the connection back to the pool; it is never nil and is a
// no-op when an error is returned, so callers may defer it unconditionally.
//
// NOTE(review): redisPoolMap is read here without holding the package mutex
// that guards writes to it — confirm registration always completes before
// any client starts sending or consuming.
func getRedis(poolName string, retry int) (db redis.Conn, put func(), err error) {
	put = func() {}

	redisPool, ok := redisPoolMap[poolName]
	if !ok {
		return nil, put, errors.New("db connect is nil")
	}

	conn, err := redisPool.Get()
	for i := 0; i < retry && err != nil; i++ {
		// Back off before retrying, not after, so a successful retry
		// returns immediately.
		time.Sleep(time.Second)
		conn, err = redisPool.Get()
	}
	if err != nil {
		return nil, put, err
	}

	put = func() {
		redisPool.Put(conn)
	}
	db = conn.(redis.Conn)
	return db, put, nil
}
// init seeds the shared math/rand source once at start-up. Re-seeding from the
// clock on every call would make two IDs generated within the same clock tick
// draw the same "random" suffix, which removes the protection the suffix is
// meant to provide.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// getRandMsgId builds a message ID from the current Unix time in nanoseconds
// followed by a random number in [1, 999] zero-padded to four digits.
func getRandMsgId() (msgId string) {
	suffix := rand.Intn(999) + 1
	return fmt.Sprintf("%d%.4d", time.Now().UnixNano(), suffix)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/zhucheer/orange.git
git@gitee.com:zhucheer/orange.git
zhucheer
orange
orange
v0.2.17

搜索帮助