1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
pull_consumer.go 2.26 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2023-04-24 20:33 . 除掉支付
package mq
import (
"context"
"fmt"
"gitee.com/h79/goutils/common/result"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"time"
)
// Compile-time check that *pullConsumer satisfies the Consumer interface.
var _ Consumer = (*pullConsumer)(nil)
// pullConsumer is the Consumer implementation built on rocketmq.PullConsumer.
//
// NOTE(review): stop is written by Stop and read by Subscribe with no
// synchronization visible here; if those run on different goroutines this is
// a data race — confirm and consider an atomic flag.
type pullConsumer struct {
// cfg supplies the client and credential settings read by Start.
cfg Config
// stop is set to true by Stop and polled by the Subscribe loop.
stop bool
// consumer is the underlying client; assigned by Start once it has started successfully.
consumer rocketmq.PullConsumer
}
// NewPullConsumer returns a Consumer that will use a pull-mode client
// configured from cfg. The underlying client is not created until Start is
// called.
func NewPullConsumer(cfg Config) Consumer {
	con := new(pullConsumer)
	con.cfg = cfg
	// stop keeps its zero value (false): the consumer has not been stopped.
	return con
}
// Start creates the underlying RocketMQ pull consumer from con.cfg and starts
// it.
//
// The client is stored on con only after it has started successfully. A
// failure to construct the client is returned as a result.ErrMqInitInternal
// error, and a failure to start it as a result.ErrMqStartInternal error; both
// are passed through Log() before being returned.
func (con *pullConsumer) Start() error {
	credentials := primitive.Credentials{
		AccessKey:     con.cfg.Credentials.AccessKey,
		SecretKey:     con.cfg.Credentials.SecretKey,
		SecurityToken: con.cfg.Credentials.SecretToken,
	}
	nameServers := []string{con.cfg.Client.Server}

	// Option order is kept as-is in case later options override earlier ones.
	client, err := rocketmq.NewPullConsumer(
		consumer.WithGroupName(con.cfg.Client.GroupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(nameServers)),
		consumer.WithCredentials(credentials),
		consumer.WithInstance(con.cfg.Client.InstanceName),
		consumer.WithNameServerDomain(con.cfg.Client.Domain),
	)
	if err != nil {
		return result.Errorf(result.ErrMqInitInternal, "[MQ] Init pull consumer failed, error: %v", err).Log()
	}

	err = client.Start()
	if err != nil {
		return result.Errorf(result.ErrMqStartInternal, "[MQ] Start pull consumer failed, error: %v", err).Log()
	}

	con.consumer = client
	return nil
}
// Stop marks the consumer as stopped, so the Subscribe loop returns the next
// time it checks the flag, and then shuts down the underlying client.
//
// Calling Stop before Start, or after a Start that failed, is safe: there is
// no client to shut down in that case, so only the flag is set.
func (con *pullConsumer) Stop() {
	con.stop = true
	if con.consumer == nil {
		// Start never succeeded; calling Shutdown here would panic.
		return
	}
	// Stop has no error return, so a failed shutdown is deliberately
	// treated as best-effort and its error is discarded.
	_ = con.consumer.Shutdown()
}
// Subscribe pulls messages from the queue described by topic, starting at
// offset 0, and hands every pulled message to call.
//
// It blocks until Stop has been called (returns nil) or a pull fails with an
// error other than rocketmq.ErrRequestTimeout (returns that error). Request
// timeouts are retried after a one-second pause.
//
// exp is currently unused, and the Status returned by call is ignored.
//
// An error is returned immediately when call is nil or when the consumer has
// not been started successfully, instead of panicking later inside the loop.
func (con *pullConsumer) Subscribe(topic Topic, exp Expression, call func(msg *Message) Status) error {
	// Maximum number of messages requested per pull.
	const pullBatchSize = 10

	if call == nil {
		return fmt.Errorf("[MQ] pull consumer subscribe: nil message callback")
	}
	if con.consumer == nil {
		return fmt.Errorf("[MQ] pull consumer subscribe: consumer not started")
	}

	ctx := context.Background()
	queue := primitive.MessageQueue{
		Topic:      topic.Topic,
		BrokerName: topic.BrokerName, // must be the real broker name, otherwise the pull fails
		QueueId:    topic.QueueId,
	}
	offset := int64(0)
	for {
		if con.stop {
			return nil
		}
		resp, err := con.consumer.PullFrom(ctx, queue, offset, pullBatchSize)
		if err != nil {
			if err == rocketmq.ErrRequestTimeout {
				fmt.Printf("timeout \n")
				time.Sleep(1 * time.Second)
				continue
			}
			fmt.Printf("unexpectable err: %v \n", err)
			return err
		}
		if resp.Status == primitive.PullFound {
			callMsg(resp.GetMessageExts(), call)
		}
		// Advance even when nothing was found, so the next pull starts
		// at the offset the broker reported.
		offset = resp.NextBeginOffset
	}
}
// callMsg invokes call once per pulled message, in slice order. Only the
// message ID is copied into the Message handed to the callback, and the
// Status returned by call is discarded.
func callMsg(ext []*primitive.MessageExt, call func(msg *Message) Status) {
	for i := range ext {
		call(&Message{MsgId: ext[i].MsgId})
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.0

搜索帮助

A270a887 8829481 3d7a4017 8829481