2 Star 0 Fork 0

江苏艾雨文承养老机器人有限公司/aywc_judge

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
MQ.go 3.40 KB
一键复制 编辑 原始数据 按行查看 历史
dtal 提交于 2021-06-02 14:21 +08:00 . mtt0408--aywc_1 更改工作目录
package lib
import (
"fmt"
"gitee.com/aywc_1/aywc_judge/src/appinit"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"strings"
)
// RabbitMQ queue, exchange and routing-key names.
const (
QUEUE_NEWUSER = "newuser" // queue name for user registration
QUEUE_NEWUSER_UNION = "newuser_union" // queue name for partner-organization user registration
EXCHANGE_USER = "UserExchange" // exchange for the user module
EXCHANGE_USER_DELAY = "UserExchangeDelay" // NOTE(review): presumably the delayed-message counterpart of EXCHANGE_USER — confirm
ROUTER_KEY_USERREG = "userreg" // routing key for user registration
EXCHANGE_TRANS = "TransExchange" // exchange for transfers
ROUTER_KEY_TRANS = "trans" // routing key for transfers
QUEUE_TRANS = "TransQueueA" // queue for transfers
)
// MQ wraps an AMQP channel together with the notification channels that
// NotifyReturn and SetConfirm register on it.
type MQ struct {
Channel *amqp.Channel // AMQP channel every method of MQ operates on
notifyConfirm chan amqp.Confirmation // assigned by SetConfirm, read by ListenConfirm
notifyReturn chan amqp.Return // assigned by NotifyReturn, read by listenReturn
}
// mq is the package-level MQ instance; NewMQ assigns it and GetMQ returns it.
var mq *MQ
// NewMQ opens a fresh AMQP channel on the shared connection and stores an MQ
// wrapping it in the package-level mq variable.
//
// If the shared connection reports itself closed, appinit.InitRabbitmq is
// called first. A failure to open the channel terminates the process via
// log.Fatal.
func NewMQ() {
	if closed := appinit.GetConn().IsClosed(); closed {
		log.Info("MQ连接断开,重新连接")
		appinit.InitRabbitmq()
	}

	// Fetch the connection again: InitRabbitmq may have replaced it.
	ch, err := appinit.GetConn().Channel()
	if err != nil {
		log.Fatal(err)
	}

	instance := new(MQ)
	instance.Channel = ch
	mq = instance
}
// GetMQ returns the package-level MQ pointer, which NewMQ assigns.
func GetMQ() *MQ { return mq }
// DecQueueAndBind declares each queue named in queues and binds it to
// exchange with routing key key.
//
// queues may hold several queue names separated by commas. Queues are
// declared with the durable flag set. The first declare or bind error is
// returned as-is and stops processing of the remaining names.
func (m *MQ) DecQueueAndBind(queues string, key string, exchange string) error {
	const (
		durable    = true
		autoDelete = false
		exclusive  = false
		noWait     = false
	)

	for _, name := range strings.Split(queues, ",") {
		declared, err := m.Channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, nil)
		if err != nil {
			return err
		}
		// Bind using the name reported by the broker for the declared queue.
		if err := m.Channel.QueueBind(declared.Name, key, exchange, noWait, nil); err != nil {
			return err
		}
	}
	return nil
}
// DecQueueAndBindWithArgs declares each queue named in queues, passing args
// as the declaration arguments, and binds it to exchange with routing key key.
//
// queues may hold several queue names separated by commas. Unlike
// DecQueueAndBind, queues are declared with the durable flag cleared. args is
// used for the declaration only; the binding is made without arguments. The
// first declare or bind error is returned as-is.
func (m *MQ) DecQueueAndBindWithArgs(queues string, key string, exchange string, args map[string]interface{}) error {
	const (
		durable    = false
		autoDelete = false
		exclusive  = false
		noWait     = false
	)

	for _, name := range strings.Split(queues, ",") {
		declared, err := m.Channel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
		if err != nil {
			return err
		}
		if err := m.Channel.QueueBind(declared.Name, key, exchange, noWait, nil); err != nil {
			return err
		}
	}
	return nil
}
// NotifyReturn registers a listener for amqp.Return notifications on the
// channel and starts listenReturn in a separate goroutine to read from it.
func (m *MQ) NotifyReturn() {
	returns := make(chan amqp.Return)
	m.notifyReturn = m.Channel.NotifyReturn(returns)

	// Handle returned messages in the background.
	go m.listenReturn()
}
// listenReturn drains the return-notification channel registered by
// NotifyReturn, printing each returned message's headers and logging its body
// when the body is non-empty.
//
// It keeps reading until the notification channel is closed. Handling only a
// single notification would leave later returns undelivered on the unbuffered
// listener channel, and would also process the zero-value Return produced by
// a closed channel as if it were a real message.
func (m *MQ) listenReturn() {
	for ret := range m.notifyReturn {
		fmt.Println(ret.Headers)
		if len(ret.Body) != 0 {
			log.Info("消息没有正确入列:", string(ret.Body))
		}
	}
}
// SetConfirm puts the channel into publisher-confirm mode and registers the
// confirmation listener that ListenConfirm reads from.
//
// An error from Channel.Confirm is logged; the listener is registered
// regardless. The listener channel has capacity 1 so that the confirmation
// for one outstanding publishing can be delivered without blocking, as the
// amqp NotifyPublish documentation requires the capacity to be at least the
// number of outstanding publishings.
func (m *MQ) SetConfirm() {
	if err := m.Channel.Confirm(false); err != nil {
		log.Error(err)
	}
	m.notifyConfirm = m.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
}
// ListenConfirm waits for one publisher confirmation, logs whether the
// message was acknowledged, and then closes the channel.
//
// If SetConfirm was never called there is no confirmation channel to read
// from; that is logged and the method returns without touching the channel
// instead of blocking forever. If the confirmation channel is closed before a
// confirmation arrives, that is logged separately from a negative
// acknowledgement.
func (m *MQ) ListenConfirm() {
	if m.notifyConfirm == nil {
		log.Error("confirm:未调用SetConfirm,无法监听确认")
		return
	}

	defer func() {
		// The channel may already be closed (e.g. when the confirmation
		// channel was closed underneath us); only report other failures.
		if err := m.Channel.Close(); err != nil && err != amqp.ErrClosed {
			log.Error(err)
		}
	}()

	ret, ok := <-m.notifyConfirm
	switch {
	case !ok:
		log.Error("confirm:通道已关闭,未收到确认")
	case ret.Ack:
		log.Info("confirm:消息发送成功")
	default:
		log.Error("confirm:消息发送失败")
	}
}
// SendDelayMessage publishes message as a text/plain body to exchange with
// routing key key, carrying delay in the "x-delay" header.
//
// The message is published with the mandatory flag set and the immediate flag
// cleared. The publish error, if any, is returned unchanged.
// NOTE(review): "x-delay" is presumably consumed by a delayed-message
// exchange, with delay in milliseconds — confirm against the broker setup.
func (m *MQ) SendDelayMessage(key string, exchange string, message string, delay int) error {
	msg := amqp.Publishing{
		Headers:     amqp.Table{"x-delay": delay},
		ContentType: "text/plain",
		Body:        []byte(message),
	}
	return m.Channel.Publish(exchange, key, true, false, msg)
}
// SendMessage publishes message as a text/plain body to exchange with routing
// key key.
//
// The message is published with the mandatory flag set and the immediate flag
// cleared. The publish error, if any, is returned unchanged.
func (m *MQ) SendMessage(key string, exchange string, message string) error {
	msg := amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
	}
	return m.Channel.Publish(exchange, key, true, false, msg)
}
// Consume starts consuming from queue and hands the delivery channel to
// callbak together with key.
//
// key is passed to the AMQP library as the consumer tag. Deliveries are not
// auto-acknowledged. A failure to start consuming terminates the process via
// log.Fatal. Consume returns when callbak returns.
func (m *MQ) Consume(queue string, key string, callbak func(<-chan amqp.Delivery, string)) {
	const (
		autoAck   = false
		exclusive = false
		noLocal   = false
		noWait    = false
	)

	deliveries, err := m.Channel.Consume(queue, key, autoAck, exclusive, noLocal, noWait, nil)
	if err != nil {
		log.Fatal(err)
	}
	callbak(deliveries, key)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/aywc_1/aywc_judge.git
git@gitee.com:aywc_1/aywc_judge.git
aywc_1
aywc_judge
aywc_judge
v0.6.4

搜索帮助