1 Star 0 Fork 0

浅言腻耳 / kafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
partition_consumer.go 833 Bytes
一键复制 编辑 原始数据 按行查看 历史
huangxiao 提交于 2021-06-03 17:45 . 初始化
package kafka
import "github.com/Shopify/sarama"
type PartitionConsumer interface {
AsyncClose()
Close() error
Messages(int) ([]*ConsumerMessage, error)
HighWaterMarkOffset() int64
}
type partitionConsumer struct {
partitionConsumer sarama.PartitionConsumer
}
func (p *partitionConsumer) AsyncClose() {
p.partitionConsumer.AsyncClose()
}
func (p *partitionConsumer) Close() error {
return p.partitionConsumer.Close()
}
// Messages 获取消息
func (p partitionConsumer) Messages(num int) ([]*ConsumerMessage, error) {
var msgList []*ConsumerMessage
for msg := range p.partitionConsumer.Messages() {
msgList = append(msgList, getMessage(msg))
num--
if num <= 0 {
break
}
}
return msgList, nil
}
func (p partitionConsumer) HighWaterMarkOffset() int64 {
return p.partitionConsumer.HighWaterMarkOffset()
}
1
https://gitee.com/hnorm/kafka.git
git@gitee.com:hnorm/kafka.git
hnorm
kafka
kafka
master

搜索帮助