代码拉取完成,页面将自动刷新
package jkafka
import (
"context"
"github.com/micro/go-micro/v2/util/log"
"strconv"
//"time"
"github.com/Shopify/sarama"
"github.com/micro/go-micro/v2/broker"
//log "github.com/micro/go-micro/v2/util/log"
)
// DefaultBrokerConfig is a package-level sarama configuration created with
// sarama.NewConfig().
var DefaultBrokerConfig = sarama.NewConfig()

// DefaultClusterConfig is a second, independent package-level sarama
// configuration, also created with sarama.NewConfig().
var DefaultClusterConfig = sarama.NewConfig()
// Key types handed to setBrokerOption by the option constructors below:
// brokerConfigKey is used by BrokerConfig and clusterConfigKey by
// ClusterConfig.
type (
	brokerConfigKey  struct{}
	clusterConfigKey struct{}
)
// BrokerConfig returns the broker.Option produced by setBrokerOption for the
// brokerConfigKey{} key and the given sarama config.
func BrokerConfig(c *sarama.Config) broker.Option {
	key := brokerConfigKey{}
	return setBrokerOption(key, c)
}
// ClusterConfig returns the broker.Option produced by setBrokerOption for the
// clusterConfigKey{} key and the given sarama config.
func ClusterConfig(c *sarama.Config) broker.Option {
	key := clusterConfigKey{}
	return setBrokerOption(key, c)
}
// subscribeContextKey is the key SubscribeContext passes to
// setSubscribeOption.
type subscribeContextKey struct{}

// SubscribeContext returns the broker.SubscribeOption produced by
// setSubscribeOption for the subscribeContextKey{} key and ctx.
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
	key := subscribeContextKey{}
	return setSubscribeOption(key, ctx)
}
// subscribeConfigKey is the key SubscribeConfig passes to setSubscribeOption.
type subscribeConfigKey struct{}

// SubscribeConfig returns the broker.SubscribeOption produced by
// setSubscribeOption for the subscribeConfigKey{} key and the given sarama
// config.
func SubscribeConfig(c *sarama.Config) broker.SubscribeOption {
	key := subscribeConfigKey{}
	return setSubscribeOption(key, c)
}
// consumerGroupHandler is the implementation of sarama.ConsumerGroupHandler.
// Its ConsumeClaim method feeds every claimed Kafka message to handler.
type consumerGroupHandler struct {
// handler is called once per consumed message by ConsumeClaim.
handler broker.Handler
// subopts holds subscribe options; the methods visible here do not read it
// (the AutoAck check in ConsumeClaim is commented out).
subopts broker.SubscribeOptions
// kopts holds broker options; the methods visible here do not read it
// (the codec-based unmarshal in ConsumeClaim is commented out).
kopts broker.Options
// cg is forwarded to each publication built in ConsumeClaim.
cg sarama.ConsumerGroup
// sess is not read by the methods visible here; ConsumeClaim uses the
// session passed in as its argument instead.
sess sarama.ConsumerGroupSession
}
// Setup is a no-op: it ignores the session and always returns nil.
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is a no-op: it ignores the session and always returns nil.
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
/*
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Headers []*RecordHeader // only set if kafka is version 0.11+
}
*/
// ConsumeClaim reads every message from claim.Messages() until that channel
// is closed, wraps each one in a publication and passes it to h.handler.
//
// The Kafka message value is forwarded unchanged as the broker message body
// (no codec unmarshalling is performed). The broker message header carries:
//
//	Key          - the Kafka message key as a string
//	Content-Type - always "application/json"
//	TimeStamp    - msg.Timestamp formatted as "2006-01-02 15:04:05.000000000"
//	Partition    - the partition number in decimal
//	Offset       - the message offset in decimal
//
// A message is marked on sess only when the handler returns nil. When the
// handler returns an error, the error is logged, the message is left
// unmarked, and consumption continues with the next message.
//
// NOTE: h.subopts.AutoAck is not consulted; every successfully handled
// message is marked regardless of that option.
//
// ConsumeClaim always returns nil.
func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		header := map[string]string{
			"Key":          string(msg.Key),
			"Content-Type": "application/json",
			"TimeStamp":    msg.Timestamp.Format("2006-01-02 15:04:05.000000000"),
			"Partition":    strconv.Itoa(int(msg.Partition)),
			"Offset":       strconv.FormatInt(msg.Offset, 10),
		}

		err := h.handler(&publication{
			m:    &broker.Message{Header: header, Body: msg.Value},
			t:    msg.Topic,
			km:   msg,
			cg:   h.cg,
			sess: sess,
		})
		if err != nil {
			// The original format string had no verb for err, so the error
			// was rendered as a "%!(EXTRA ...)" suffix after the newline.
			log.Logf("[Jkafka]: failed to h.handler: %v\n", err)
			continue
		}

		sess.MarkMessage(msg, "")
	}
	return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。