# kratos_kafka **Repository Path**: huoyingwhw/kratos_kafka ## Basic Information - **Project Name**: kratos_kafka - **Description**: kratos_kafka - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 6 - **Forks**: 3 - **Created**: 2023-02-06 - **Last Updated**: 2024-10-14 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 1、项目中主要使用的操作kafka等MQ的包 [❗️Go社区主流Kafka客户端简要对比](https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients/) ❗️这篇文章的结论是: ``` 本文对比了Go社区的三个主流kafka客户端包:Shopify/sarama、confluent-kafka-go和segmentio/kafka-go。 (1)sarama应用最广,也是我研究时间最长的一个包,但坑也是最多的,放弃; (2)confluent-kafka-go虽然是官方的,但是基于cgo,无奈放弃; (3)最后,我们选择了segmentio/kafka-go,已经在线上运行了一段时间,至今尚未发现重大问题。 ``` ### IMB/sarama [IBM/sarama](https://github.com/IBM/sarama) [开发者文档](https://pkg.go.dev/github.com/shopify/sarama) [Golang中如何正确的使用sarama包操作Kafka?](https://juejin.cn/post/6999263126713696293) [sarama包操作Kafka自动提交模式下丢失消息问题以及手动提交模式](https://juejin.cn/post/6999263126713696293#heading-4) ### segmentio/kafka-go [segmentio/kafka-go](https://github.com/segmentio/kafka-go) [❗️Go操作Kafka之kafka-go](https://www.liwenzhou.com/posts/Go/kafka-go/) ### kratos-transport [kratos-transport](https://github.com/tx7do/kratos-transport) 实际上,`kratos-transport`内部使用的是`segmentio/kafka-go`,然后对其做了一些封装操作 ## 2、当前实现的消息中间件demo - 使用kratos-transport包实现kafka收发消息功能 - 使用kratos-transport包实现nsq收发消息功能 - 使用kratos-transport包实现kafka与nsq手动ACK消息功能 - 简单封装segmentio/kafka-go中的Writer实现将消息根据userId的hash发送到指定分区 - `tests/kratos_nsq`以及`biz/biz/nsq.go中SendMsgToNSQ方法`使用了`延迟发送消息`的配置 - 使用[sarama](https://github.com/IBM/sarama)包在kratos项目中通过kafka实现**延迟队列** ## 3、使用kratos-transport时注意包版本问题 本项目中kratos-transport 的版本是`github.com/tx7do/kratos-transport@1.0.4`,使用相关的消息中间件的话需要手动下载指定版本的消息中间件的broker与transport 用到的各种kratos-transport中间件的包的版本如下 ```shell github.com/tx7do/kratos-transport v1.0.4 github.com/tx7do/kratos-transport/broker/kafka v0.0.0-20221116091410-c46ee7b37366 github.com/tx7do/kratos-transport/broker/nsq v1.0.1 github.com/tx7do/kratos-transport/broker/rocketmq v0.0.0-20221123013749-9f57ac1eb455 github.com/tx7do/kratos-transport/transport/kafka v0.0.0-20221116091410-c46ee7b37366 github.com/tx7do/kratos-transport/transport/nsq v0.0.0-20221209080746-42d7e4456703 ``` ### 一个比较老的kratos-transport版本 如果kratos-transport 的版本是 `github.com/tx7do/kratos-transport@v0.0.0-20220715094448-1ce1a74fa1a6`, 它里面内置了相关版本的broker与transport,不需要手动下载了。 ## 4、kratos中kafka根据用户id指定分区发送消息 ### 使用IBM/sarama [sarama_partitions](internal%2Ftests%2Fsarama_partitions)目录中有使用这个包根据用户id的hash往kafka指定分区发送消息的例子 ### 使用segmentio/kafka-go #### 我自己的实现 [segmentio_partitions](internal%2Ftests%2Fsegmentio_partitions)目录中有使用这个包根据用户id的hash往kafka指定分区发送消息的例子 我在项目中也简单封装了一下,测试的RPC接口为: `SendMsgToKafkaBySegHashPartition` #### 网上的一个issue 网上有人给`segmentio/kafka-go` 提了相关的issue:[segmentio/kafka-go#issue905](https://github.com/segmentio/kafka-go/issues/905) 并且`segmentio/kafka-go` 有一个消息发送到指定分区的测试用例: [balancer_test.go](https://github.com/segmentio/kafka-go/blob/main/balancer_test.go) ### 使用新版的的kratos-transport实现该功能❗️ 在Kratos框架中,没有官方的github.com/tx7do/kratos-transport/broker包。Kratos本身提供了github.com/go-kratos/kratos/v2/transport包用于处理微服务的传输层。 笔者给kratos-transport项目提了一个issue,该项目的作者已经把这个功能加进去了,相关issue:[kratos-transport#issue47](https://github.com/tx7do/kratos-transport/issues/47)。 但是需要注意,使用的kratos-transport的版本需要是1.0.5及以上的,我把它放在另外一个项目了:[kratos_kafka_new](https://gitee.com/huoyingwhw/kratos_kafka_new) ## 5、kafka分区扩容后不同包消费新分区数据的策略 ### 结论与一些细节说明 目前使用Golang处理kafka比较主流的包是`segmentio/kafka-go`与`IBM/sarama`,而这两个包在kafka分区自动扩容后对于新分区数据的消费的策略不同。 结论是,在默认配置与功能下,`segmentio/kafka-go`会自动消费新分区的数据;而`IBM/sarama` 采取了保守的rebalance策略,需要我们手动重启消费者消费新分区的数据,或者使用代码自己实现消费新分区数据。 **❗️但是不管用什么包,有几个点一定要注意:** - 重启消费者是否会导致消息丢失?这点很重要,我们需要在设计消费者代码时想到这一点。比较简单的实现方案是设置手动ACK,只有在业务逻辑处理完全成功的情况下或者代码有报错时可以忽略这条消息再ACK这条消息。当然自动ACK的策略在重启时是否会丢失消息依赖于第三方操作包的性能。 - 消费者代码幂等性的设计是否合理,比如我们使用sarama包想要自己实现监控新增分区并消费新分区的数据,这时涉及到将新分区的消费位点置0操作,如果代码中对新分区的判断有误,错误的将老分区的消费位点置0导致消费到了老数据,这时候消费者幂等性设计就显得尤为重要了! - 网上有一篇文章关于使用`Shopify/sarama`包自己实现消费新增分区数据的文章,它也提到了需要注意消费者`幂等性` 的设计:[golang kafka Shopify/sarama 消费者重置新增分区偏移量并进行重新消费](https://blog.csdn.net/Enjun97/article/details/129520077) ### 关于kafka的Rebalance相关的知识点 Kafka的rebalance机制是指在消费者组(Consumer Group)中,当消费者节点动态变化时,Kafka会重新分配分区(Partitions)给消费者,以实现负载均衡和高可用性。 详细讲解如下: <1>、消费者组(Consumer Group): - 消费者组是一组逻辑上关联的消费者实例,它们共同消费一个或多个主题(Topic)的分区。每个分区只能被一个消费者实例消费,但一个消费者实例可以消费多个分区。 <2>、Rebalance触发: - 当消费者组中的消费者节点发生变化,比如有新的消费者加入或现有消费者退出,或者消费者实例崩溃,都会触发rebalance。 - Rebalance的目的是重新分配分区,确保每个消费者实例负责消费合适数量的分区,以达到负载均衡的效果。 <3>、Rebalance过程: 当触发Rebalance时,Kafka会按照以下步骤进行: - (1)Coordinator选举:每个消费者组有一个Coordinator负责协调Rebalance过程。 - (2)重新分配分区:Coordinator计算出新的分区分配方案,然后通知各个消费者实例。 - (3)Consumer停止消费:在Rebalance期间,消费者会暂停消费当前分配的分区。 - (4)Consumer加入新分区:一旦新的分区分配方案确定,消费者会开始消费新分配的分区。 <4>、消费者Group的状态: - Rebalance过程中,消费者Group的状态会发生变化: - Preparing Rebalance:Coordinator正在准备Rebalance,Consumer暂停消费。 - Assigning Partitions:Coordinator分配新的分区给Consumer。 - Stable:Rebalance完成,Consumer开始消费新的分区。 <5>、Consumer Offset的管理: 在Rebalance期间,Kafka会确保Consumer Offset的一致性和正确性,避免数据重复消费或丢失。 这个机制的目的是确保消费者群体在集群的变化中保持一致的分区分配,从而实现负载均衡和高可用性。 <6>、参考资料 [Kafka rebalancing—Triggers, side-effects and and reducing ...](https://redpanda.com/guides/kafka-performance/kafka-rebalancing) [Medium - Apache Kafka Rebalance Protocol, or the magic behind ...](https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2) [Medium - Kafka Consumer Group Rebalance (1 of 2) | by Rob Golder](https://medium.com/lydtech-consulting/kafka-consumer-group-rebalance-1-of-2-7a3e00aa3bb4) [Conduktor - Consumer Incremental Rebalance & Static Group ...](https://www.conduktor.io/kafka/consumer-incremental-rebalance-and-static-group-membership/) [Stack Overflow - What does "Rebalancing" mean in Apache Kafka context?](https://stackoverflow.com/questions/30988002/what-does-rebalancing-mean-in-apache-kafka-context) [Confluent Blog - Cooperative Rebalancing in the Kafka Consumer, Streams ...](https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/) ### segmentio/kafka-go与IBM/sarama包消费新分区数据相关资料 **❓为什么我是用kafka-go这个包,当发生分区扩容的时候消费者能消费到新分区的数据,但是用sarama包分区扩容就消费不到新分区的数据呢❓ ** 这个问题涉及到两个不同的Go语言库,分别是kafka-go和sarama,在处理Kafka分区扩容时表现不同的原因可能是因为它们处理rebalance的策略不同。 kafka-go的处理方式: kafka-go库可能采用的是一种更及时的rebalance策略,当分区发生扩容时,消费者会立即加入新的分区,从而能够消费到新分区的数据。 这种方式可能会导致一些短暂的消费者停顿或重复消费,但确保了新分区的数据能够及时被消费。 sarama的处理方式: sarama库可能采用的是一种更保守的rebalance策略,它可能在rebalance期间暂停了消费者的消费,等待rebalance完成后再恢复。 这种方式下,消费者可能无法立即消费到新分区的数据,因为在rebalance期间,旧的消费者可能仍在消费旧分区,新分区的数据暂时被忽略。 这两种库在处理rebalance时的行为差异可能导致消费者在使用时的表现不同。如果希望kafka-go的行为与sarama类似,可以尝试调整kafka-go的配置,可能需要查阅kafka-go的文档或社区讨论来了解如何配置rebalance行为。 🌐 Sources [CSDN - golang kafka Shopify/sarama 消费者重置新增分区偏移量并](https://blog.csdn.net/Enjun97/article/details/129520077) [GitHub Issue - Sarama consumer groups not consuming from all topic](https://github.com/Shopify/sarama/issues/1809) [Medium - Kafka rebalancing—Triggers, side-effects and and reducing](https://redpanda.com/guides/kafka-performance/kafka-rebalancing) [Wolfogre's Blog - golang 消费kafka 的坑](https://blog.wolfogre.com/posts/golang-consume-kafka/) [CSDN - golang kafka Shopify/sarama 消费者重置新增分区偏移量并](https://blog.csdn.net/Enjun97/article/details/129520077) [阿里云 - 使用Sarama-Kafka-Go实现Kafka消费](https://www.alibabacloud.com/help/zh/sls/user-guide/use-sarama-kafka-go-to-achieve-kafka-consumption) [李学端 - Kafka(Go)教程(十一)---Consumer Group & Rebalance](https://www.lixueduan.com/posts/kafka/11-consumer-group-rebalance/) [Volcengine - 增加kafka分区会重分配](https://www.volcengine.com/theme/747573-Z-7-1) ## 6-3、使用segmentio/kafka包消费指定分区数据的例子 **使用这里往指定分区发送消息:**[segmentio_partitions](internal%2Ftests%2Fsegmentio_partitions) ### 指定消费单个分区数据 ❗️注意:Partition参数设置后就不要再设置GroupID参数了! [segmentio_consume_partition](internal%2Ftests%2Fsegmentio_consume_partition) ### 指定消费多个分区数据 [segmentio_consumer_partitions](internal%2Ftests%2Fsegmentio_consumer_partitions) ## 6-2、使用sarama包实现消费指定分区数据的例子 **使用这里往指定分区发送消息:**[segmentio_partitions](internal%2Ftests%2Fsegmentio_partitions) ### 指定消费单个分区数据 [consumer](internal%2Ftests%2Fsarama_kafka_basic%2Fconsumer) ### ❗️指定消费多个分区数据 [sarama_kafka.go](internal%2Fserver%2Fsarama_kafka.go) [segmentio_consumer_partitions](internal%2Ftests%2Fsegmentio_consumer_partitions) ## 8、Golang操作kafka遇到网络问题重试的案例 ### 消费单topic的例子 [consumer_retry_one_topic](internal%2Ftests%2Fconsumer_retry_one_topic) ### 消费多topic的例子 [consumer_retry_topics](internal%2Ftests%2Fconsumer_retry_topics) ### 相关参考资料 [Golang操作kafka遇到网络问题重试的案](https://www.cnblogs.com/paulwhw/p/18102523) [kafka消费者组与重平衡机制有了解吗?](https://blog.csdn.net/weixin_44637711/article/details/123703896) [kafka重平衡机制--一个消费组内存在任一消费者异动](https://blog.csdn.net/ShangHai0123/article/details/135277490) ## 7、在kratos项目中使用kafka实现延迟队列 ### 关于nsq与rocketmq实现延迟队列介绍 nsq自带延迟队列,[biz/nsq.go](internal%2Fbiz%2Fbiz_nsq.go)中的代码演示了如何使用kratos-transport这个包实现nsq的延迟队列。 rocketmq是阿里开源的一个消息中间件,它也带延迟队列功能,可以看一下这篇文章的介绍: [rocketmq实现延迟队列(精确到秒级)](https://www.cnblogs.com/tomj2ee/p/15772688.html) ### 使用IBM/sarama包操作kafka实现延迟队列功能 主要参考下面这个文章实现的: [Go+Kafka实现延迟消息](https://juejin.cn/post/7057584094766432270) ,项目中的代码也主要参考了这个项目[jiaxwu/dq](https://github.com/jiaxwu/dq/tree/main),笔者在这个项目的基础上加了一些额外的配置与功能。 `SendMsgToKafkaBySaramaPartition`可以通过这个接口测试消息根据userId发送到指定分区,另外也可以测试**延迟队列**效果。 ## 8、在kratos项目中使用Redis实现延迟队列 ### ❓待补充... 参考这里: [https://juejin.cn/post/7058699128284381221](https://juejin.cn/post/7058699128284381221) ,参考项目地址: [jiaxwu/dq](https://github.com/jiaxwu/dq/tree/main)