1 Star 1 Fork 0

titan-kit/titan

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
esbProvider.go 4.79 KB
Copy Edit Raw Blame History
package rabbitmq
import (
"encoding/json"
"errors"
"gitee.com/titan-kit/titan/esb"
"gitee.com/titan-kit/titan/log"
"github.com/streadway/amqp"
)
var _ esb.Provider = &rabbitProvider{}
func NewProvider(node esb.Node, logger log.Logger, client *Client) *rabbitProvider {
return &rabbitProvider{node.String(), client, log.NewSlf4g("backends/RabbitMQProvider", logger)}
}
type rabbitProvider struct {
node string
client *Client
log *log.Slf4g
}
// Listen 监听指定的消息队列,当有新消息或事务应答消息送达时会调用监听器接口,可同时监听多个不同的队列。特别注意:同一个消息队列只能有一个监听器!
//
// @param name 要监听的消息队列
// @param listener 消息监听器
func (rp *rabbitProvider) Listen(name string, listener esb.MessageListener) (func(), error) {
rp.Cancel(name)
consumer, err := rp.client.Consumer(name)
if err != nil {
rp.log.ErrorF("Create consumer failed, %v", err)
return nil, errors.New("Create consumer failed ")
}
exchange, queue, route := DestroyQueueName(name)
msgC := make(chan Delivery, 1)
consumer.SetMsgCallback(msgC).SetQos(1).SetExchangeBinds([]*ExchangeBinds{
{
Exchange: DefaultExchange(exchange, amqp.ExchangeDirect, nil),
Bindings: []*Binding{
{RouteKey: route, Queues: []*Queue{DefaultQueue(queue, nil)}},
},
},
})
if err = consumer.Open(); err != nil {
rp.log.ErrorF("Open failed, %v\n", err)
return nil, errors.New("Open failed ")
}
go rp.recoverMask(msgC, listener)
return func() {
rp.log.Info("Close msg callback channel")
close(msgC)
}, nil
}
func (rp *rabbitProvider) recoverMask(msgC chan Delivery, listener esb.MessageListener) {
defer func() {
if err := recover(); err != nil {
rp.log.ErrorF("[RabbitMQ-%s] goroutine handle panic:%+v", rp.node, err)
go rp.recoverMask(msgC, listener)
}
}()
for {
select {
case msg := <-msgC:
rp.maskMsg(msg, listener)
}
}
}
func (rp *rabbitProvider) maskMsg(msg Delivery, listener esb.MessageListener) {
payload := &esb.MsgPayload{}
defer msg.Ack(false)
var err error
if err = json.Unmarshal(msg.Body, payload); err != nil {
rp.log.ErrorF("[RabbitMQ-%s] Decode msg failed: %s", rp.node, err)
return
}
rp.log.DebugF("[RabbitMQ-%s]收到消息 %+v", rp.node, payload)
// 检查签名
if esb.Signature(payload) != payload.Sign {
rp.log.WarningF("[ESBClient-%s]收到无效签名消息 %s", rp.node, payload.MsgId)
return
}
var retMsg *esb.MsgPayload
if payload.Phase == esb.SenderReq {
retMsg, err = esb.HandleNew(payload, listener)
} else {
retMsg, err = esb.HandleAck(payload, listener)
}
if err != nil {
rp.log.ErrorF("Handle msg error: %s", err)
return
}
if retMsg != nil {
if err = rp.sendMessage(retMsg); err != nil {
rp.log.ErrorF("[RabbitMQ-%s]发送消息到ESB发生错误:%+v", rp.node, err)
}
}
}
// Cancel 取消指定队列的监听。
//
// @param name 队列名称
func (rp *rabbitProvider) Cancel(name string) {
_ = rp.client.CloseConsumer(name)
}
// Send 发送ESB消息,可根据业务需求发送三类消息:
//
// NoticeMessage:通知类消息,只保证消息被成功发送到ESB中.
// SimplexMessage:单向事务消息,通过收到接收方的确认消息来完成事务.
// DuplexMessage:双向事务消息,通过接收方和发送方各分别确认来完对应的事务.
//
// @param message 消息
func (rp *rabbitProvider) Send(msg interface{}) error {
var mpl *esb.MsgPayload
switch msg.(type) {
case *esb.NoticeMessage:
mpl = esb.NoticePayload(msg.(*esb.NoticeMessage))
case *esb.SimplexMessage:
mpl = esb.SimplexPayload(msg.(*esb.SimplexMessage))
case *esb.DuplexMessage:
mpl = esb.DuplexPayload(msg.(*esb.DuplexMessage))
default:
mpl = &esb.MsgPayload{}
}
return rp.sendMessage(mpl)
}
func (rp *rabbitProvider) sendMessage(payload *esb.MsgPayload) error {
if name, err := payload.SendQueueName(); err == nil {
rp.log.DebugF("[RabbitMQ-%s]发送消息到ESB(%s):%s", rp.node, name, payload)
if p, err := rp.client.Producer(name); err == nil {
exchange, queue, route := DestroyQueueName(name)
return p.ForDirect(exchange, route, queue, payload.String())
} else {
rp.log.ErrorF("Create producer failed, %v", err)
return errors.New("Create producer failed ")
}
} else {
return err
}
}
// Close 关闭到消息中间件的连接,清除资源。
func (rp *rabbitProvider) Close() {
rp.client.Close()
}
// DestroyQueueName 使用当前客户端分解一个ESB消息的队列名称,{name}名称满足格式:sys_esb_{systemId}_{node}
// 其中{systemId}为目标系统的四位数数字ID,{node}为目标系统监听的ESB节点标示(参考{@link ESBNode}。
//
// @param name ESB消息的队列名称
// @return "sys_esb","{systemId}","{node}"
func DestroyQueueName(name string) (string, string, string) {
return "sys_esb", name[8:12], name[8:]
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/titan-kit/titan.git
git@gitee.com:titan-kit/titan.git
titan-kit
titan
titan
v0.0.4

Search