Fetch the repository succeeded.
package kafka
import (
"context"
"gitee.com/sasukebo/go-micro/v4/broker"
log "gitee.com/sasukebo/go-micro/v4/logger"
"github.com/Shopify/sarama"
)
// DefaultBrokerConfig is a package-level sarama configuration created with
// sarama.NewConfig().
// NOTE(review): presumably the fallback used when no BrokerConfig option is
// supplied — confirm against the broker implementation.
var DefaultBrokerConfig = sarama.NewConfig()

// DefaultClusterConfig is a package-level sarama configuration created with
// sarama.NewConfig().
// NOTE(review): presumably the fallback used when no ClusterConfig option is
// supplied — confirm against the broker implementation.
var DefaultClusterConfig = sarama.NewConfig()
// Keys under which BrokerConfig and ClusterConfig, respectively, register
// their *sarama.Config through setBrokerOption.
type (
	brokerConfigKey  struct{}
	clusterConfigKey struct{}
)
// BrokerConfig returns a broker.Option that registers the sarama
// configuration c under brokerConfigKey via setBrokerOption.
func BrokerConfig(c *sarama.Config) broker.Option {
	key := brokerConfigKey{}
	return setBrokerOption(key, c)
}
// ClusterConfig returns a broker.Option that registers the sarama
// configuration c under clusterConfigKey via setBrokerOption.
func ClusterConfig(c *sarama.Config) broker.Option {
	key := clusterConfigKey{}
	return setBrokerOption(key, c)
}
// asyncProduceErrorKey is the option key under which AsyncProducer registers
// the producer-error channel.
type asyncProduceErrorKey struct{}

// asyncProduceSuccessKey is the option key under which AsyncProducer
// registers the producer-success channel.
type asyncProduceSuccessKey struct{}

// AsyncProducer returns a broker.Option that registers the channels on which
// asynchronous produce results are delivered.
//
// errors receives failed deliveries and successes receives acknowledged
// messages. Either channel may be nil, in which case nothing is registered
// for it; if both are nil the returned option is a no-op.
//
// Both channels are registered when both are supplied. Building the option
// by reassigning a single variable would let the error channel's option
// replace the success channel's, so the two are applied in sequence inside
// one option instead.
func AsyncProducer(errors chan<- *sarama.ProducerError, successes chan<- *sarama.ProducerMessage) broker.Option {
	return func(options *broker.Options) {
		if successes != nil {
			setBrokerOption(asyncProduceSuccessKey{}, successes)(options)
		}
		if errors != nil {
			setBrokerOption(asyncProduceErrorKey{}, errors)(options)
		}
	}
}
// subscribeContextKey is the option key under which SubscribeContext
// registers its context.
type subscribeContextKey struct{}

// SubscribeContext returns a broker.SubscribeOption that registers ctx under
// subscribeContextKey via setSubscribeOption.
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
	key := subscribeContextKey{}
	return setSubscribeOption(key, ctx)
}
// subscribeConfigKey is the option key under which SubscribeConfig registers
// its sarama configuration.
type subscribeConfigKey struct{}

// SubscribeConfig returns a broker.SubscribeOption that registers the sarama
// configuration c under subscribeConfigKey via setSubscribeOption.
func SubscribeConfig(c *sarama.Config) broker.SubscribeOption {
	key := subscribeConfigKey{}
	return setSubscribeOption(key, c)
}
// consumerGroupHandler implements sarama.ConsumerGroupHandler and bridges
// consumed Kafka records to a broker.Handler.
type consumerGroupHandler struct {
	// handler is the subscriber callback invoked for every decoded message.
	handler broker.Handler
	// subopts holds the subscription options; ConsumeClaim reads AutoAck.
	subopts broker.SubscribeOptions
	// kopts holds the broker options; ConsumeClaim reads Codec and
	// ErrorHandler.
	kopts broker.Options
	// cg is the consumer group attached to each publication.
	cg sarama.ConsumerGroup
	// sess is a consumer group session.
	// NOTE(review): not read by the methods in this file section, which use
	// the session passed to ConsumeClaim — confirm where it is set and used.
	sess sarama.ConsumerGroupSession
}
// Setup is part of sarama.ConsumerGroupHandler. It does nothing and always
// returns nil.
func (h *consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is part of sarama.ConsumerGroupHandler. It does nothing and always
// returns nil.
func (h *consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim is part of sarama.ConsumerGroupHandler. It receives records
// from claim and passes each one to handleMessage until either the claim's
// message channel is closed or the session context is done.
//
// It always returns nil: per-message failures are reported through the
// broker's ErrorHandler (or the logger) and do not abort the claim.
func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case <-sess.Context().Done():
			// The session has ended (e.g. rebalance or shutdown). Return now
			// rather than waiting for Messages() to close, as the sarama
			// consumer group example recommends.
			return nil
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			h.handleMessage(sess, msg)
		}
	}
}

// handleMessage decodes one Kafka record, invokes the subscriber handler and
// marks the record as consumed when the handler succeeds and AutoAck is set.
func (h *consumerGroupHandler) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
	var m broker.Message
	p := &publication{m: &m, t: msg.Topic, km: msg, cg: h.cg, sess: sess}
	eh := h.kopts.ErrorHandler

	if err := h.kopts.Codec.Unmarshal(msg.Value, &m); err != nil {
		p.err = err
		// Hand the raw payload to the error handler since decoding failed.
		p.m.Body = msg.Value
		if eh != nil {
			eh(p)
		} else {
			log.Errorf("[kafka]: failed to unmarshal: %v", err)
		}
		// The record is neither handled nor marked.
		return
	}

	// The codec produced no body; fall back to the raw record value.
	if m.Body == nil {
		m.Body = msg.Value
	}

	// if we don't have headers, create empty map
	if m.Header == nil {
		m.Header = make(map[string]string)
	}
	// Kafka record headers override same-named headers decoded from the body.
	for _, header := range msg.Headers {
		m.Header[string(header.Key)] = string(header.Value)
	}
	m.Header["Micro-Topic"] = msg.Topic // only for RPC server, it somehow inspect Header for topic
	if _, ok := m.Header["Content-Type"]; !ok {
		m.Header["Content-Type"] = "application/json" // default to json codec
	}

	err := h.handler(p)
	switch {
	case err != nil:
		p.err = err
		if eh != nil {
			eh(p)
		} else {
			log.Errorf("[kafka]: subscriber error: %v", err)
		}
	case h.subopts.AutoAck:
		sess.MarkMessage(msg, "")
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。