1 Star 3 Fork 3

tym_hmm/kafka-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
AGPL-3.0

kafka golang

基于 sarama 封装, 目的在于使用简洁及优化 运行模式为 消费组的方式

支持功能

  1. 客户端连接池,可复用连接
  2. 通过build可构建多个消息主题发送及主题消费
  3. 默认使用消费者组模式

使用

1. 消费者

通过build 可同时构建多个需要的业务消费,指定不同的参数传入

  • demo

var (
 addr    = "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092"
 topic   = "web_log"
 groupId = "testGroupId"
)

//使用阻塞模式
var consumerFactory = kafka_go.NewFactoryConsumer()
//使用非阻塞模式
// var consumerFactory = kafka_go.NewFactoryConsumerBackGround()

//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")
//注册消费回调监听
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
	fmt.Printf("%+v\n", context.GetMessageString())
})

//注册消费构建(多个以参数区分)
consumerFactory.RegisterConsumer(demoConsumerBuilder)

//运行消费者
consumerFactory.Run()
  • build 接口说明 BuildConsumer

使用方式

//消费构建
demoConsumerBuilder := kafka_go.NewBuildConsumer(addr, groupId, topic)
//设置kafka版本
demoConsumerBuilder.SetKafkaVersion("3.0.0")

接口使用前注意事项

  • 消费者可创建为一消费者多协程分区消费和一消费者一协程消费
  • 一消费者多协程分区消费不保存消息可靠性,重启后存在丢失消息的情况,但吞吐量较高,且接收消息后无法调用消息确认
  • 一消费一协程保证消息可靠,但吞吐量较底
  • 此组件针对消费组进行处理,需传入消费入groupdId

案例设置:(主要针对BuildConsumer设置)

//一消费多协程

//消费构建
demo2ConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId2, Conf.Topic2)
demo2ConsumerBuilder.SetDebug(true)
demo2ConsumerBuilder.SetMultiplePartition(true) //开启多协程消费 默认为false
demo2ConsumerBuilder.SetIsAutoCommit(false)//开启多协程消费,此配置失效
demo2ConsumerBuilder.SetKafkaVersion("3.0.0")
demo2ConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
		nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
		fmt.Printf("%s xx received2 [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
		context.GetSession().Ack()
})
//一消费一协程
demoConsumerBuilder := kafka_go.NewBuildConsumer(Conf.Addr, Conf.GroupId3, Conf.Topic3)
demoConsumerBuilder.SetDebug(true)
demoConsumerBuilder.SetIsAutoCommit(false)// 开启手动提交
demoConsumerBuilder.SetKafkaVersion("3.0.0")
demoConsumerBuilder.SetResponseListener(func(context *kafka_go.ConsumerMessageContext) {
	nowTime := time.Now().In(cstZone).Format(UTFALL_SECOND)
	fmt.Printf("%s xx received [partition:%d, topic:%s, content:%s]\n", nowTime, context.GetPartition(), context.GetTopic(), context.GetMessageString())
	//fmt.Printf("%+v\n", context.GetMessageString())
	context.GetSession().Ack()
})
接口名称 描述
SetKafkaVersion(kafkaVersion string) 设置kakfa版本号 底层会自动处理新版本接口兼容
SetIsAutoCommit(autoCommit bool) 是否自动提交 默认为true, 当为false是需执行手动提交
通过 回调上下文session处理 ConsumerMessageContext.getSession().Ack()进行提交
SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi 是否多分区指定消费(一个消费者跟根据分区数创建子协程),此模式存在重启后消息丢失
SetBalanceType(balanceType BalanceType) BuildConsumerApi 设置分区策略类型 CONSUMER
BALANCE_STRATEGY_ROUNDROBIN:轮询(默认)
CONSUMER_BALANCE_STRATEGY_STICKY:亲和性
CONSUMER_BALANCE_STRATEGY_RANGE: 随机
SetResponseListener(responseResult ConsumerResponseListener) 拉取消息事件监听
GetAddr() string 获取broker地址
GetGroupId() string 获取消费组
GetTopic() string 获取主题
GetBalanceType() BalanceType 获取分区策略方式
ToString() string 返回构建数据字符串
  • build 拉取消息事件监听回调 说明 ConsumerResponseListener

