1 Star 0 Fork 0

liuxuezhan / go-plugins

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
kafka.go 5.13 KB
一键复制 编辑 原始数据 按行查看 历史
liuxuezhan 提交于 2021-08-18 17:35 . 'new'
// Package kafka provides a Kafka broker built on the sarama client library and its consumer groups.
package kafka
import (
"context"
"sync"
"github.com/Shopify/sarama"
"github.com/google/uuid"
"gitee.com/liuxuezhan/go-micro-v1.18.0/broker"
"gitee.com/liuxuezhan/go-micro-v1.18.0/codec/json"
"gitee.com/liuxuezhan/go-micro-v1.18.0/config/cmd"
"gitee.com/liuxuezhan/go-micro-v1.18.0/util/log"
)
// kBroker is the Kafka-backed broker returned by NewBroker.
type kBroker struct {
	// addrs holds the Kafka addresses passed to sarama.NewClient.
	addrs []string

	// c is the client created by Connect; p is the synchronous producer
	// built from that client and used by Publish.
	c sarama.Client
	p sarama.SyncProducer

	// sc collects the clients created for subscriptions so that Disconnect
	// can close them. scMutex guards sc.
	sc      []sarama.Client
	scMutex sync.Mutex

	// opts are the broker options supplied to NewBroker and Init.
	opts broker.Options
}
// subscriber is the handle returned by Subscribe for a single topic.
type subscriber struct {
	// cg is the consumer group that Unsubscribe closes.
	cg sarama.ConsumerGroup

	// t is the topic name reported by Topic.
	t string

	// opts are the subscribe options reported by Options.
	opts broker.SubscribeOptions
}
// publication carries one consumed message together with the state needed
// to acknowledge it.
type publication struct {
	// t is the topic name reported by Topic.
	t string

	// cg is the consumer group associated with the message.
	// NOTE(review): not read by the methods in this file; confirm usage
	// where publications are constructed.
	cg sarama.ConsumerGroup

	// km is the raw Kafka message that Ack marks on the session.
	km *sarama.ConsumerMessage

	// m is the broker message reported by Message.
	m *broker.Message

	// sess is the consumer group session on which Ack marks km.
	sess sarama.ConsumerGroupSession
}
// init registers NewBroker in cmd.DefaultBrokers under the "kafka" key.
func init() {
	const name = "kafka"
	cmd.DefaultBrokers[name] = NewBroker
}
// Topic returns the topic name stored on the publication.
func (p *publication) Topic() string {
	return p.t
}
// Message returns the broker message stored on the publication.
func (p *publication) Message() *broker.Message {
	return p.m
}
// Ack marks the publication's Kafka message on its consumer group session,
// passing empty metadata. It always returns nil.
func (p *publication) Ack() error {
	const metadata = ""
	p.sess.MarkMessage(p.km, metadata)
	return nil
}
// Options returns the subscribe options stored on the subscriber.
func (s *subscriber) Options() broker.SubscribeOptions {
	return s.opts
}
// Topic returns the topic name stored on the subscriber.
func (s *subscriber) Topic() string {
	return s.t
}
// Unsubscribe closes the subscriber's consumer group and returns the error
// reported by that close, if any.
func (s *subscriber) Unsubscribe() error {
	err := s.cg.Close()
	return err
}
// Address returns the first configured Kafka address, or "127.0.0.1:9092"
// when no address is configured.
func (k *kBroker) Address() string {
	if len(k.addrs) == 0 {
		return "127.0.0.1:9092"
	}
	return k.addrs[0]
}
// Connect creates the sarama client and the synchronous producer used by
// Publish. It is a no-op when a client is already set.
//
// The client and producer are stored on the broker only once both have been
// created. If the producer cannot be created, the new client is closed and
// the broker is left unconnected, so a later Connect retries instead of
// reporting success while the producer is still nil.
func (k *kBroker) Connect() error {
	if k.c != nil {
		return nil
	}

	pconfig := k.getBrokerConfig()
	// For implementation reasons, the SyncProducer requires
	// `Producer.Return.Errors` and `Producer.Return.Successes`
	// to be set to true in its configuration.
	pconfig.Producer.Return.Successes = true
	pconfig.Producer.Return.Errors = true

	c, err := sarama.NewClient(k.addrs, pconfig)
	if err != nil {
		return err
	}

	p, err := sarama.NewSyncProducerFromClient(c)
	if err != nil {
		// Best-effort cleanup: the producer error is the one worth
		// reporting, so the close error is deliberately dropped.
		_ = c.Close()
		return err
	}

	k.scMutex.Lock()
	defer k.scMutex.Unlock()
	k.c = c
	k.p = p
	// Keep any subscription clients that are already tracked so that
	// Disconnect can still close them.
	if k.sc == nil {
		k.sc = make([]sarama.Client, 0)
	}
	return nil
}
// Disconnect closes every tracked subscription client, then the producer,
// then the main client, and clears them from the broker so that a later
// Connect creates fresh ones.
//
// It is safe to call when the broker was never connected. The returned
// error is the main client's close error if there is one, otherwise the
// producer's close error.
func (k *kBroker) Disconnect() error {
	k.scMutex.Lock()
	defer k.scMutex.Unlock()

	for _, client := range k.sc {
		// Best effort: one failing subscription client must not prevent
		// the remaining resources from being released.
		_ = client.Close()
	}
	k.sc = nil

	var producerErr error
	if k.p != nil {
		producerErr = k.p.Close()
		k.p = nil
	}

	if k.c != nil {
		clientErr := k.c.Close()
		k.c = nil
		if clientErr != nil {
			return clientErr
		}
	}
	return producerErr
}
// Init applies the given options to the broker and recomputes its address
// list from the resulting options: empty addresses are skipped, and
// "127.0.0.1:9092" is used when none remain. It always returns nil.
func (k *kBroker) Init(opts ...broker.Option) error {
	for i := range opts {
		opts[i](&k.opts)
	}

	var addrs []string
	for _, a := range k.opts.Addrs {
		if a != "" {
			addrs = append(addrs, a)
		}
	}
	if len(addrs) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	}

	k.addrs = addrs
	return nil
}
// Options returns the broker options stored on the broker.
func (k *kBroker) Options() broker.Options {
	return k.opts
}
// Publish encodes msg with the broker's codec and sends it to topic through
// the synchronous producer.
//
// It returns the codec's error if encoding fails, sarama.ErrNotConnected if
// the broker has no producer (Connect has not succeeded), and otherwise the
// error reported by the producer. The publish options are not used.
func (k *kBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
	b, err := k.opts.Codec.Marshal(msg)
	if err != nil {
		return err
	}

	// Without a producer, SendMessage would be called on a nil interface
	// and panic; report the missing connection as an error instead.
	if k.p == nil {
		return sarama.ErrNotConnected
	}

	_, _, err = k.p.SendMessage(&sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(b),
	})
	return err
}
// getSaramaClusterClient creates a new sarama client using the cluster
// configuration and records it in k.sc so Disconnect can close it.
// The topic argument is not used.
func (k *kBroker) getSaramaClusterClient(topic string) (sarama.Client, error) {
	client, err := sarama.NewClient(k.addrs, k.getClusterConfig())
	if err != nil {
		return nil, err
	}

	k.scMutex.Lock()
	k.sc = append(k.sc, client)
	k.scMutex.Unlock()

	return client, nil
}
// Subscribe starts consuming topic with a consumer group and dispatches
// messages to handler through a consumerGroupHandler.
//
// Defaults are AutoAck enabled and a freshly generated UUID as the queue
// (consumer group) name; opts may override both. A dedicated client is
// created for the subscription. The returned subscriber closes the consumer
// group on Unsubscribe, which ends both background goroutines started here.
func (k *kBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	opt := broker.SubscribeOptions{
		AutoAck: true,
		Queue:   uuid.New().String(),
	}
	for _, o := range opts {
		o(&opt)
	}

	// we need to create a new client per consumer
	c, err := k.getSaramaClusterClient(topic)
	if err != nil {
		return nil, err
	}
	cg, err := sarama.NewConsumerGroupFromClient(opt.Queue, c)
	if err != nil {
		return nil, err
	}

	h := &consumerGroupHandler{
		handler: handler,
		subopts: opt,
		kopts:   k.opts,
		cg:      cg,
	}
	ctx := context.Background()
	topics := []string{topic}

	// Drain consumer errors in their own goroutine so they are logged while
	// a Consume session is running, not only between sessions. The loop
	// ends when the errors channel is closed.
	go func() {
		for err := range cg.Errors() {
			if err != nil {
				log.Log("consumer error:", err)
			}
		}
	}()

	// Consume returns whenever a session ends, so it is called in a loop
	// until the consumer group has been closed.
	go func() {
		for {
			err := cg.Consume(ctx, topics, h)
			if err != nil {
				log.Log(err)
			}
			if err == sarama.ErrClosedConsumerGroup {
				return
			}
		}
	}()

	return &subscriber{cg: cg, opts: opt, t: topic}, nil
}
// String returns the broker's name, "kafka".
func (k *kBroker) String() string {
	const name = "kafka"
	return name
}
// NewBroker returns a Kafka broker configured by opts.
//
// The options start with the JSON codec and a background context. Empty
// addresses are dropped from the configured list, and "127.0.0.1:9092" is
// used when no address remains.
func NewBroker(opts ...broker.Option) broker.Broker {
	options := broker.Options{
		// default to json codec
		Codec:   json.Marshaler{},
		Context: context.Background(),
	}
	for i := range opts {
		opts[i](&options)
	}

	var addrs []string
	for _, a := range options.Addrs {
		if a != "" {
			addrs = append(addrs, a)
		}
	}
	if len(addrs) == 0 {
		addrs = []string{"127.0.0.1:9092"}
	}

	b := &kBroker{
		addrs: addrs,
		opts:  options,
	}
	return b
}
// getBrokerConfig returns the *sarama.Config stored in the options context
// under brokerConfigKey, or DefaultBrokerConfig when none is stored.
func (k *kBroker) getBrokerConfig() *sarama.Config {
	c, ok := k.opts.Context.Value(brokerConfigKey{}).(*sarama.Config)
	if !ok {
		return DefaultBrokerConfig
	}
	return c
}
// getClusterConfig returns the sarama configuration used for subscription
// clients.
//
// A *sarama.Config stored in the options context under clusterConfigKey is
// returned as-is. Otherwise a copy of DefaultClusterConfig is returned with
// the version raised to at least V0_10_2_0, consumer errors returned on the
// errors channel, and the initial offset set to the newest message.
func (k *kBroker) getClusterConfig() *sarama.Config {
	if c, ok := k.opts.Context.Value(clusterConfigKey{}).(*sarama.Config); ok {
		return c
	}

	// Work on a copy: DefaultClusterConfig is shared package-level state,
	// and adjusting it in place would modify it on every call.
	cfg := *DefaultClusterConfig
	clusterConfig := &cfg

	// the oldest supported version is V0_10_2_0
	if !clusterConfig.Version.IsAtLeast(sarama.V0_10_2_0) {
		clusterConfig.Version = sarama.V0_10_2_0
	}
	clusterConfig.Consumer.Return.Errors = true
	clusterConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	return clusterConfig
}
1
https://gitee.com/liuxuezhan/go-plugins.git
git@gitee.com:liuxuezhan/go-plugins.git
liuxuezhan
go-plugins
go-plugins
db1d4b8b101e

搜索帮助