package kafka
import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/Shopify/sarama"

	"github.com/elastic/beats/libbeat/common/fmtstr"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/outputs"
	"github.com/elastic/beats/libbeat/outputs/codec"
	"github.com/elastic/beats/libbeat/outputs/outil"
	"github.com/elastic/beats/libbeat/publisher"
)

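// client is a Kafka output client: it encodes beat events with the
// configured codec and hands them to a sarama.AsyncProducer for delivery.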
type client struct {
	stats    *outputs.Stats
	hosts    []string
	topic    outil.Selector
	key      *fmtstr.EventFormatString
	index    string
	codec    codec.Codec
	config   sarama.Config
	producer sarama.AsyncProducer

	wg sync.WaitGroup
}

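// msgRef tracks the outstanding messages of one published batch. count is
// decremented for every ACKed, dropped, or failed message; when it reaches
// zero the whole batch is either ACKed or its failed events are retried.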
type msgRef struct {
	client *client
	count  int32
	total  int
	failed []publisher.Event
	batch  publisher.Batch

	err error
}

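// newKafkaClient builds a client from the given hosts, topic selector,
// event codec, and sarama configuration. The sarama config is copied by
// value; no producer is created until Connect is called.
//
// Illustrative sketch only (not part of the original source): wiring a
// client by hand could look roughly like this, where stats, topicSel, and
// enc are assumed to have been built elsewhere:
//
//	cfg := sarama.NewConfig()
//	cfg.Version = sarama.V0_10_0_0
//	c, err := newKafkaClient(stats, []string{"localhost:9092"}, "beatname", nil, topicSel, enc, cfg)
//	if err != nil {
//		return err
//	}
//	if err := c.Connect(); err != nil {
//		return err
//	}
//	defer c.Close()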
func newKafkaClient(
	stats *outputs.Stats,
	hosts []string,
	index string,
	key *fmtstr.EventFormatString,
	topic outil.Selector,
	writer codec.Codec,
	cfg *sarama.Config,
) (*client, error) {
	c := &client{
		stats:  stats,
		hosts:  hosts,
		topic:  topic,
		key:    key,
		index:  index,
		codec:  writer,
		config: *cfg,
	}
	return c, nil
}

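// Connect creates the sarama.AsyncProducer and starts the two background
// workers that drain its Successes and Errors channels.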
func (c *client) Connect() error {
	debugf("connect: %v", c.hosts)

	// try to connect
	producer, err := sarama.NewAsyncProducer(c.hosts, &c.config)
	if err != nil {
		logp.Err("Kafka connect fails with: %v", err)
		return err
	}

	c.producer = producer

	c.wg.Add(2)
	go c.successWorker(producer.Successes())
	go c.errorWorker(producer.Errors())

	return nil
}

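// Close triggers an asynchronous shutdown of the producer and blocks until
// both worker goroutines have drained their channels and exited.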
func (c *client) Close() error {
	debugf("closed kafka client")
	c.producer.AsyncClose()
	c.wg.Wait()
	c.producer = nil
	return nil
}

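// Publish converts every event of the batch into a Kafka producer message
// and feeds it to the async producer. Events that fail to convert are
// dropped immediately; delivery results are reported back through the
// shared msgRef by the success and error workers.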
func (c *client) Publish(batch publisher.Batch) error {
	events := batch.Events()
	c.stats.NewBatch(len(events))

	ref := &msgRef{
		client: c,
		count:  int32(len(events)),
		total:  len(events),
		failed: nil,
		batch:  batch,
	}

	ch := c.producer.Input()
	for i := range events {
		d := &events[i]

		msg, err := c.getEventMessage(d)
		if err != nil {
			logp.Err("Dropping event: %v", err)
			ref.done()
			c.stats.Dropped(1)
			continue
		}

		msg.ref = ref
		msg.initProducerMessage()
		ch <- &msg.msg
	}

	return nil
}

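// getEventMessage converts a single beat event into a Kafka message.
// Per-event metadata may override the destination: event.Meta["partition"]
// (int32) pins the partition and event.Meta["topic"] (string) the topic;
// otherwise the topic is resolved via the configured selector and written
// back into the event metadata.
//
// Illustrative sketch only: given a publisher.Event named evt, an upstream
// producer of events could route it to a fixed topic and partition with
//
//	evt.Content.Meta = map[string]interface{}{
//		"topic":     "custom-topic",
//		"partition": int32(0),
//	}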
func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
	event := &data.Content
	msg := &message{partition: -1, data: *data}
	if event.Meta != nil {
		if value, ok := event.Meta["partition"]; ok {
			if partition, ok := value.(int32); ok {
				msg.partition = partition
			}
		}

		if value, ok := event.Meta["topic"]; ok {
			if topic, ok := value.(string); ok {
				msg.topic = topic
			}
		}
	}
	if msg.topic == "" {
		topic, err := c.topic.Select(event)
		if err != nil {
			return nil, fmt.Errorf("setting kafka topic failed with %v", err)
		}
		msg.topic = topic
		if event.Meta == nil {
			event.Meta = map[string]interface{}{}
		}
		event.Meta["topic"] = topic
	}

	serializedEvent, err := c.codec.Encode(c.index, event)
	if err != nil {
		return nil, err
	}

	buf := make([]byte, len(serializedEvent))
	copy(buf, serializedEvent)
	msg.value = buf

	// Message timestamps were added to Kafka in version 0.10.0.0.
	if c.config.Version.IsAtLeast(sarama.V0_10_0_0) {
		msg.ts = event.Timestamp
	}

	if c.key != nil {
		if key, err := c.key.RunBytes(event); err == nil {
			msg.key = key
		}
	}

	return msg, nil
}

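// successWorker drains the producer's Successes channel and marks each
// delivered message as done on its batch's msgRef.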
func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
	defer c.wg.Done()
	defer debugf("Stop kafka ack worker")

	for libMsg := range ch {
		msg := libMsg.Metadata.(*message)
		msg.ref.done()
	}
}

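// errorWorker drains the producer's Errors channel and records each failed
// message on its batch's msgRef.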
func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
	defer c.wg.Done()
	defer debugf("Stop kafka error handler")

	for errMsg := range ch {
		msg := errMsg.Msg.Metadata.(*message)
		msg.ref.fail(msg, errMsg.Err)
	}
}

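// done marks one message of the batch as successfully delivered.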
func (r *msgRef) done() {
	r.dec()
}

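// fail records a delivery error for one message. Invalid and oversized
// messages are logged and dropped; all other errors mark the message for
// retry.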
func (r *msgRef) fail(msg *message, err error) {
	switch err {
	case sarama.ErrInvalidMessage:
		logp.Err("Kafka (topic=%v): dropping invalid message", msg.topic)

	case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize:
		logp.Err("Kafka (topic=%v): dropping too large message of size %v.",
			msg.topic,
			len(msg.key)+len(msg.value))

	default:
		r.failed = append(r.failed, msg.data)
		r.err = err
	}
	r.dec()
}

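// dec decrements the outstanding-message count. The goroutine that handles
// the last message of the batch either ACKs the whole batch or schedules
// the failed events for retry.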
func (r *msgRef) dec() {
	i := atomic.AddInt32(&r.count, -1)
	if i > 0 {
		return
	}

	debugf("finished kafka batch")

	stats := r.client.stats
	err := r.err
	if err != nil {
		failed := len(r.failed)
		success := r.total - failed
		r.batch.RetryEvents(r.failed)

		stats.Failed(failed)
		if success > 0 {
			stats.Acked(success)
		}

		debugf("Kafka publish failed with: %v", err)
	} else {
		r.batch.ACK()
		stats.Acked(r.total)
	}
}