1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
push_consumer.go 1.84 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-04-24 20:33 . 除掉支付
package mq
import (
"context"
"gitee.com/h79/goutils/common/result"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
// Compile-time check that *pushConsumer satisfies the Consumer interface.
var _ Consumer = (*pushConsumer)(nil)
// pushConsumer is the Consumer implementation backed by a RocketMQ push consumer.
type pushConsumer struct {
// cfg is the configuration supplied to NewPushConsumer; Start reads the
// client and credential settings from it.
cfg Config
// consumer is the underlying RocketMQ push consumer. It is nil until Start
// has completed successfully.
consumer rocketmq.PushConsumer
}
// NewPushConsumer returns a Consumer that keeps cfg for later use.
//
// No RocketMQ consumer is created here; that happens in Start.
func NewPushConsumer(cfg Config) Consumer {
	pc := new(pushConsumer)
	pc.cfg = cfg
	return pc
}
// Start creates a RocketMQ push consumer from the stored configuration and
// starts it.
//
// The new consumer is stored on the receiver only after it has started
// successfully. If creation or startup fails, the receiver is left unchanged
// and the logged result error is returned.
func (con *pushConsumer) Start() error {
	creds := primitive.Credentials{
		AccessKey:     con.cfg.Credentials.AccessKey,
		SecretKey:     con.cfg.Credentials.SecretKey,
		SecurityToken: con.cfg.Credentials.SecretToken,
	}
	resolver := primitive.NewPassthroughResolver([]string{con.cfg.Client.Server})

	pc, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(con.cfg.Client.GroupName),
		consumer.WithNsResolver(resolver),
		consumer.WithCredentials(creds),
		consumer.WithInstance(con.cfg.Client.InstanceName),
		consumer.WithNameServerDomain(con.cfg.Client.Domain),
	)
	if err != nil {
		return result.Errorf(result.ErrMqInitInternal, "[MQ] Init push consumer failed, error: %v", err).Log()
	}

	err = pc.Start()
	if err != nil {
		return result.Errorf(result.ErrMqStartInternal, "[MQ] Start push consumer failed, error: %v", err).Log()
	}

	con.consumer = pc
	return nil
}
// Stop shuts down the underlying push consumer.
//
// It is a no-op when no consumer has been stored yet, i.e. Start was never
// called or did not succeed. Without this guard the call would dereference a
// nil interface and panic.
func (con *pushConsumer) Stop() {
	if con.consumer == nil {
		return
	}
	// Stop has no error return, so the Shutdown error is deliberately
	// discarded (best-effort shutdown).
	_ = con.consumer.Shutdown()
}
// Subscribe registers call as the handler for messages on topic.Topic that
// match the selector described by exp.
//
// For every delivered batch, call is invoked once per message with a Message
// that carries only the MsgId of the received message. As soon as call returns
// ConFail the batch is reported as ConsumeRetryLater and the remaining
// messages of that batch are not passed to call; otherwise the batch is
// reported as ConsumeSuccess.
//
// An error is returned when the underlying consumer has not been created yet
// (Start has not succeeded) or when the underlying Subscribe call fails.
func (con *pushConsumer) Subscribe(topic Topic, exp Expression, call func(msg *Message) Status) error {
	// con.consumer is only set by a successful Start; guard against a nil
	// dereference panic when Subscribe is called too early.
	if con.consumer == nil {
		return result.Errorf(result.ErrMqStartInternal, "[MQ] Subscribe failed, push consumer is not started").Log()
	}

	selector := consumer.MessageSelector{
		Type:       consumer.ExpressionType(exp.Type),
		Expression: exp.Exp,
	}

	handler := func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, ext := range msgs {
			m := Message{
				MsgId: ext.MsgId,
			}
			if call(&m) == ConFail {
				return consumer.ConsumeRetryLater, nil
			}
		}
		return consumer.ConsumeSuccess, nil
	}

	return con.consumer.Subscribe(topic.Topic, selector, handler)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.13

搜索帮助