1 Star 0 Fork 0

vick / kinfu

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
frabbitmq.go 4.89 KB
一键复制 编辑 原始数据 按行查看 历史
vick 提交于 2024-03-04 19:35 . fix:fgin框架提交
package fgin
import (
"errors"
"fmt"
"sync"
"github.com/streadway/amqp"
)
var (
rabbitmqCfg = make(map[string]*FginRabbitmq)
)
// 框架的默认队列对象
type FginRabbitmq struct {
QueueName string
MaxRetry int32
MaxConsume int
RabbitmqConn *amqp.Connection
RabbitmqCh []*amqp.Channel
RabbitmqQueue *amqp.Queue
}
// 初始化rabbitmq
func initRabbitmq() {
if len(fginConfig.Rabbitmq) == 0 {
// 没有配置rabbitmq,直接返回
return
}
for _, rb := range fginConfig.Rabbitmq {
// 记录拨号成功
conn, err := amqp.Dial(rb.Address)
if err != nil {
panic("rabbitmq初始化失败, " + rb.QueueName + "拨号失败:" + err.Error())
}
rabbitmqCfg[rb.QueueName] = &FginRabbitmq{
QueueName: rb.QueueName,
MaxRetry: rb.MaxRetry,
MaxConsume: rb.MaxConsume,
RabbitmqConn: conn,
}
}
}
// 关闭已开启的连接和信道
func rabbitmqClose() {
for _, v := range rabbitmqCfg {
for _, ch := range v.RabbitmqCh {
ch.Close()
}
v.RabbitmqConn.Close()
}
}
// 声明默认队列
func DefaultQueue(queueName string) (*FginRabbitmq, error) {
cfg, ok := rabbitmqCfg[queueName]
if !ok {
return nil, errors.New("没有配置队列" + queueName)
}
conn := cfg.RabbitmqConn
// 创建信道
ch, err := conn.Channel()
if err != nil {
return nil, errors.New("rabbitmq初始化失败, " + queueName + "信道失败:" + err.Error())
}
cfg.RabbitmqCh = append(cfg.RabbitmqCh, ch)
q, err := ch.QueueDeclare(
queueName, // name
true, // durable 持久化
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}
cfg.RabbitmqQueue = &q
return cfg, nil
}
// 默认生产者
func (fr *FginRabbitmq) DefaultProducter(contentType string, body []byte) error {
if fr.RabbitmqQueue == nil {
return errors.New("没有使用默认队列 DefaultQueue")
}
chLs := fr.RabbitmqCh
ch := chLs[0]
// 发送消息
if err := ch.Publish(
"", // 交换机
fr.QueueName, // 队列名,
false,
false,
amqp.Publishing{
Headers: amqp.Table{
"retry_count": fr.MaxRetry, // 重试最大次数
},
DeliveryMode: 2, // 消息持久化
ContentType: contentType,
Body: body,
},
); err != nil {
return err
}
return nil
}
// 默认消费者
func (fr *FginRabbitmq) DefaultConsumer(hander func(msg amqp.Delivery) error) error {
if fr.RabbitmqQueue == nil {
return errors.New("没有使用默认队列 DefaultQueue")
}
// 根据配置创建充足的信道
for i := len(fr.RabbitmqCh); i < fr.MaxConsume; i++ {
conn := fr.RabbitmqConn
ch, err := conn.Channel()
if err != nil {
info := fmt.Sprintf("创建消费者信道%v失败, err:%v", i, err.Error())
logSuger.Error(info)
}
fr.RabbitmqCh = append(fr.RabbitmqCh, ch)
}
// 创建多个消费者
var wg sync.WaitGroup
for i, ch := range fr.RabbitmqCh {
wg.Add(1)
go func(i int, ch *amqp.Channel) {
q, err := ch.QueueDeclare(
fr.QueueName, // name
true, // durable 持久化
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
info := fmt.Sprintf("创建消费者队列%v失败, err:%v", i, err.Error())
logSuger.Error(info)
return
}
msgs, err := ch.Consume(
q.Name, // 需要操作的队列名
fmt.Sprintf("%v_%v_%v", q.Name, "cosumer", i), // 消费者唯一id,不填,则自动生成一个唯一值
false, // 自动提交消息(即自动确认消息已经处理完成)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
info := fmt.Sprintf("消费者队列%v获取消息失败, err:%v", i, err.Error())
logSuger.Error(info)
return
}
// 当消费者连通后,消除等待
wg.Done()
for msg := range msgs {
if err := hander(msg); err != nil {
retryCount, ok := msg.Headers["retry_count"].(int32)
if !ok {
retryCount = 0
}
fr.retryAck(msg, ch, retryCount)
} else {
msg.Ack(false)
}
}
}(i, ch)
}
wg.Wait()
return nil
}
// 消费失败触发重试
func (fr *FginRabbitmq) retryAck(msg amqp.Delivery, ch *amqp.Channel, retryCount int32) {
if retryCount <= 0 {
logSuger.Error("已重试最大次数, body:", string(msg.Body))
msg.Reject(false)
return
}
endRetryCount := retryCount - 1
if err := ch.Publish(
"", // 交换机
fr.QueueName, // 队列名,
false,
false,
amqp.Publishing{
Headers: amqp.Table{
"retry_count": endRetryCount, // 重试最大次数,余量
},
DeliveryMode: 2, // 消息持久化
ContentType: msg.ContentType,
Body: msg.Body,
},
); err != nil {
logSuger.Error("消息重新入队失败, body:" + string(msg.Body) + "; err:" + err.Error())
// 再次重试发送
fr.retryAck(msg, ch, endRetryCount)
return
}
msg.Ack(false)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/wu-jin-feng/kinfu.git
git@gitee.com:wu-jin-feng/kinfu.git
wu-jin-feng
kinfu
kinfu
c48b5026ddc5

搜索帮助

Bbcd6f05 5694891 0cc6727d 5694891