1 Star 0 Fork 0

micro-tools / wf

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
gnats_pubsub.go 2.88 KB
一键复制 编辑 原始数据 按行查看 历史
545403892 提交于 2023-09-27 22:16 . 升级go-ole
package gnats
import (
"errors"
"fmt"
"sync"
broker "github.com/nats-io/nats.go"
"gitee.com/micro-tools/wf/extend/utils"
"gitee.com/micro-tools/wf/os/glog"
)
// Sentinel errors returned by the pubsub methods in this file.
var (
// errAlreadySubscribed is returned by Subscribe when the topic already has
// an entry in the subscriptions map. The message text is Chinese for
// "topic already subscribed".
errAlreadySubscribed = errors.New("已订阅主题")
// errNotSubscribed is returned by Unsubscribe when the topic has no entry
// in the subscriptions map. The message text is Chinese for "not subscribed".
errNotSubscribed = errors.New("未订阅")
// errEmptyTopic is returned when a caller passes an empty topic string.
errEmptyTopic = errors.New("empty topic")
)
// Compile-time assertion that *pubsub satisfies the utils.PubSub interface.
var _ utils.PubSub = (*pubsub)(nil)
// PubSub extends utils.PubSub with a Close method so that callers can
// release the NATS connection held by the implementation.
type PubSub interface {
utils.PubSub
// Close closes the underlying NATS connection.
Close()
}
// pubsub is the NATS-backed implementation of PubSub.
type pubsub struct {
// conn is the NATS connection used for publishing and subscribing.
conn *broker.Conn
// logger receives a warning whenever a message handler returns an error.
logger *glog.Logger
// mu guards subscriptions; Subscribe and Unsubscribe hold it while they
// read or modify the map.
mu sync.Mutex
// queue is the queue group name passed to QueueSubscribe. When it is
// empty, Subscribe uses a plain (non-queue) subscription instead.
queue string
// subscriptions maps a subscribed topic to its active NATS subscription.
subscriptions map[string]*broker.Subscription
}
// NewPubSub connects to the NATS server at url and returns a message
// publisher/subscriber backed by that connection.
//
// The queue argument selects how Subscribe registers handlers: a non-empty
// queue makes Subscribe use NATS QueueSubscribe with that queue group name,
// which is conceptually different from an ordinary subscription (see
// https://docs.nats.io/developing-with-nats/receiving/queues), while an
// empty queue makes it use a plain Subscribe.
//
// The logger is used to report errors returned by message handlers. If the
// connection attempt fails, the connect error is returned unchanged together
// with a nil PubSub.
func NewPubSub(url, queue string, logger *glog.Logger) (PubSub, error) {
	nc, err := broker.Connect(url)
	if err != nil {
		return nil, err
	}

	return &pubsub{
		conn:          nc,
		logger:        logger,
		queue:         queue,
		subscriptions: map[string]*broker.Subscription{},
	}, nil
}
// Publish sends msg on the NATS subject derived from topic and subtopic.
//
// The subject is topic itself when subtopic is empty, and "topic.subtopic"
// otherwise. An empty topic is rejected with errEmptyTopic, matching
// Subscribe and Unsubscribe; without this check an empty topic combined with
// a subtopic would produce the malformed subject ".subtopic". Any error
// reported by the NATS connection is returned unchanged.
func (ps *pubsub) Publish(topic, subtopic string, msg []byte) error {
	if topic == "" {
		return errEmptyTopic
	}

	subject := topic
	if subtopic != "" {
		subject = topic + "." + subtopic
	}

	return ps.conn.Publish(subject, msg)
}
// Subscribe registers handler for messages arriving on topic.
//
// It returns errEmptyTopic for an empty topic and errAlreadySubscribed when
// the topic is already present in the subscriptions map. When the pubsub was
// created with a non-empty queue, the handler is registered through
// QueueSubscribe with that queue group; otherwise a plain Subscribe is used.
// On success the resulting subscription is stored under topic so that
// Unsubscribe can find it later; on failure the NATS error is returned
// unchanged and nothing is stored.
func (ps *pubsub) Subscribe(topic string, handler utils.MessageHandler) error {
	if topic == "" {
		return errEmptyTopic
	}

	ps.mu.Lock()
	defer ps.mu.Unlock()

	if _, exists := ps.subscriptions[topic]; exists {
		return errAlreadySubscribed
	}

	var (
		sub *broker.Subscription
		err error
	)
	callback := ps.natsHandler(handler)
	if ps.queue == "" {
		sub, err = ps.conn.Subscribe(topic, callback)
	} else {
		sub, err = ps.conn.QueueSubscribe(topic, ps.queue, callback)
	}
	if err != nil {
		return err
	}

	ps.subscriptions[topic] = sub
	return nil
}
// Unsubscribe cancels the subscription previously registered for topic.
//
// It returns errEmptyTopic for an empty topic and errNotSubscribed when the
// topic has no entry in the subscriptions map. If the NATS unsubscribe call
// fails, its error is returned and the entry is kept; the entry is removed
// only after a successful unsubscribe.
func (ps *pubsub) Unsubscribe(topic string) error {
	if topic == "" {
		return errEmptyTopic
	}

	ps.mu.Lock()
	defer ps.mu.Unlock()

	active, subscribed := ps.subscriptions[topic]
	if !subscribed {
		return errNotSubscribed
	}

	err := active.Unsubscribe()
	if err == nil {
		delete(ps.subscriptions, topic)
	}
	return err
}
// Close closes the underlying NATS connection. It does not touch the
// subscriptions map, so entries recorded by Subscribe remain in it.
func (ps *pubsub) Close() {
ps.conn.Close()
}
// natsHandler adapts a utils.MessageHandler to the callback type expected by
// the NATS client. The returned callback passes the raw message payload to h
// and, if h reports an error, logs it as a warning instead of propagating it.
func (ps *pubsub) natsHandler(h utils.MessageHandler) broker.MsgHandler {
	return func(msg *broker.Msg) {
		err := h(msg.Data)
		if err == nil {
			return
		}
		ps.logger.Warning(fmt.Sprintf("failed to handle message: %s", err))
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/micro-tools/wf.git
git@gitee.com:micro-tools/wf.git
micro-tools
wf
wf
v1.0.2

搜索帮助