1 Star 0 Fork 0

finder / finder-kafka-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
finder-kafka-consumer.go 1.97 KB
一键复制 编辑 原始数据 按行查看 历史
package consumer
import (
"github.com/Shopify/sarama"
log "github.com/alecthomas/log4go"
"github.com/bsm/sarama-cluster"
"os"
"os/signal"
)
type FinderKafkaConsumer struct {
Consumer *cluster.Consumer
brokers []string
groupId string
signals chan os.Signal
Config *cluster.Config
}
var kafkaConfig *cluster.Config
func init() {
kafkaConfig = cluster.NewConfig()
kafkaConfig.Consumer.Return.Errors = true
kafkaConfig.Group.Return.Notifications = true
kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
kafkaConfig.Consumer.Offsets.AutoCommit.Enable = false // 手动提交
}
func GetConfig() *cluster.Config {
return kafkaConfig
}
func NewFinderKafkaConsumer(brokers []string,groupId string,topics []string) (*FinderKafkaConsumer,error) {
kafka:=&FinderKafkaConsumer{brokers:brokers}
kafka.groupId=groupId
// init consumer
consumer, err := cluster.NewConsumer(brokers, groupId, topics, kafkaConfig)
if err != nil {
return kafka,err
}
kafka.Consumer=consumer
return kafka,nil
}
//启动消费
func (this *FinderKafkaConsumer) Consume(errorFunc func(error),notificationsFunc func(*cluster.Notification),onMessage func(*sarama.ConsumerMessage)) {
// trap SIGINT to trigger a shutdown
this.signals = make(chan os.Signal, 1)
signal.Notify(this.signals, os.Interrupt)
if errorFunc!=nil{
// consume errors
go func() {
for err := range this.Consumer.Errors() {
errorFunc(err)
}
}()
}
if notificationsFunc!=nil{
// consume notifications
go func() {
for note := range this.Consumer.Notifications() {
notificationsFunc(note)
}
}()
}
// consume messages, watch signals
var successes int
Loop:
for {
select {
case msg, ok := <-this.Consumer.Messages():
if ok {
log.Debug("%s:%s/%d/%d\t%s\t%s\n", this.groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
onMessage(msg)
this.Consumer.MarkOffset(msg, "") // mark message as processed
successes++
}
case <-this.signals:
break Loop
}
}
}
Go
1
https://gitee.com/ifinder/finder-kafka-go.git
git@gitee.com:ifinder/finder-kafka-go.git
ifinder
finder-kafka-go
finder-kafka-go
v1.3.0

搜索帮助