代码拉取完成,页面将自动刷新
基于 sarama 封装, 目的在于使用简洁及优化 运行模式为 消费组的方式
通过build 可同时构建多个需要的业务消费,指定不同的参数传入
var (
addr = "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092"
topic = "web_log"
groupId = "testGroupId"
)
//使用阻塞模式
var consumerFactory = kafka_go.NewFactoryConsumer()
//使用非阻塞模式
// var consumerFactory = kafka_go.NewFactoryConsumerBackGround()
//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")
//注册消费回调监听
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
fmt.Printf("%+v\n", context.GetMessageString())
})
//注册消费构建(多个以参数区分)
consumerFactory.RegisterConsumer(demoConsumerBuilder)
//运行消费者
consumerFactory.Run()
使用方式
//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")
接口使用前注意事项
- 消费者可创建为一消费者多协程分区消费和一消费者一协程消费
- 一消费者多协程分区消费不保存消息可靠性,重启后存在丢失消息的情况,但吞吐量较高,且接收消息后无法调用消息确认
- 一消费一协程保证消息可靠,但吞吐量较底
- 此组件针对消费组进行处理,需传入消费入
groupdId
案例设置:(主要针对
BuildConsumer
设置)
//一消费多协程
//消费构建
demo2ConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId2, Conf.Topic2)
demo2ConsumerBuilder.SetDebug(true)
demo2ConsumerBuilder.SetMultiplePartition(true) //开启多协程消费 默认为false
demo2ConsumerBuilder.SetIsAutoCommit(false)//开启多协程消费,此配置失效
demo2ConsumerBuilder.SetKafkaVersion("3.0.0")
demo2ConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
fmt.Printf("%s xx received2 [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
context.GetSession().Ack()
})
//一消费一协程
demoConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId3, Conf.Topic3)
demoConsumerBuilder.SetDebug(true)
demoConsumerBuilder.SetIsAutoCommit(false)// 开启手动提交
demoConsumerBuilder.SetKafkaVersion("3.0.0")
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
fmt.Printf("%s xx received [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
//fmt.Printf("%+v\n", context.GetMessageString())
context.GetSession().Ack()
})
接口名称 | 描述 |
---|---|
SetKafkaVersion(kafkaVersion string) | 设置kakfa版本号 底层会自动处理新版本接口兼容 |
SetIsAutoCommit(autoCommit bool) | 是否自动提交 默认为true , 当为false是需执行手动提交 通过 回调上下文session处理 ConsumerMessageContext.getSession().Ack() 进行提交 |
SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi | 是否多分区指定消费(一个消费者跟根据分区数创建子协程),此模式存在重启后消息丢失 |
SetBalanceType(balanceType BalanceType) BuildConsumerApi | 设置分区策略类型 CONSUMER BALANCE_STRATEGY_ROUNDROBIN:轮询(默认) CONSUMER_BALANCE_STRATEGY_STICKY:亲和性 CONSUMER_BALANCE_STRATEGY_RANGE: 随机 |
SetResponseListener(responseResult ConsumerResponseListener) | 拉取消息事件监听 |
GetAddr() string | 获取broker地址 |
GetGroupId() string | 获取消费组 |
GetTopic() string | 获取主题 |
GetBalanceType() BalanceType | 获取分区策略方式 |
ToString() string | 返回构建数据字符串 |
type ConsumerResponseListener func(context *ConsumerMessageContext)
ConsumerMessageContext : 事件消息上下文
接口名称 | 说明 |
---|---|
GetBuilder() BuildConsumerApi | 返回当前执行回调中的build信息 |
GetGroupId() string | 返回当前组 |
GetTopic() string | 返回当前主题 |
GetPartition() int32 | 返回当前执行数据的分区 |
GetOffset() int64 | 返回当前数据的偏移数 |
GetMessage() []byte | 获取消息主体 字节数组 |
GetMessageString() string | 返回消息主体 字符串 |
GetTimeStamp() time.Time | 返回消息时间 |
GetVal() *sarama.ConsumerMessage | 返回原生消息内容 |
GetSession() *ConsumerSession | 返回当前执行的session |
接口名称 | 说明 |
---|---|
Ack() | 手动确认消息 |
IsAutoAck() bool | 是否自动提交 |
GetSession() sarama.ConsumerGroupSession | 原生session获取 |
GetMessage() *sarama.ConsumerMessage | 原生消息获取 |
通过build 可同时构建多个需要的业务生产者,指定不同的参数传入,默认生使用的是同步提交
底层维护一套连接池,根据设置最大连接数进行设置
var (
productFactory = kafka_go.NewFactoryProduct()
)
demoBuild := kafka_go.NewBuildProduct("demo1", "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092").SetDebug(true).SetMaxConnection(4)
err := productFactory.Register(demoBuild).Connect()
if err != nil {
log.Panicln(err)
}
num := 10
var wg = &sync.WaitGroup{}
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer wg.Done()
partition, offset, err := productFactory.Push("demo1", Conf.Topic3, "生产者消息")
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("发送成功 partition:%d, offset:%d", partition, offset)
}
}()
//time.Sleep(time.Second)
}
wg.Wait()
使用方式
接口 | 说明 |
---|---|
SetMaxConnection(maxConnection int32) BuildProductApi | 设置连接池数量 |
SetAckType(ackType ProductAckType) BuildProductApi | 设置消息确认方式 |
SetTransactional(isTransactional bool) BuildProductApi | 是否开启事务提交 |
GetName() string | 获取当前build name |
GetAddr() string | 获取当前服务地址 |
GetAddrSlice() []string | 获取当前服务地址切片 |
GetMaxConnection() int32 | 获取当前大连接数 |
GetConnStrategy() ProductBalanceType | 获取连接池加载类型, 默认为轮询 |
确认类型 | 说明 |
---|---|
PRODUCT_ACK_TYPE_NONAL | 不等待 broker 的 ack,这一操作提供了一个最低的延迟 |
PRODUCT_ACK_TYPE_FOLLOWER | 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack |
PRODUCT_ACK_TYPE_ALL | 等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复, 开启事务后可避免数据重复 |
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。