type ConsumerResponseListener func(context *ConsumerMessageContext)

ConsumerMessageContext : 事件消息上下文

接口名称 说明
GetBuilder() BuildConsumerApi 返回当前执行回调中的build信息
GetGroupId() string 返回当前组
GetTopic() string 返回当前主题
GetPartition() int32 返回当前执行数据的分区
GetOffset() int64 返回当前数据的偏移数
GetMessage() []byte 获取消息主体 字节数组
GetMessageString() string 返回消息主体 字符串
GetTimeStamp() time.Time 返回消息时间
GetVal() *sarama.ConsumerMessage 返回原生消息内容
GetSession() *ConsumerSession 返回当前执行的session
  • ConsumerSession 接口说明

接口名称 说明
Ack() 手动确认消息
IsAutoAck() bool 是否自动提交
GetSession() sarama.ConsumerGroupSession 原生session获取
GetMessage() *sarama.ConsumerMessage 原生消息获取

1. 生产者

通过build 可同时构建多个需要的业务生产者,指定不同的参数传入,默认生使用的是同步提交
底层维护一套连接池,根据设置最大连接数进行设置

  • demo

var (
    productFactory = kafka_go.NewFactoryProduct()
)
demoBuild := kafka_go.NewBuildProduct("demo1", "192.168.186.130:9092,192.168.186.201:9092,192.168.186.202:9092").SetDebug(true).SetMaxConnection(4)
err := productFactory.Register(demoBuild).Connect()
if err != nil {
    log.Panicln(err)
}
num := 10
var wg = &sync.WaitGroup{}
for i := 0; i < num; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		partition, offset, err := productFactory.Push("demo1", Conf.Topic3, "生产者消息")
		if err != nil {
			fmt.Println(err)
		} else {
			fmt.Printf("发送成功  partition:%d, offset:%d", partition, offset)
		}
	}()
		//time.Sleep(time.Second)
}
wg.Wait()

  • build 接口说明 BuildProduct

使用方式

接口 说明
SetMaxConnection(maxConnection int32) BuildProductApi 设置连接池数量
SetAckType(ackType ProductAckType) BuildProductApi 设置消息确认方式
SetTransactional(isTransactional bool) BuildProductApi 是否开启事务提交
GetName() string 获取当前build name
GetAddr() string 获取当前服务地址
GetAddrSlice() []string 获取当前服务地址切片
GetMaxConnection() int32 获取当前大连接数
GetConnStrategy() ProductBalanceType 获取连接池加载类型, 默认为轮询
  • ProductAckType 消息确认类型说明

确认类型 说明
PRODUCT_ACK_TYPE_NONAL 不等待 broker 的 ack,这一操作提供了一个最低的延迟
PRODUCT_ACK_TYPE_FOLLOWER 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack
PRODUCT_ACK_TYPE_ALL 等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复, 开启事务后可避免数据重复

空文件

简介

golang kafka 组件 生产者连接池,基于sarama, 目的在于使用简易上手, 快速调优 展开 收起
README
AGPL-3.0
取消

发行版 (10)

全部
2年前

贡献者 (1)

全部

语言

近期动态

2年前推送了新的 v1.0.29 标签
2年前推送了新的提交到 dev 分支,5b593e7...e23d351
2年前推送了新的 v1.0.29 分支
2年前推送了新的提交到 master 分支,5b593e7...e23d351
2年前推送了新的 v1.0.28 标签
加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tym_hmm/kafka-go.git
git@gitee.com:tym_hmm/kafka-go.git
tym_hmm
kafka-go
kafka-go
master

搜索帮助