Fetch the repository succeeded.
package rabbitmq
import (
"errors"
"fmt"
"sync"
"time"
"gitee.com/titan-kit/titan/log"
"github.com/streadway/amqp"
)
var (
StateClosed = uint8(0)
StateOpened = uint8(1)
StateReopening = uint8(2)
)
// Option 是ESB选项.
type Option func(*options)
type options struct {
url string // RabbitMQ连接的url
logger log.Logger
}
func New(opts ...Option) *Client {
options := options{url: "amqp://guest:guest@localhost:5672", logger: log.DefaultLogger}
for _, o := range opts {
o(&options)
}
return &Client{
log: log.NewSlf4g("backends/rabbitmq", options.logger),
opt: options,
state: StateClosed,
producerMap: make(map[string]*Producer),
consumerMap: make(map[string]*Consumer),
}
}
type Client struct {
log *log.Slf4g
opt options
// 保护内部数据并发读写
mutex sync.RWMutex
// RabbitMQ TCP连接
conn *amqp.Connection
producerMap map[string]*Producer
consumerMap map[string]*Consumer
// RabbitMQ 监听连接错误
closeC chan *amqp.Error
// 监听用户手动关闭
stopC chan struct{}
// MQ状态
state uint8
}
func (c *Client) Open() (mq *Client, err error) {
// 进行Open期间不允许做任何跟连接有关的事情
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state == StateOpened {
return c, errors.New("RabbitMQ had been opened")
}
if c.conn, err = amqp.Dial(c.opt.url); err != nil {
return c, fmt.Errorf("RabbitMQ Dial err: %v", err)
}
c.state = StateOpened
c.stopC = make(chan struct{})
c.closeC = make(chan *amqp.Error, 1)
c.conn.NotifyClose(c.closeC)
go c.keepalive()
return c, nil
}
func (c *Client) Producer(name string) (*Producer, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state != StateOpened {
return nil, fmt.Errorf("MQ: Not initialized, now state is %b", c.State())
}
p, ok := c.producerMap[name]
if !ok {
p = newProducer(name, c)
c.producerMap[name] = p
}
return p, nil
}
func (c *Client) CloseProducer(name string) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state != StateOpened {
return fmt.Errorf("MQ: Not initialized, now state is %b", c.State())
}
if p, ok := c.producerMap[name]; ok {
p.Close()
return nil
}
return errors.New("MQ: producer not exist")
}
func (c *Client) Consumer(name string) (*Consumer, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state != StateOpened {
return nil, fmt.Errorf("MQ: Not initialized, now state is %b", c.State())
}
consumer, ok := c.consumerMap[name]
if !ok {
consumer = newConsumer(name, c)
c.consumerMap[name] = consumer
}
return consumer, nil
}
func (c *Client) CloseConsumer(name string) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state != StateOpened {
return fmt.Errorf("MQ: Not initialized, now state is %b", c.State())
}
if consumer, ok := c.consumerMap[name]; ok {
consumer.Close()
return nil
}
return errors.New("MQ: producer not exist")
}
func (c *Client) Close() {
c.log.Info("RabbitMQ client Close...")
c.mutex.Lock()
// Close producers
for _, p := range c.producerMap {
p.Close()
}
c.producerMap = make(map[string]*Producer)
// Close consumers
for _, co := range c.consumerMap {
co.Close()
}
c.consumerMap = make(map[string]*Consumer)
// Close mq connection
select {
case <-c.stopC:
// had been closed
default:
close(c.stopC)
}
c.mutex.Unlock()
// wait done
for c.State() != StateClosed {
time.Sleep(time.Second)
}
}
func (c *Client) State() uint8 {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.state
}
func (c *Client) keepalive() {
select {
case <-c.stopC:
// 正常关闭
c.log.Info("Shutdown RabbitMQ normally.")
c.mutex.Lock()
_ = c.conn.Close()
c.state = StateClosed
c.mutex.Unlock()
case err := <-c.closeC:
if err == nil {
c.log.Error("Disconnected with RabbitMQ, but Error detail is nil")
} else {
c.log.ErrorF("Disconnected with RabbitMQ, code:%d, reason:%s", err.Code, err.Reason)
}
// tcp连接中断, 重新连接
c.mutex.Lock()
c.state = StateReopening
c.mutex.Unlock()
maxRetry := 99999999
for i := 0; i < maxRetry; i++ {
time.Sleep(time.Second)
if _, e := c.Open(); e != nil {
c.log.ErrorF("Connection RabbitMQ recover failed for %d times, %v", i+1, e)
continue
}
c.log.InfoF("Connection RabbitMQ recover OK. Total try %d times", i+1)
return
}
c.log.ErrorF("Try to reconnect to RabbitMQ failed over maxRetry(%d), so exit.", maxRetry)
}
}
func (c *Client) channel() (*amqp.Channel, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.conn.Channel()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。