1 Star 0 Fork 0

xingang / gcore2

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
RabbitMqConsumer.go 7.99 KB
一键复制 编辑 原始数据 按行查看 历史
xingang 提交于 2023-02-15 14:52 . gcore2
package rabitmq
import (
"errors"
"fmt"
"gitee.com/qq358678184_admin/gcore2/logHelper"
"github.com/streadway/amqp"
"log"
"sync"
"time"
)
type RabbitMqConsumer struct {
isConnected bool
conn *amqp.Connection
channel *amqp.Channel
lock sync.Mutex
}
func (bus *RabbitMqConsumer) tryConnect() (bool, error) {
if bus.conn == nil || bus.conn.IsClosed() {
bus.lock.Lock()
defer bus.lock.Unlock()
if bus.conn == nil || bus.conn.IsClosed() {
logHelper.Info("开始尝试连接mq")
connectUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", rabbitMqConfig.UserName, rabbitMqConfig.PassWord, rabbitMqConfig.Host, rabbitMqConfig.Port, rabbitMqConfig.VirtualHost)
conn, err := amqp.Dial(connectUrl)
if err != nil {
logHelper.Error(err)
return false, err
}
bus.conn = conn
bus.isConnected = true
logHelper.Info("连接mq成功", connectUrl)
}
}
return true, nil
}
func (bus *RabbitMqConsumer) initDirect(exchangeOpt ExchangeOption, queueOpt QueueOption) error {
if bus.conn == nil || bus.conn.IsClosed() {
bus.tryConnect()
}
if bus.conn == nil {
logHelper.Error("rabbitMq连接失败")
return errors.New("rabbitMq连接失败")
}
//defer bus.conn.Close()
channel, err := bus.conn.Channel()
if err != nil {
logHelper.Error(err)
return err
}
bus.channel = channel
channel.ExchangeDeclare(
exchangeOpt.Name,
amqp.ExchangeDirect,
exchangeOpt.Durable,
exchangeOpt.AutoDelete,
false,
true,
exchangeOpt.Arguments,
)
_, err = channel.QueueDeclare(
queueOpt.Name, // name
queueOpt.Durable, // durable
queueOpt.AutoDelete, // delete when unused
queueOpt.Exclusive, // exclusive
false, // no-wait
queueOpt.Arguments, // arguments
)
if err != nil {
logHelper.Error(err)
return err
}
if queueOpt.RoutingKey == "" {
queueOpt.RoutingKey = queueOpt.Name
}
err = channel.QueueBind(
queueOpt.Name,
queueOpt.RoutingKey,
exchangeOpt.Name,
false,
nil,
)
if err != nil {
logHelper.Error(err)
return err
}
return nil
}
func (bus RabbitMqConsumer) SubscribeDirect(exchangeOpt ExchangeOption, queueOpt QueueOption, f func([]byte) bool) {
logHelper.Info("开始订阅", queueOpt.Name)
err := bus.initDirect(exchangeOpt, queueOpt)
if err != nil {
time.Sleep(time.Second * 5)
bus.SubscribeDirect(exchangeOpt, queueOpt, f)
return
}
closeChan := make(chan *amqp.Error, 1)
notifyClose := bus.channel.NotifyClose(closeChan)
closeFlag := false
defer func() {
r := recover()
if r != nil {
bus.channel.Close()
bus.conn.Close()
}
}()
msgList, err := bus.channel.Consume(
queueOpt.Name, //队列名称
"", //消费者
false, //自动确认
false, //排他
false,
false,
nil,
)
if err != nil {
log.Printf("消费失败,%s", err)
time.Sleep(time.Second * 5)
bus.SubscribeDirect(exchangeOpt, queueOpt, f)
return
}
go func() {
defer func() {
logHelper.Info("释放先前连接")
bus.channel.Close()
bus.conn.Close()
}()
for {
select {
case e, ok := <-notifyClose:
if !ok {
break
}
logHelper.Error("channel close,err is ", e.Error())
time.Sleep(5 * time.Second)
closeFlag = true
case msg, ok := <-msgList:
if !ok {
break
}
res := f(msg.Body)
if res {
msg.Ack(true)
} else {
msg.Nack(true, true)
}
}
if closeFlag {
break
}
}
go func() {
bus.SubscribeDirect(exchangeOpt, queueOpt, f)
}()
}()
}
func (bus *RabbitMqConsumer) SubscribeFanoutEx(queueName string, exchangeOpt ExchangeOption, f func([]byte)) {
logHelper.Info("开始订阅", queueName)
if bus.conn == nil || bus.conn.IsClosed() {
_, err := bus.tryConnect()
if err != nil {
time.Sleep(time.Second * 5)
bus.SubscribeFanout(queueName, exchangeOpt, f)
return
}
}
channel, err := bus.conn.Channel()
if err != nil {
logHelper.Error(err)
time.Sleep(time.Second * 5)
bus.SubscribeFanout(queueName, exchangeOpt, f)
return
}
bus.channel = channel
channel.ExchangeDeclare(
exchangeOpt.Name,
amqp.ExchangeFanout,
exchangeOpt.Durable,
exchangeOpt.AutoDelete,
false,
true,
exchangeOpt.Arguments,
)
_, err = channel.QueueDeclare(
queueName, // name
false, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logHelper.Error(err)
time.Sleep(time.Second * 5)
bus.SubscribeFanout(queueName, exchangeOpt, f)
return
}
err = channel.QueueBind(
queueName,
queueName,
exchangeOpt.Name,
false,
nil,
)
if err != nil {
logHelper.Error(err)
fmt.Println("rabbitMq队列绑定失败:", err)
time.Sleep(time.Second * 5)
bus.SubscribeFanout(queueName, exchangeOpt, f)
return
}
closeChan := make(chan *amqp.Error, 1)
notifyClose := bus.channel.NotifyClose(closeChan)
closeFlag := false
defer func() {
r := recover()
if r != nil {
err4 := bus.channel.Close()
if err4 != nil {
logHelper.Error(err4)
}
//if closeChan != nil && closeFlag {
// logHelper.Info("关闭通道,SubscribeFanout")
// close(closeChan)
//}
}
}()
msgList, err2 := bus.channel.Consume(
queueName, //队列名称
"", //消费者
false, //自动确认
false, //排他
false,
false,
nil,
)
if err2 != nil {
fmt.Println("rabbitMq消费失败:", err2)
logHelper.Error(err2)
time.Sleep(time.Second * 5)
bus.SubscribeFanout(queueName, exchangeOpt, f)
return
}
go func() {
defer func() {
logHelper.Info("释放先前连接")
bus.channel.Close()
bus.conn.Close()
}()
for true {
select {
case e, ok := <-notifyClose:
if !ok {
logHelper.Info("channel notifyClose")
break
}
if e == nil {
break
}
logHelper.Error("channel close,err is ", e.Error())
time.Sleep(time.Second * 5)
closeFlag = true
case msg, ok := <-msgList:
if !ok || msg.Body == nil {
break
}
f(msg.Body)
msg.Ack(true)
default:
//logHelper.Info("结束")
break
}
if closeFlag {
break
}
}
//close(closeChan)
go func() {
bus.SubscribeFanout(queueName, exchangeOpt, f)
}()
}()
}
func (bus *RabbitMqConsumer) SubscribeFanout(queueName string, exchangeOpt ExchangeOption, f func([]byte)) error {
logHelper.Info("开始订阅", queueName)
if bus.conn == nil || bus.conn.IsClosed() {
_, err := bus.tryConnect()
if err != nil {
return err
}
}
channel, err := bus.conn.Channel()
if err != nil {
if err != nil {
return err
}
}
bus.channel = channel
err = channel.ExchangeDeclare(
exchangeOpt.Name,
amqp.ExchangeFanout,
exchangeOpt.Durable,
exchangeOpt.AutoDelete,
false,
true,
exchangeOpt.Arguments,
)
if err != nil {
return err
}
_, err = channel.QueueDeclare(
queueName, // name
false, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
err = channel.QueueBind(
queueName,
queueName,
exchangeOpt.Name,
false,
nil,
)
if err != nil {
return err
}
closeChan := make(chan *amqp.Error, 1)
notifyClose := bus.channel.NotifyClose(closeChan)
closeFlag := false
defer func() {
r := recover()
if r != nil {
err = bus.channel.Close()
if err != nil {
logHelper.Error(err)
}
}
}()
msgList, err2 := bus.channel.Consume(
queueName, //队列名称
"", //消费者
false, //自动确认
false, //排他
false,
false,
nil,
)
if err2 != nil {
return err2
}
go func() {
defer func() {
logHelper.Info("释放先前连接")
bus.channel.Close()
bus.conn.Close()
}()
for true {
select {
case e, ok := <-notifyClose:
if !ok {
logHelper.Info("channel notifyClose")
break
}
if e == nil {
break
}
logHelper.Error("channel close,err is ", e.Error())
time.Sleep(time.Second * 5)
closeFlag = true
case msg, ok := <-msgList:
if !ok || msg.Body == nil {
break
}
f(msg.Body)
msg.Ack(true)
}
if closeFlag {
break
}
}
}()
return nil
}
1
https://gitee.com/qq358678184_admin/gcore2.git
git@gitee.com:qq358678184_admin/gcore2.git
qq358678184_admin
gcore2
gcore2
a1f72da8efa7

搜索帮助