代码拉取完成,页面将自动刷新
package mq
import (
"context"
"gitee.com/h79/goutils/common/result"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
// Compile-time check that *pushConsumer implements the Consumer interface.
var _ Consumer = (*pushConsumer)(nil)
// pushConsumer is the Consumer implementation backed by a RocketMQ push
// consumer.
type pushConsumer struct {
// cfg is the configuration handed to NewPushConsumer; Start reads the client
// and credential settings from it.
cfg Config
// consumer is the underlying RocketMQ client. It is nil until Start succeeds.
consumer rocketmq.PushConsumer
}
// NewPushConsumer returns a Consumer that holds cfg.
//
// Only the configuration is stored here; the underlying RocketMQ client is
// not created until Start is called.
func NewPushConsumer(cfg Config) Consumer {
	con := new(pushConsumer)
	con.cfg = cfg
	return con
}
// Start creates the RocketMQ push consumer from the stored configuration and
// starts it.
//
// The name server is taken from cfg.Client.Server through a passthrough
// resolver. When cfg.Client.Domain is non-empty the name-server-domain option
// is applied as well, after the resolver option, exactly as before.
//
// It returns an error built with result.ErrMqInitInternal when the consumer
// cannot be created and result.ErrMqStartInternal when it cannot be started.
// The consumer is stored on the receiver only after a successful start, so a
// failed Start leaves con.consumer unchanged.
func (con *pushConsumer) Start() error {
	opts := []consumer.Option{
		consumer.WithGroupName(con.cfg.Client.GroupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{con.cfg.Client.Server})),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey:     con.cfg.Credentials.AccessKey,
			SecretKey:     con.cfg.Credentials.SecretKey,
			SecurityToken: con.cfg.Credentials.SecretToken,
		}),
		consumer.WithInstance(con.cfg.Client.InstanceName),
	}
	// Only apply the domain option when a domain is actually configured.
	// NOTE(review): in current rocketmq-client-go v2 releases this option
	// installs its own resolver, so passing an empty domain would replace the
	// passthrough resolver above and the configured Server would be ignored.
	if con.cfg.Client.Domain != "" {
		opts = append(opts, consumer.WithNameServerDomain(con.cfg.Client.Domain))
	}

	p, e := rocketmq.NewPushConsumer(opts...)
	if e != nil {
		return result.Errorf(result.ErrMqInitInternal, "[MQ] Init push consumer failed, error: %v", e).Log()
	}
	if e := p.Start(); e != nil {
		return result.Errorf(result.ErrMqStartInternal, "[MQ] Start push consumer failed, error: %v", e).Log()
	}
	con.consumer = p
	return nil
}
// Stop shuts down the underlying RocketMQ push consumer.
//
// It is a no-op when no consumer has been created yet (Start was never called
// or did not succeed), so callers may safely defer Stop right after
// constructing the consumer.
func (con *pushConsumer) Stop() {
	if con.consumer == nil {
		return
	}
	// Best effort: Stop has no return value, so a shutdown error cannot be
	// reported to the caller and is intentionally dropped.
	_ = con.consumer.Shutdown()
}
// Subscribe registers call as the handler for messages on topic.Topic that
// match the selector described by exp.
//
// The underlying consumer is created by Start, so Subscribe must be called
// after a successful Start; otherwise an error is returned instead of
// dereferencing a nil consumer.
//
// Messages arrive in batches. call is invoked once per message, in order, with
// a Message that carries only the MsgId of the received message. As soon as
// call returns ConFail, the remaining messages of the batch are skipped and
// consumer.ConsumeRetryLater is reported for the batch; any other status is
// treated as success.
func (con *pushConsumer) Subscribe(topic Topic, exp Expression, call func(msg *Message) Status) error {
	if con.consumer == nil {
		return result.Errorf(result.ErrMqStartInternal, "[MQ] Subscribe topic %v failed, push consumer is not started", topic.Topic).Log()
	}

	selector := consumer.MessageSelector{
		Type:       consumer.ExpressionType(exp.Type),
		Expression: exp.Exp,
	}
	handle := func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, ext := range msgs {
			// NOTE(review): only MsgId is copied; other fields of the received
			// message are not forwarded to the callback.
			m := Message{MsgId: ext.MsgId}
			if call(&m) == ConFail {
				return consumer.ConsumeRetryLater, nil
			}
		}
		return consumer.ConsumeSuccess, nil
	}
	return con.consumer.Subscribe(topic.Topic, selector, handle)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。