1 Star 0 Fork 0

TGodfather/go-plugins

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
options.go 2.77 KB
一键复制 编辑 原始数据 按行查看 历史
tanzhuokang 提交于 2020-08-16 10:55 +08:00 . first commit
package jkafka
import (
"context"
"github.com/micro/go-micro/v2/util/log"
"strconv"
//"time"
"github.com/Shopify/sarama"
"github.com/micro/go-micro/v2/broker"
//log "github.com/micro/go-micro/v2/util/log"
)
var (
DefaultBrokerConfig = sarama.NewConfig()
DefaultClusterConfig = sarama.NewConfig()
)
type brokerConfigKey struct{}
type clusterConfigKey struct{}
func BrokerConfig(c *sarama.Config) broker.Option {
return setBrokerOption(brokerConfigKey{}, c)
}
func ClusterConfig(c *sarama.Config) broker.Option {
return setBrokerOption(clusterConfigKey{}, c)
}
type subscribeContextKey struct{}
// SubscribeContext set the context for broker.SubscribeOption
func SubscribeContext(ctx context.Context) broker.SubscribeOption {
return setSubscribeOption(subscribeContextKey{}, ctx)
}
type subscribeConfigKey struct{}
func SubscribeConfig(c *sarama.Config) broker.SubscribeOption {
return setSubscribeOption(subscribeConfigKey{}, c)
}
// consumerGroupHandler is the implementation of sarama.ConsumerGroupHandler
type consumerGroupHandler struct {
handler broker.Handler
subopts broker.SubscribeOptions
kopts broker.Options
cg sarama.ConsumerGroup
sess sarama.ConsumerGroupSession
}
func (*consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
/*
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Headers []*RecordHeader // only set if kafka is version 0.11+
}
*/
func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
//var m broker.Message
//log.Logf(string(msg.Value), string(msg.Key))
// if err := h.kopts.Codec.Unmarshal(msg.Value, &m); err != nil {
// log.Logf("[kafka]: failed to unmarshal: %v\n", err)
// continue
// }
md := make(map[string]string)
md["Key"] = string(msg.Key)
md["Content-Type"] = "application/json"
md["TimeStamp"] = msg.Timestamp.Format("2006-01-02 15:04:05.000000000")
md["Partition"] = strconv.Itoa(int(msg.Partition))
md["Offset"] = strconv.FormatInt(msg.Offset,10)
brokerMsg := &broker.Message{
Header: md,
Body: msg.Value,
}
err := h.handler(&publication{m: brokerMsg, t: msg.Topic, km: msg, cg: h.cg, sess: sess})
//if err == nil && h.subopts.AutoAck {
if err == nil{
sess.MarkMessage(msg, "")
}else{
log.Logf("[Jkafka]: failed to h.handler\n", err)
}
}
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tgodfather/go-plugins.git
git@gitee.com:tgodfather/go-plugins.git
tgodfather
go-plugins
go-plugins
efc0651a8291

搜索帮助