2 Star 0 Fork 0

403716045/gcore

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
NatsMq.go 5.38 KB
一键复制 编辑 原始数据 按行查看 历史
xingang 提交于 2023-02-17 18:37 +08:00 . 重构
package natsHelper
import (
"encoding/json"
"errors"
"fmt"
"gitee.com/lv_baobao/gcore"
"gitee.com/lv_baobao/gcore/logHelper"
"time"
)
import "github.com/nats-io/nats.go"
func init() {
}
type NatsMq struct {
consumer *NatsConsumer
producer *NatsProducer
}
func NewNatsMq() *NatsMq {
return &NatsMq{
consumer: new(NatsConsumer),
producer: new(NatsProducer),
}
}
func (n *NatsMq) Subscribe(subject string, cb nats.MsgHandler) {
n.consumer.Subscribe(subject, cb)
}
func (n *NatsMq) QueueSubscribe(subject string, queue string, cb nats.MsgHandler) {
n.consumer.QueueSubscribe(subject, queue, cb)
}
func (n *NatsMq) Reply(subject string, fun func() interface{}) {
n.consumer.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
res := fun()
n.producer.Publish(msg.Reply, res)
})
}
func (n *NatsMq) ReplyWithPara(subject string, fun func(m *nats.Msg) interface{}) {
n.consumer.QueueSubscribe(subject, subject, func(msg *nats.Msg) {
defer func() {
if r := recover(); r != nil {
n.producer.Publish(msg.Reply, r)
}
}()
res := fun(msg)
n.producer.Publish(msg.Reply, res)
})
}
func (n *NatsMq) Publish(subject string, content interface{}) error {
return n.producer.Publish(subject, content)
}
func (n *NatsMq) Request(subject string, content interface{}, timeout time.Duration, v any) error {
request, err := n.producer.Request(subject, content, timeout)
if err != nil {
return err
}
return json.Unmarshal(request.Data, v)
}
type NatsSubscribeInfo struct {
subject string
queue string
handler nats.MsgHandler
}
type NatsConsumer struct {
Conn *nats.Conn
subs map[string]NatsSubscribeInfo
}
func (n *NatsConsumer) tryConnect() error {
if n.Conn != nil {
if n.Conn.IsClosed() {
logHelper.Info("Nats释放上次连接对象")
n.Conn.Close()
n.Conn = nil
} else if n.Conn.IsConnected() {
} else if n.Conn.IsDraining() {
} else if n.Conn.IsReconnecting() {
}
}
config := gcore.NewAppSettingsHelper().GetAppConfig()
logHelper.Info(fmt.Sprintf("Nats尝试连接服务器%s", config.NatsUrl))
opts := nats.Options{AllowReconnect: false, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second} //订阅不重连,自己实现消费者重连
opts.Url = config.NatsUrl
opts.DisconnectedErrCB = func(conn *nats.Conn, err error) {
logHelper.Info("Nats断开", err)
time.Sleep(time.Second * 3)
for _, subscribeInfo := range n.subs {
if subscribeInfo.queue == "" {
n.Subscribe(subscribeInfo.subject, subscribeInfo.handler)
} else {
n.QueueSubscribe(subscribeInfo.subject, subscribeInfo.queue, subscribeInfo.handler)
}
}
}
conn, err := opts.Connect()
if err == nil {
logHelper.Info("连接Nats服务器成功")
} else {
logHelper.Error(err)
}
n.Conn = conn
return err
}
func (n *NatsConsumer) Subscribe(subject string, cb nats.MsgHandler) {
if n.subs == nil {
n.subs = make(map[string]NatsSubscribeInfo, 0)
}
_, existsSub := n.subs[subject]
if !existsSub {
n.subs[subject] = NatsSubscribeInfo{subject: subject, handler: cb}
}
n.tryConnect()
if n.Conn == nil {
logHelper.Error("订阅失败,主题:" + subject)
time.Sleep(time.Second * 3)
n.Subscribe(subject, cb)
return
}
_, err := n.Conn.Subscribe(subject, cb)
if err != nil {
logHelper.Error("关注主题", subject, err)
time.Sleep(time.Second * 3)
n.Subscribe(subject, cb)
}
}
func (n *NatsConsumer) QueueSubscribe(subject string, queue string, cb nats.MsgHandler) {
if n.subs == nil {
n.subs = make(map[string]NatsSubscribeInfo, 0)
}
_, existsSub := n.subs[subject]
if !existsSub {
n.subs[subject] = NatsSubscribeInfo{subject: subject, queue: queue, handler: cb}
}
n.tryConnect()
if n.Conn == nil {
logHelper.Error("订阅失败,主题:" + subject)
time.Sleep(time.Second * 3)
n.Subscribe(subject, cb)
return
}
_, err := n.Conn.QueueSubscribe(subject, queue, cb)
if err != nil {
logHelper.Error("关注主题", subject, err)
time.Sleep(time.Second * 3)
n.QueueSubscribe(subject, queue, cb)
}
}
type NatsProducer struct {
Conn *nats.Conn
}
func (n *NatsProducer) tryConnect() error {
if n.Conn != nil {
logHelper.Info("Nats释放上次连接对象")
n.Conn.Close()
n.Conn = nil
}
config := gcore.NewAppSettingsHelper().GetAppConfig()
logHelper.Info("Nats尝试连接服务器")
opts := nats.Options{AllowReconnect: true, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second}
opts.Url = config.NatsUrl
opts.DisconnectedErrCB = func(conn *nats.Conn, err error) {
logHelper.Info("Nats断开", err)
}
conn, err := opts.Connect()
//c, _ := natsHelper.NewEncodedConn(nc, natsHelper.JSON_ENCODER)
//defer conn.Close()
n.Conn = conn
return err
}
func (n *NatsProducer) Publish(subject string, content interface{}) error {
if n.Conn == nil || n.Conn.IsClosed() {
if n.tryConnect() != nil {
return errors.New("连接nats失败")
}
}
if n.Conn != nil && n.Conn.IsReconnecting() {
return errors.New("重连nats")
}
bytes, _ := json.Marshal(content)
err := n.Conn.Publish(subject, bytes)
return err
}
func (n *NatsProducer) Request(subject string, content interface{}, timeout time.Duration) (*nats.Msg, error) {
if n.Conn == nil || n.Conn.IsClosed() {
if n.tryConnect() != nil {
return nil, errors.New("连接nats失败")
}
}
if n.Conn != nil && n.Conn.IsReconnecting() {
return nil, errors.New("重连nats")
}
bytes, _ := json.Marshal(content)
return n.Conn.Request(subject, bytes, timeout)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/lv_baobao/gcore.git
git@gitee.com:lv_baobao/gcore.git
lv_baobao
gcore
gcore
3bf2efb9b087

搜索帮助