1 Star 1 Fork 0

titan-kit/titan

Create your Gitee Account
Explore and code with more than 12 million developers. Free private repositories!
Sign up
Clone or Download
consumer.go 5.56 KB
Copy Edit Raw Blame History
蝶衣人生 authored 2021-05-29 14:04 . first commit
package rabbitmq
import (
"errors"
"fmt"
"sync"
"time"
"gitee.com/titan-kit/titan/log"
"github.com/streadway/amqp"
)
// Delivery wraps amqp.Delivery by embedding it. It is the message type that
// Consumer.deliver sends on the consumer's callback channel.
type Delivery struct {
amqp.Delivery
}
// Consumer is a client-side consumer built on the RabbitMQ message broker.
type Consumer struct {
// Logger used by Open and keepalive.
// NOTE(review): newConsumer does not populate this field — confirm where it is set.
log *log.Slf4g
// Name of the consumer; "" is OK.
name string
// MQ client instance that channels are obtained from.
client *Client
// Guards the mutable fields below against concurrent access.
mutex sync.RWMutex
// AMQP session channel; assigned in Open and reset to nil on normal shutdown.
ch *amqp.Channel
// Exchanges together with the queues bound to them.
exchangeBinds []*ExchangeBinds
// QoS prefetch count handed to ch.Qos in Open.
prefetch int
// Channel on which consumed messages are handed to the upper layer.
callback chan<- Delivery
// Receives the notification that the session channel was closed.
closeC chan *amqp.Error
// Closed by Close to request shutdown of the consumer.
stopC chan struct{}
// Current consumer state (compared against StateOpened, StateClosed, StateReopening).
state uint8
}
// newConsumer builds a Consumer with the given name that is bound to client.
// Only name, client and a fresh stop channel are initialised; every other
// field keeps its zero value until a setter or Open fills it in.
func newConsumer(name string, client *Client) *Consumer {
	consumer := new(Consumer)
	consumer.name = name
	consumer.client = client
	consumer.stopC = make(chan struct{})
	return consumer
}
// Name returns the name the consumer was created with.
func (c *Consumer) Name() string {
	name := c.name
	return name
}
// CloseChan closes the underlying AMQP channel while holding the consumer's
// lock. It exists for tests only; do not call it manually.
//
// It is a no-op when no channel is currently held (before Open, or after a
// normal shutdown has reset the channel to nil), instead of dereferencing a
// nil *amqp.Channel.
func (c *Consumer) CloseChan() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.ch == nil {
		return
	}
	// Best effort: the close error is deliberately ignored, as before.
	_ = c.ch.Close()
}
// SetExchangeBinds stores the exchange/queue bindings the consumer will apply
// in Open. The call is ignored while the consumer is in StateOpened.
// It returns the consumer so that setters can be chained.
func (c *Consumer) SetExchangeBinds(eb []*ExchangeBinds) *Consumer {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if c.state == StateOpened {
		return c
	}
	c.exchangeBinds = eb
	return c
}
// SetMsgCallback registers the channel on which consumed messages are
// delivered to the caller. It returns the consumer so that setters can be
// chained.
func (c *Consumer) SetMsgCallback(cb chan<- Delivery) *Consumer {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.callback = cb
	return c
}
// SetQos sets the channel-level QoS. prefetch is in the range [0, ∞) and
// defaults to 0. Set prefetch to 1 to consume in round-robin fashion.
// Note: call this before Open.
// It returns the consumer so that setters can be chained.
func (c *Consumer) SetQos(prefetch int) *Consumer {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.prefetch = prefetch
	return c
}
// Open creates a new AMQP channel, applies the configured exchange bindings
// and QoS, starts consuming every bound queue and finally launches the
// keepalive goroutine.
//
// It returns an error when the consumer has no client, no exchange bindings
// were set, the consumer is already opened, or the channel cannot be created
// or configured. Failures to start consuming an individual queue are only
// logged; Open still returns nil in that case.
//
// The consumer's mutex is held for the whole call, so no other operation can
// touch the channel while it is being set up.
func (c *Consumer) Open() error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Argument validation.
	if c.client == nil {
		return errors.New("rabbit mq bad consumer")
	}
	if len(c.exchangeBinds) == 0 {
		return errors.New("rabbit mq no exchangeBinds found, you should SetExchangeBinds before open")
	}

	// State check.
	if c.state == StateOpened {
		return errors.New("rabbit mq Consumer had been opened")
	}

	// Create the channel.
	ch, err := c.client.channel()
	if err != nil {
		return fmt.Errorf("rabbit mq Create channel failed, %v", err)
	}

	// Declare/bind the topology first, then apply QoS.
	if err = applyExchangeBinds(ch, c.exchangeBinds); err == nil {
		err = ch.Qos(c.prefetch, 0, false)
	}
	if err != nil {
		// Do not leak the half-configured channel. The close error is
		// ignored because the setup error is the one worth reporting.
		_ = ch.Close()
		return fmt.Errorf("rabbit mq %v", err)
	}

	c.ch = ch
	c.state = StateOpened
	c.stopC = make(chan struct{})
	c.closeC = make(chan *amqp.Error, 1)
	c.ch.NotifyClose(c.closeC)

	// Start consuming. consume reports every failure on notify and sends nil
	// once it is done, so it must run concurrently with the receive loop
	// below: called synchronously, a single reported error would fill the
	// one-slot buffer and the next send would block forever with the mutex
	// held. Open waits for the final nil before it returns, so consume never
	// outlives the locked section.
	opt := DefaultConsumeOption()
	notify := make(chan error, 1)
	go c.consume(opt, notify)
	for e := range notify {
		if e == nil {
			break
		}
		c.log.ErrorF("notify error %v", e)
	}
	// nil is the last value consume sends, so closing here is safe.
	close(notify)

	// Health monitoring.
	go c.keepalive()
	return nil
}
// Close requests shutdown of the consumer by closing its stop channel.
// Calling it again after the stop channel has been closed is a no-op.
func (c *Consumer) Close() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	select {
	case <-c.stopC:
		// Already closed; nothing left to do.
		return
	default:
	}
	close(c.stopC)
}
// consume walks every queue reachable from c.exchangeBinds, starts consuming
// it on the consumer's channel, and spawns one deliver goroutine per queue.
//
// Problems are reported to the caller on notifyErr: nil entries and failed
// Consume calls each produce one error and are then skipped. A final nil is
// sent once all queues have been visited, signalling completion. The caller
// owns notifyErr and is responsible for closing it. Every send blocks until
// the caller receives it or the channel has buffer space.
func (c *Consumer) consume(opt *ConsumeOption, notifyErr chan<- error) {
	for ebIdx, binds := range c.exchangeBinds {
		if binds == nil {
			notifyErr <- fmt.Errorf("rabbit mq ExchangeBinds[%d] is nil, consumer(%s)", ebIdx, c.name)
			continue
		}
		for bIdx, binding := range binds.Bindings {
			if binding == nil {
				notifyErr <- fmt.Errorf("rabbit mq Binding[%d] is nil, ExchangeBinds[%d], consumer(%s)", bIdx, ebIdx, c.name)
				continue
			}
			for qIdx, queue := range binding.Queues {
				if queue == nil {
					notifyErr <- fmt.Errorf("rabbit mq Queue[%d] is nil, ExchangeBinds[%d], Biding[%d], consumer(%s)", qIdx, ebIdx, bIdx, c.name)
					continue
				}
				msgs, err := c.ch.Consume(queue.Name, "", opt.AutoAck, opt.Exclusive, opt.NoLocal, opt.NoWait, opt.Args)
				if err == nil {
					go c.deliver(msgs)
					continue
				}
				notifyErr <- fmt.Errorf("rabbit mq Consumer(%s) consume queue(%s) failed, %v", c.name, queue.Name, err)
			}
		}
	}
	// Signal completion.
	notifyErr <- nil
}
// deliver forwards every message received on delivery to the registered
// callback channel, wrapped in a Delivery. It returns when delivery is closed.
// Messages that arrive while no callback is registered are dropped.
//
// The callback is snapshotted under the read lock for each message because
// SetMsgCallback may replace it concurrently. The lock is released before the
// (possibly blocking) send so that a slow receiver never stalls the setters.
//
// FIXME: the consumer receives messages that it published itself.
func (c *Consumer) deliver(delivery <-chan amqp.Delivery) {
	for d := range delivery {
		c.mutex.RLock()
		cb := c.callback
		c.mutex.RUnlock()

		if cb == nil {
			continue
		}
		cb <- Delivery{d}
	}
}
// State returns the consumer's current state, read under the read lock.
func (c *Consumer) State() uint8 {
	c.mutex.RLock()
	current := c.state
	c.mutex.RUnlock()
	return current
}
// keepalive supervises the consumer after a successful Open. It blocks until
// one of two things happens:
//
//   - the stop channel is closed (Close was called): the AMQP channel is
//     closed and the consumer moves to StateClosed;
//   - the AMQP channel reports that it was closed: the consumer moves to
//     StateReopening and Open is retried once per second until it succeeds.
//     A successful Open starts a fresh keepalive goroutine, so this one exits.
//
// The retry wait also listens on the stop channel, so a Close issued while
// recovering ends the retry loop instead of being silently lost when Open
// later replaces the stop channel.
func (c *Consumer) keepalive() {
	// Open assigns stopC before starting this goroutine and replaces it only
	// on a successful re-open, after which this goroutine returns; one
	// snapshot is therefore enough.
	stopC := c.stopC

	select {
	case <-stopC:
		// Normal shutdown.
		c.log.InfoF("RabbitMQ Consumer(%s) shutdown normally", c.Name())
		c.mutex.Lock()
		_ = c.ch.Close()
		c.ch = nil
		c.state = StateClosed
		c.mutex.Unlock()

	case err := <-c.closeC:
		if err == nil {
			c.log.ErrorF("RabbitMQ Consumer(%s)'s channel was closed, but Error detail is nil", c.name)
		} else {
			c.log.ErrorF("RabbitMQ Consumer(%s)'s channel was closed, code:%d, reason:%s", c.name, err.Code, err.Reason)
		}

		// The channel was closed unexpectedly.
		c.mutex.Lock()
		c.state = StateReopening
		c.mutex.Unlock()

		maxRetry := 99999999
		for i := 0; i < maxRetry; i++ {
			// Wait one second between attempts, but stay responsive to Close.
			select {
			case <-stopC:
				c.log.InfoF("RabbitMQ Consumer(%s) shutdown while recovering channel", c.name)
				c.mutex.Lock()
				// The old channel is already closed; just drop the reference.
				c.ch = nil
				c.state = StateClosed
				c.mutex.Unlock()
				return
			case <-time.After(time.Second):
			}

			if c.client.State() != StateOpened {
				c.log.InfoF("Rabbit mq Consumer(%s) try to recover channel for %d times, but mq's state != StateOpened", c.name, i+1)
				continue
			}
			if e := c.Open(); e != nil {
				c.log.ErrorF("RabbitMQ Consumer(%s) recover channel failed for %d times, Err:%v", c.name, i+1, e)
				continue
			}
			c.log.InfoF("RabbitMQ Consumer(%s) recover channel OK. Total try %d times", c.name, i+1)
			return
		}
		c.log.ErrorF("RabbitMQ Consumer(%s) try to recover channel over maxRetry(%d), so exit", c.name, maxRetry)
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/titan-kit/titan.git
git@gitee.com:titan-kit/titan.git
titan-kit
titan
titan
v0.0.4

Search