1 Star 0 Fork 0

浅言腻耳 / kafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
partition_consumer_test.go 1.25 KB
一键复制 编辑 原始数据 按行查看 历史
huangxiao 提交于 2021-06-03 17:45 . 初始化
package kafka
import (
"fmt"
"strconv"
"testing"
)
func Test_partitionConsumer_Messages(t *testing.T) {
var addrs = []string{"10.105.36.200:9092", "10.105.4.216:9092"}
var config = Config{
Version: "V1_1_1_0",
}
config.Producer.Return.Successes = true
client, err := NewClient(addrs, &config)
if err != nil {
t.Errorf("newClient err:%v", err)
return
}
consumer, err := client.Consumer()
if err != nil {
t.Errorf("Consumer() err:%v", err)
return
}
topic := "scrm-test-demo-1"
partitions, err := consumer.Partitions(topic)
if err != nil {
t.Errorf("Partitions() err:%v", err)
return
}
for _, p := range partitions {
pc, err := consumer.ConsumePartition(topic, p, OffsetOldest)
if err != nil {
t.Errorf("ConsumePartition() err:%v", err)
return
}
msgs, err := pc.Messages(2)
if err != nil {
t.Errorf("Messages() err:%v", err)
return
}
for _, v := range msgs {
fmt.Println("msgs========")
fmt.Println(string(v.Value), "-", strconv.FormatInt(v.Offset, 10))
}
msgs, err = pc.Messages(2)
if err != nil {
t.Errorf("Messages() err:%v", err)
return
}
for _, v := range msgs {
fmt.Println("msgs========")
fmt.Println(string(v.Value), "-", strconv.FormatInt(v.Offset, 10))
}
pc.AsyncClose()
}
}
1
https://gitee.com/hnorm/kafka.git
git@gitee.com:hnorm/kafka.git
hnorm
kafka
kafka
master

搜索帮助