代码拉取完成,页面将自动刷新
package mq
import (
"context"
"encoding/json"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/result"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
prod "github.com/apache/rocketmq-client-go/v2/producer"
"go.uber.org/zap"
)
var _ Producer = (*producerEx)(nil)
type producerEx struct {
cfg Config
producer rocketmq.Producer
}
func NewProducer(cfg Config) Producer {
return &producerEx{cfg: cfg}
}
func (rp *producerEx) Start() error {
p, e := rocketmq.NewProducer(
prod.WithNsResolver(primitive.NewPassthroughResolver([]string{rp.cfg.Client.Server})),
prod.WithRetry(rp.cfg.ProducerConf.Retry),
prod.WithQueueSelector(prod.NewManualQueueSelector()))
if e != nil {
return result.Errorf(result.ErrMqInitInternal, "[MQ] Init producer failed, error: %v", e).Log()
}
if e := p.Start(); e != nil {
return result.Errorf(result.ErrMqStartInternal, "[MQ] Start producer failed, error: %v", e).Log()
}
rp.producer = p
return nil
}
func (rp *producerEx) Stop() {
_ = rp.producer.Shutdown()
}
func (rp *producerEx) SendJson(topic string, event interface{}) error {
body, _ := json.Marshal(event)
return rp.SendBytes(topic, body)
}
func (rp *producerEx) SendBytes(topic string, data []byte) error {
logger.L().Debug("MQ",
zap.String("topic", topic),
zap.ByteString("data", data))
ctx := context.Background()
if _, err := rp.producer.SendSync(ctx, primitive.NewMessage(topic, data)); err != nil {
return result.Errorf(result.ErrMqPublishInternal, "MQ: Publish String topic(%s) failure, err= %+v", topic, err).Log()
}
return nil
}
func (rp *producerEx) SendAsync(topic string, data []byte) error {
logger.L().Debug("MQ",
zap.String("topic", topic),
zap.ByteString("data", data))
ctx := context.Background()
if err := rp.producer.SendAsync(ctx, func(ctx context.Context, result *primitive.SendResult, err error) {
}, primitive.NewMessage(topic, data)); err != nil {
return result.Errorf(result.ErrMqPublishInternal, "MQ: Publish String topic(%s) failure, err= %+v", topic, err).Log()
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。