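// NOTE: stray notice from the code-hosting web page, not part of the program:
// "Code pull complete; the page will refresh automatically."
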
package consumer
import (
"github.com/Shopify/sarama"
log "github.com/alecthomas/log4go"
"github.com/bsm/sarama-cluster"
"os"
"os/signal"
)
// FinderKafkaConsumer bundles a sarama-cluster consumer with the connection
// parameters it was created from.
type FinderKafkaConsumer struct {
	// Consumer is the underlying cluster consumer.
	Consumer *cluster.Consumer

	// brokers holds the broker addresses handed to the constructor.
	brokers []string

	// groupId is the consumer group name; it is also used as the prefix of
	// the per-message debug log line.
	groupId string

	// signals receives the OS signals that stop the Consume loop.
	signals chan os.Signal

	// Config is a cluster configuration associated with this consumer.
	Config *cluster.Config
}
// kafkaConfig is the package-wide cluster configuration. It is built once in
// init and exposed through GetConfig.
var kafkaConfig *cluster.Config

// init prepares the default configuration: errors and group notifications are
// delivered on their channels, consumption starts from the newest offset, and
// auto-commit is switched off.
func init() {
	cfg := cluster.NewConfig()

	// Surface consumer errors and rebalance notifications to the caller.
	cfg.Consumer.Return.Errors = true
	cfg.Group.Return.Notifications = true

	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	cfg.Consumer.Offsets.AutoCommit.Enable = false // manual commit

	kafkaConfig = cfg
}
// GetConfig returns the package-wide cluster configuration.
//
// The returned pointer is the shared instance itself, not a copy, so changes
// made through it are visible to everything else that reads the package
// configuration afterwards.
func GetConfig() *cluster.Config {
	return kafkaConfig
}
// NewFinderKafkaConsumer builds a FinderKafkaConsumer by passing brokers,
// groupId and topics, together with the package-wide configuration, to
// cluster.NewConsumer.
//
// On success the returned value carries the new cluster consumer, the broker
// list, the group id and the configuration it was created with. On failure it
// returns nil and the error from cluster.NewConsumer unchanged, so callers
// never receive a value whose Consumer is nil.
func NewFinderKafkaConsumer(brokers []string, groupId string, topics []string) (*FinderKafkaConsumer, error) {
	consumer, err := cluster.NewConsumer(brokers, groupId, topics, kafkaConfig)
	if err != nil {
		return nil, err
	}

	return &FinderKafkaConsumer{
		Consumer: consumer,
		brokers:  brokers,
		groupId:  groupId,
		// Record the configuration the consumer was actually created with.
		Config: kafkaConfig,
	}, nil
}
// Consume starts consuming and blocks until an os.Interrupt signal arrives or
// the consumer's message channel is closed.
//
// errorFunc, when non-nil, is called for every value read from the consumer's
// Errors channel; notificationsFunc, when non-nil, is called for every value
// read from its Notifications channel. Both channels are read in background
// goroutines even when the matching callback is nil, so they always have a
// reader; those goroutines end when the respective channel is closed.
//
// onMessage is called for every received message and must not be nil. After
// it returns, the message's offset is marked as processed.
//
// Consume does not close the underlying Consumer.
func (c *FinderKafkaConsumer) Consume(errorFunc func(error), notificationsFunc func(*cluster.Notification), onMessage func(*sarama.ConsumerMessage)) {
	// Trap SIGINT to trigger a shutdown, and drop the registration again once
	// the loop exits so repeated calls do not accumulate signal handlers.
	signals := make(chan os.Signal, 1)
	c.signals = signals
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// Drain errors; hand them to the callback when one was supplied.
	go func() {
		for err := range c.Consumer.Errors() {
			if errorFunc != nil {
				errorFunc(err)
			}
		}
	}()

	// Drain notifications; hand them to the callback when one was supplied.
	go func() {
		for note := range c.Consumer.Notifications() {
			if notificationsFunc != nil {
				notificationsFunc(note)
			}
		}
	}()

	// Consume messages, watch signals.
	for {
		select {
		case msg, ok := <-c.Consumer.Messages():
			if !ok {
				// A closed channel is always ready to receive; returning here
				// avoids spinning on it forever.
				return
			}
			log.Debug("%s:%s/%d/%d\t%s\t%s\n", c.groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
			onMessage(msg)
			// Mark the message as processed.
			// NOTE(review): offsets are only marked here, never explicitly
			// committed - confirm the consumer commits marked offsets.
			c.Consumer.MarkOffset(msg, "")
		case <-signals:
			return
		}
	}
}
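// NOTE: stray content-moderation notice from the code-hosting web page, not part of the program:
// "This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing functions."
// "If you confirm that the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything violating national laws and regulations, you may click submit to appeal and we will handle it as soon as possible."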