64 Star 199 Fork 82

cloudzone/smartgo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

stgclient

Read the docs

创建topic

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")
  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
  • 4、建立链接调用Start()方法
  • 5、调用创建topic的方法CreateTopic(stgcommon.DEFAULT_TOPIC, "topicName", 8)
    • stgcommon.DEFAULT_TOPIC 为密钥需先import "git.oschina.net/cloudzone/smartgo/stgcommon"
    • topicName 为创建topic的名称
    • 8为读写的队列数

发送同步消息

  • 请求

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"

  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")

  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)

  • 4、建立链接调用Start()方法

  • 5、调用实例的Send方法Send(message.NewMessage("topicName", "tagName", []byte("msgbody")))

    • import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
    • 创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
  • 响应

    • Send方法返回值为(*SendResult, error)
    • SendResult中SendStatus值说明
      • SEND_OK 发送成功并同步到SLAVE成功
      • FLUSH_DISK_TIMEOUT 刷盘超时
      • FLUSH_SLAVE_TIMEOUT 同步到SLAVE超时
      • SLAVE_NOT_AVAILABLE SLAVE不可用

发送异步消息

  • 请求

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"

  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")

  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)

  • 4、建立链接调用Start()方法

  • 5、调用实例的SendCallBack方法SendCallBack(message.NewMessage("topicName", "tagName", []byte("msgbody")), func(sendResult *process.SendResult, err error) {})

    • import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
    • 创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
  • 响应

    • SendCallBack回调函数func(sendResult *process.SendResult, err error)
    • SendResult中SendStatus值说明
      • SEND_OK 发送成功并同步到SLAVE成功
      • FLUSH_DISK_TIMEOUT 刷盘超时
      • FLUSH_SLAVE_TIMEOUT 同步到SLAVE超时
      • SLAVE_NOT_AVAILABLE SLAVE不可用

发送OneWay消息

  • 请求

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"

  • 2、创建发送实例process.NewDefaultMQProducer("producerGroupId")

  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)

  • 4、建立链接调用Start()方法

  • 5、调用实例的SendOneWay方法SendOneWay(message.NewMessage("topicName", "tagName", []byte("msgbody")))

    • import "git.oschina.net/cloudzone/smartgo/stgcommon/message"
    • 创建Message的参数topicName 为创建topic的名称,tagName为标签名称,msgBody为消息内容。
  • 响应

    • SendOneWay有错误返回error

Push消费

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
  • 2、创建消费实例process.NewDefaultMQPushConsumer("consumerGroupId")
  • 3、设置实例消费位置SetConsumeFromWhere(heartbeat.CONSUME_FROM_LAST_OFFSET)
    • import "git.oschina.net/cloudzone/smartgo/stgcommon/protocol/heartbeat"
    • CONSUME_FROM_LAST_OFFSET 一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费。
    • CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费。
    • CONSUME_FROM_TIMESTAMP 一个新的订阅组第一次启动从指定时间点开始消费,后续再启动接着上次消费的进度开始消费,时间点设置参见DefaultMQPushConsumer.ConsumeTimestamp参数。
  • 4、设置消费模式SetMessageModel(heartbeat.CLUSTERING)
    • CLUSTERING 集群消费。
    • BROADCASTING 广播消费。
  • 5、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
  • 6、设置订阅topic和tagSubscribe("topicName", "tagName")
  • 6、设置监听器RegisterMessageListener(&MessageListenerImpl{})
    • 普通消息需MessageListenerImpl实现MessageListenerConcurrently的接口
    • 顺序消息需MessageListenerImpl实现MessageListenerOrderly的接口
  • 7、建立链接调用Start()方法

Pull消费

  • 1、import "git.oschina.net/cloudzone/smartgo/stgclient/process"
  • 2、创建消费实例process.NewDefaultMQPullConsumer("consumerGroupId")
  • 3、设置stgregistry地址SetNamesrvAddr(namesrvAddr)
  • 4、建立链接调用Start()方法
  • 5、调用实例的FetchSubscribeMessageQueues("topicName")方法,拿到topic的所有队列
  • 6、调用实例的Pull(mq, "tagA", 0, 32)方法
    • mq为队列结构体
    • tagA为tag的标签名称
    • 0为该队列offset,需自行维护
    • 32为一次拉取数量。
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/cloudzone/smartgo.git
git@gitee.com:cloudzone/smartgo.git
cloudzone
smartgo
smartgo
dev

搜索帮助