1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
producer_ex.go 2.07 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-09-02 18:23 . log
package mq
import (
"context"
"encoding/json"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/result"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
prod "github.com/apache/rocketmq-client-go/v2/producer"
"go.uber.org/zap"
)
// Compile-time check that *producerEx satisfies the Producer interface.
var _ Producer = (*producerEx)(nil)

// producerEx is a Producer implementation that wraps a RocketMQ producer.
type producerEx struct {
// cfg is the configuration supplied to NewProducer; Start reads the
// server address and retry count from it.
cfg Config
// producer is the underlying RocketMQ client. It stays nil until Start
// has completed successfully.
producer rocketmq.Producer
}
// NewProducer returns a Producer that keeps a copy of cfg.
//
// The underlying RocketMQ client is not created here; it is built and
// started by Start.
func NewProducer(cfg Config) Producer {
	rp := new(producerEx)
	rp.cfg = cfg
	return rp
}
// Start builds the RocketMQ producer from the stored configuration and
// starts it.
//
// The producer is configured with a passthrough name-server resolver for
// cfg.Client.Server, the retry count from cfg.ProducerConf.Retry and a manual
// queue selector. The client is stored on the receiver only after it has
// started successfully; on any failure the receiver is left untouched and the
// error produced by result.Errorf(...).Log() is returned.
func (rp *producerEx) Start() error {
	resolver := primitive.NewPassthroughResolver([]string{rp.cfg.Client.Server})

	client, err := rocketmq.NewProducer(
		prod.WithNsResolver(resolver),
		prod.WithRetry(rp.cfg.ProducerConf.Retry),
		prod.WithQueueSelector(prod.NewManualQueueSelector()),
	)
	if err != nil {
		return result.Errorf(result.ErrMqInitInternal, "[MQ] Init producer failed, error: %v", err).Log()
	}

	err = client.Start()
	if err != nil {
		return result.Errorf(result.ErrMqStartInternal, "[MQ] Start producer failed, error: %v", err).Log()
	}

	rp.producer = client
	return nil
}
// Stop shuts down the underlying RocketMQ producer.
//
// It is a no-op when the producer was never started (or Start failed), because
// rp.producer is only assigned after a successful Start and calling Shutdown
// on the nil interface would panic.
func (rp *producerEx) Stop() {
	if rp.producer == nil {
		return
	}
	// Stop has no error return, so shutdown is best-effort and the error is
	// intentionally discarded.
	_ = rp.producer.Shutdown()
}
// SendJson encodes event as JSON and publishes it synchronously to topic via
// SendBytes.
//
// If event cannot be marshalled, nothing is published and an error is
// returned; previously the marshal error was dropped and an empty body was
// sent instead.
func (rp *producerEx) SendJson(topic string, event interface{}) error {
	body, err := json.Marshal(event)
	if err != nil {
		return result.Errorf(result.ErrMqPublishInternal, "MQ: Marshal event for topic(%s) failure, err= %+v", topic, err).Log()
	}
	return rp.SendBytes(topic, body)
}
// SendBytes publishes data to topic synchronously.
//
// The topic and payload are written to the debug log first. The message is
// sent with a background context; a send failure is converted with
// result.Errorf(...).Log() and returned, otherwise nil is returned.
func (rp *producerEx) SendBytes(topic string, data []byte) error {
	logger.L().Debug("MQ", zap.String("topic", topic), zap.ByteString("data", data))

	msg := primitive.NewMessage(topic, data)
	_, err := rp.producer.SendSync(context.Background(), msg)
	if err == nil {
		return nil
	}
	return result.Errorf(result.ErrMqPublishInternal, "MQ: Publish String topic(%s) failure, err= %+v", topic, err).Log()
}
// SendAsync publishes data to topic asynchronously.
//
// The returned error only covers failures to submit the message. A delivery
// failure is passed to the completion callback after this method has
// returned, so it cannot be handed back to the caller; the callback reports
// it through the same result.Errorf(...).Log() path used for submission
// failures instead of silently dropping it.
func (rp *producerEx) SendAsync(topic string, data []byte) error {
	logger.L().Debug("MQ",
		zap.String("topic", topic),
		zap.ByteString("data", data))

	// The send-result parameter is left unnamed so the callback does not
	// shadow the imported result package.
	onDone := func(_ context.Context, _ *primitive.SendResult, err error) {
		if err != nil {
			_ = result.Errorf(result.ErrMqPublishInternal, "MQ: Async publish topic(%s) failure, err= %+v", topic, err).Log()
		}
	}

	if err := rp.producer.SendAsync(context.Background(), onDone, primitive.NewMessage(topic, data)); err != nil {
		return result.Errorf(result.ErrMqPublishInternal, "MQ: Publish String topic(%s) failure, err= %+v", topic, err).Log()
	}
	return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.21.17

搜索帮助