代码拉取完成,页面将自动刷新
package mq
import (
"context"
"fmt"
"gitee.com/h79/goutils/common/result"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"time"
)
var _ Consumer = (*pullConsumer)(nil)
type pullConsumer struct {
cfg Config
stop bool
consumer rocketmq.PullConsumer
}
func NewPullConsumer(cfg Config) Consumer {
return &pullConsumer{cfg: cfg, stop: false}
}
func (con *pullConsumer) Start() error {
p, e := rocketmq.NewPullConsumer(
consumer.WithGroupName(con.cfg.Client.GroupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{con.cfg.Client.Server})),
consumer.WithCredentials(primitive.Credentials{
AccessKey: con.cfg.Credentials.AccessKey,
SecretKey: con.cfg.Credentials.SecretKey,
SecurityToken: con.cfg.Credentials.SecretToken},
),
consumer.WithInstance(con.cfg.Client.InstanceName),
consumer.WithNameServerDomain(con.cfg.Client.Domain),
)
if e != nil {
return result.Errorf(result.ErrMqInitInternal, "[MQ] Init pull consumer failed, error: %v", e).Log()
}
if e := p.Start(); e != nil {
return result.Errorf(result.ErrMqStartInternal, "[MQ] Start pull consumer failed, error: %v", e).Log()
}
con.consumer = p
return nil
}
func (con *pullConsumer) Stop() {
con.stop = true
_ = con.consumer.Shutdown()
}
func (con *pullConsumer) Subscribe(topic Topic, exp Expression, call func(msg *Message) Status) error {
ctx := context.Background()
queue := primitive.MessageQueue{
Topic: topic.Topic,
BrokerName: topic.BrokerName, // replace with your broker name. otherwise, pull will failed.
QueueId: topic.QueueId,
}
offset := int64(0)
for {
if con.stop {
return nil
}
resp, err := con.consumer.PullFrom(ctx, queue, offset, 10)
if err != nil {
if err == rocketmq.ErrRequestTimeout {
fmt.Printf("timeout \n")
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("unexpectable err: %v \n", err)
return err
}
if resp.Status == primitive.PullFound {
callMsg(resp.GetMessageExts(), call)
}
offset = resp.NextBeginOffset
}
}
func callMsg(ext []*primitive.MessageExt, call func(msg *Message) Status) {
for _, msg := range ext {
m := Message{
MsgId: msg.MsgId,
}
call(&m)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。