Read the docs
import "git.oschina.net/cloudzone/smartgo/stgclient/process"
process.NewDefaultMQProducer("producerGroupId")
SetNamesrvAddr(namesrvAddr)
Start()
方法CreateTopic(stgcommon.DEFAULT_TOPIC, "topicName", 8)
stgcommon.DEFAULT_TOPIC
为密钥需先import "git.oschina.net/cloudzone/smartgo/stgcommon"
topicName
为创建topic的名称8
为读写的队列数1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的Send方法Send(message.NewMessage("topicName", "tagName", []byte("msgbody")))
import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
(*SendResult, error)
SEND_OK
发送成功并同步到SLAVE成功FLUSH_DISK_TIMEOUT
刷盘超时FLUSH_SLAVE_TIMEOUT
同步到SLAVE超时SLAVE_NOT_AVAILABLE SLAVE
不可用1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的SendCallBack方法SendCallBack(message.NewMessage("topicName", "tagName", []byte("msgbody")), func(sendResult *process.SendResult, err error) {})
import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
func(sendResult *process.SendResult, err error)
SEND_OK
发送成功并同步到SLAVE成功FLUSH_DISK_TIMEOUT
刷盘超时FLUSH_SLAVE_TIMEOUT
同步到SLAVE超时SLAVE_NOT_AVAILABLE SLAVE
不可用1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
4、建立链接调用Start()
方法
5、调用实例的SendOneWay方法SendOneWay(message.NewMessage("topicName", "tagName", []byte("msgbody")))
import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
error
import "git.oschina.net/cloudzone/smartgo/stgclient/process"
process.NewDefaultMQPushConsumer("consumerGroupId")
SetConsumeFromWhere(heartbeat.CONSUME_FROM_LAST_OFFSET)
import "git.oschina.net/cloudzone/smartgo/stgcommon/protocol/heartbeat"
CONSUME_FROM_LAST_OFFSET
一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费。CONSUME_FROM_FIRST_OFFSET
一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费。CONSUME_FROM_TIMESTAMP
一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,时间点设置参见DefaultMQPushConsumer.ConsumeTimestamp
参数。SetMessageModel(heartbeat.CLUSTERING)
CLUSTERING
集群消费。BROADCASTING
广播消费。SetNamesrvAddr(namesrvAddr)
Subscribe("topicName", "tagName")
RegisterMessageListener(&MessageListenerImpl{})
MessageListenerImpl
实现MessageListenerConcurrently
的接口MessageListenerImpl
实现MessageListenerOrderly
的接口Start()
方法import "git.oschina.net/cloudzone/smartgo/stgclient/process"
process.NewDefaultMQPullConsumer("consumerGroupId")
SetNamesrvAddr(namesrvAddr)
Start()
方法FetchSubscribeMessageQueues("topicName")
方法,拿到topic的所有队列Pull(mq, "tagA", 0, 32)
方法
mq
为队列结构体tagA
为tag的标签名称0
为该队列offset,需自行维护32
为一次拉取数量。此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。