1 Star 1 Fork 0

titan-kit / titan

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
consumer.go 3.01 KB
一键复制 编辑 原始数据 按行查看 历史
package kafka
import (
"context"
"fmt"
"gitee.com/titan-kit/titan/log"
"github.com/Shopify/sarama"
)
// newConsumer 按提供的地址初始化一个消费者
func newConsumer(log *log.Slf4g, backends []string) consumer {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_1_0_0
return consumer{log: log, config: config, brokers: backends}
}
// consumer kafka消费者的默认实现,该实现类可以支持启动多个消费者,利用多线程优势来并行处理kafka的消息,从而提高处理效率。
type consumer struct {
log *log.Slf4g
config *sarama.Config
cfg map[string]string
brokers []string
callback Reader
started, closed bool
consumer sarama.ConsumerGroup
}
/**
* 以非阻塞的方式启动消费者线程,每个线程对应一个独立的消费者对象,所有消费者都归属于同一个消费组中,当读取到指定队列中的消息后会调用指定的callback进行处理。
*
* @param topic 队列名称
* @param groupId 消费者所属的消费组标示
* @param threads 消费者线程数
* @param callback 消费回调处理器
*/
func (c *consumer) start(topic, groupId string, errors func(error), callback Reader) {
c.log.InfoF("启动Kafka消费者线程[Topic=%s,groupId=%s]...", topic, groupId)
if c.started {
return
}
var err error
c.consumer, err = sarama.NewConsumerGroup(c.brokers, groupId, c.config)
if err != nil {
c.log.ErrorF(fmt.Sprintf("启动Kafka消费者线程失败:%+v\n", err))
}
// 消费者错误
go func() {
for err := range c.consumer.Errors() {
errors(err)
}
}()
go func() {
for {
err := c.consumer.Consume(context.Background(), []string{topic}, consumerGroupHandler{c.log, topic, callback})
if err != nil {
panic(err)
}
}
}()
c.started = true
}
// close 关闭所有消费者并清理资源。
func (c *consumer) close() {
c.closed = true
if err := c.consumer.Close(); err != nil {
c.log.ErrorF("关闭所有消费者并清理资源出错:%+v\n", err)
}
c.started = false
}
type consumerGroupHandler struct {
log *log.Slf4g
topic string
callback Reader
}
func (h consumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
h.log.InfoF("启动Kafka消费者线程:[%s]", sess.MemberID())
offsetReader := h.callback.GetOffset(h.topic)
for _, p := range offsetReader {
if p.Offset > 0 {
h.log.InfoF("Kafka重置[%s]分区读取偏移位置:partition=%d,offset=%d", sess.MemberID(), p.Partition, p.Offset)
sess.ResetOffset(h.topic, p.Partition, p.Offset, "")
}
}
return nil
}
func (h consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
if err := h.callback.OnRead(msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value); nil == err {
sess.MarkMessage(msg, "") // 手动确认消息
}
}
return nil
}
1
https://gitee.com/titan-kit/titan.git
git@gitee.com:titan-kit/titan.git
titan-kit
titan
titan
v0.0.4

搜索帮助