代码拉取完成,页面将自动刷新
package rabbitmq
import (
"encoding/json"
"reflect"
"time"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
)
// 发送消息到交换机
func (r *rabbitMQ) SendToExchange(exchangeName ExchangeName, msg interface{}, routingKey ...string) (err error) {
if exchangeName == "" {
return errors.New("交换机不能为空")
}
var rk string
if routingKey != nil && len(routingKey) > 0 {
rk = routingKey[0]
}
// 直接发送
return r.send(&sendReq{
Exchange: exchangeName,
RoutingKey: rk,
Msg: msg,
})
}
// 发送消息到指定队列,不是交换机
func (r *rabbitMQ) SendToQueue(queueName QueueName, msg interface{}) error {
// 检查参数
if queueName == "" {
return errors.New("队列不能为空")
}
// 直接发送
return r.send(&sendReq{
RoutingKey: string(queueName),
Msg: msg,
})
}
// 发送延迟消息到指定队列,不是交换机
func (r *rabbitMQ) SendToQueueDelay(queueName QueueName, delay time.Duration, msg interface{}) error {
// 检查参数
if queueName == "" {
return errors.New("队列不能为空")
}
if delay <= time.Second {
return errors.New("延迟时间必须大于等于1秒")
}
_, exist := r.queueDelayMap[queueName][delay]
if !exist {
// 自动创建不存在的延迟队列
err := r.declareDelayQueue(queueName, delay)
if err != nil {
return err
}
if _, ok := r.queueDelayMap[queueName]; !ok {
r.queueDelayMap[queueName] = make(map[time.Duration]struct{})
}
r.queueDelayMap[queueName][delay] = struct{}{}
}
// 直接发送
return r.send(&sendReq{
Queue: queueName,
Msg: msg,
Delay: delay,
})
}
func (r *rabbitMQ) convertMsg(msg interface{}) (data []byte, err error) {
ref := reflect.TypeOf(msg)
for ref.Kind() == reflect.Ptr {
ref = ref.Elem()
}
switch ref.Kind() {
case reflect.Struct, reflect.Map, reflect.Slice:
// 结构体,map,转json
data, err = json.Marshal(msg)
if err != nil {
return nil, errors.Wrap(err, "消息序列json化失败")
}
default:
// 其他转字符串
return nil, errors.New("消息类型只支持结构体和map")
}
return data, nil
}
type sendReq struct {
Exchange ExchangeName // 交换机
Queue QueueName // 队列名
RoutingKey string // 路由
Msg interface{} // 数据
Delay time.Duration // 延迟时间
}
// 发送消息
// 交换机和路由都为空,用队列名做路由发消息给队列
func (r *rabbitMQ) send(req *sendReq) error {
// 断言消息类型
body, err := r.convertMsg(req.Msg)
if err != nil {
return err
}
if r.conn.IsClosed() {
err = r.reConn()
if err != nil {
return err
}
}
ch, err := r.conn.Channel()
if err != nil {
return errors.Wrap(err, "获取mq通道失败")
}
defer func(ch *amqp.Channel) {
_ = ch.Close()
}(ch)
// 使用事务模式
err = ch.Tx()
if err != nil {
return errors.Wrap(err, "开启mq事务模式失败")
}
// 延时队列的消息只通过路由发送到队列
if req.Delay > 0 {
req.Exchange = ""
req.RoutingKey = string(r.getDelayQueueName(req.Queue, req.Delay))
}
err = ch.Publish(string(req.Exchange), req.RoutingKey, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: body,
DeliveryMode: 2, // 持久化消息
})
if err != nil {
_ = ch.TxRollback()
return errors.Wrap(err, "消息发送失败")
}
err = ch.TxCommit()
if err != nil {
return errors.Wrap(err, "提交mq事务失败")
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。