// [Web page banner captured with this file] Code pull complete; the page will refresh automatically.

package main
import (
"context"
"fmt"
"gitee.com/ifinder/finder-kafka-go/kafkax/consumer"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"os"
"strings"
)
// main is the program entry point; it simply delegates to D.
func main() {
	D()
}
//=========== consumer group approach
// exampleConsumerGroupHandler is a stateless handler that E passes to the
// sarama consumer group's Consume call. It only prints the messages it
// receives and marks them on the session.
type exampleConsumerGroupHandler struct{}

// Setup does nothing and always returns nil.
func (exampleConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup does nothing and always returns nil.
func (exampleConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim drains the claim's message channel. For every message it
// prints the topic, partition and offset, then marks the message on the
// session with empty metadata. It returns nil once the channel is closed.
func (exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	messages := claim.Messages()
	for m := range messages {
		fmt.Printf("Message topic:%q partition:%d offset:%d\n", m.Topic, m.Partition, m.Offset)
		sess.MarkMessage(m, "")
	}
	return nil
}
// E consumes the "msg_order_search_update" topic through a sarama consumer
// group named "group-default", handing every session to
// exampleConsumerGroupHandler.
//
// It panics if the group cannot be created or if Consume returns an error;
// otherwise it loops forever and never returns.
func E() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0 // specify appropriate version
	cfg.Consumer.Return.Errors = true
	cfg.Consumer.Offsets.AutoCommit.Enable = true

	brokers := []string{"mq.cps.cvod.net:9092"}
	group, err := sarama.NewConsumerGroup(brokers, "group-default", cfg)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Print every error the group reports on its Errors channel.
	go func() {
		for groupErr := range group.Errors() {
			fmt.Println("ERROR", groupErr)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {
		topics := []string{"msg_order_search_update"}
		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop: when a
		// server-side rebalance happens, the consumer session needs to be
		// recreated to get the new claims.
		if consumeErr := group.Consume(ctx, topics, handler); consumeErr != nil {
			panic(consumeErr)
		}
	}
}
//=============
// D creates a FinderKafkaConsumer for the "msg_order_search_update" topic in
// group "test-consumer-group" and hands three callbacks to its Consume
// method: one prints errors, one prints rebalance notifications, and one
// prints consumed messages.
//
// It panics if the consumer cannot be created.
func D() {
	config := consumer.GetConfig()
	// NOTE(review): config is never passed to NewFinderKafkaConsumer below, so
	// this setting only has an effect if GetConfig returns shared state that
	// the constructor reads — confirm against the consumer package.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	cs, err := consumer.NewFinderKafkaConsumer(
		[]string{"mq.cps.cvod.net:9092"},
		"test-consumer-group",
		[]string{"msg_order_search_update"},
	)
	if err != nil {
		// The constructor error used to be discarded and cs was used
		// unconditionally; fail loudly instead, as E and B do.
		panic(err)
	}

	cs.Consume(func(err error) {
		fmt.Println(err)
	}, func(notification *cluster.Notification) {
		fmt.Printf("%+v", notification)
	}, func(message *sarama.ConsumerMessage) {
		fmt.Println(message)
	})
}
// C creates a FinderKafkaConsumer for the "msg_order_search_update" topic in
// group "test-consumer-group" and reads directly from the channels of its
// Consumer field: errors and notifications are printed from two background
// goroutines, and messages are printed on the calling goroutine until the
// message channel is closed.
//
// It panics if the consumer cannot be created.
func C() {
	config := consumer.GetConfig()
	// NOTE(review): config is never passed to NewFinderKafkaConsumer below, so
	// this setting only has an effect if GetConfig returns shared state that
	// the constructor reads — confirm against the consumer package.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	cs, err := consumer.NewFinderKafkaConsumer(
		[]string{"mq.cps.cvod.net:9092"},
		"test-consumer-group",
		[]string{"msg_order_search_update"},
	)
	if err != nil {
		// The constructor error used to be discarded and cs was used
		// unconditionally; fail loudly instead, as E and B do.
		panic(err)
	}

	go func() {
		for err := range cs.Consumer.Errors() {
			fmt.Println("Error: ", err.Error())
		}
	}()
	go func() {
		for note := range cs.Consumer.Notifications() {
			fmt.Println(note)
		}
	}()

	// Delivery semantics:
	// 1) at most once: similar to "non-persistent" messages in JMS. The
	//    message is sent once and never resent, whether or not it succeeded.
	// 2) at least once: the message is sent at least once; if it is not
	//    received successfully it may be resent until it is.
	// 3) exactly once: the message is sent only once.
	// Option 2 is preferred, so the business processing must succeed and the
	// offset must be successfully written to redis.
	for msg := range cs.Consumer.Messages() { // this approach can also receive messages at earlier offsets
		fmt.Println(msg)
	}
}
// B creates a sarama-cluster consumer for the "msg_order_search_update" topic
// in group "group-01" and then loops forever, printing whatever arrives on
// the consumer's notification, error and message channels.
//
// It panics if the consumer cannot be created and otherwise never returns.
func B() {
	cfg := cluster.NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Group.Return.Notifications = true
	// Author's note: messages could be consumed with OffsetOldest but not with
	// OffsetNewest.
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	brokers := []string{"mq.cps.cvod.net:9092"}
	topics := []string{"msg_order_search_update"}
	cc, err := cluster.NewConsumer(brokers, "group-01", topics, cfg)
	if err != nil {
		panic(err)
	}

	for {
		select {
		case note := <-cc.Notifications():
			fmt.Println(note)
		case consumeErr := <-cc.Errors():
			fmt.Println(consumeErr)
		case message := <-cc.Messages():
			fmt.Printf("msg:%+v", message)
		}
	}
}
// A creates a sarama-cluster consumer for topic "topic_aaa" in group
// "group-1". Errors and rebalance notifications are printed from two
// background goroutines; each message is written to stdout as
// topic/partition/offset followed by its value, and its offset is then
// marked.
//
// If the consumer cannot be created the error is printed and A returns.
// Otherwise A returns once the message channel is closed, closing the
// consumer on the way out.
func A() {
	const (
		brokerList = "mq.cps.cvod.net:9092"
		topicList  = "topic_aaa"
		groupID    = "group-1"
	)

	cfg := cluster.NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Group.Return.Notifications = true
	// Start from the oldest available offset when the group has no committed
	// offset yet.
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	c, err := cluster.NewConsumer(strings.Split(brokerList, ","), groupID, strings.Split(topicList, ","), cfg)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() { _ = c.Close() }()

	go func() {
		for consumeErr := range c.Errors() {
			fmt.Println(consumeErr.Error())
		}
	}()

	go func() {
		for rebalance := range c.Notifications() {
			fmt.Printf("Rebalanced: %+v\n", rebalance)
		}
	}()

	for m := range c.Messages() {
		fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\n", m.Topic, m.Partition, m.Offset, m.Value)
		// MarkOffset is not written to Kafka in real time; offsets that have
		// not been committed yet may be lost if the program crashes.
		c.MarkOffset(m, "")
	}
}
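// [Web page notice captured with this file] This section may contain content that is not suitable for display, so the page does not show it. You can review and modify it yourself using the relevant editing functions.
// If you confirm that the content does not involve inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, or content that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible.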