代码拉取完成,页面将自动刷新
package kafka_producer
import (
"github.com/Shopify/sarama"
"github.com/pkg/errors"
"strings"
"sync/atomic"
"time"
)
//var KafkaProducer sarama.SyncProducer
type KafkaProducer struct {
addressList []string
producer sarama.SyncProducer
close int32
}
func New(addressList string) *KafkaProducer {
addrs := strings.Split(addressList, ",")
result := &KafkaProducer{
addressList: append([]string{}, addrs...),
producer: nil,
close: 0,
}
atomic.StoreInt32(&result.close, 0)
return result
}
func (kp *KafkaProducer) Init() error {
mqConfig := sarama.NewConfig()
// 设置producer
// 发送完数据需要leader和follow都确认
mqConfig.Producer.RequiredAcks = sarama.WaitForAll
// Partition选择随机
mqConfig.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的消息将在success channel返回
mqConfig.Producer.Return.Successes = true
// 配置版本
mqConfig.Version = sarama.V0_9_0_1
kafkaClient, err := sarama.NewClient(kp.addressList, mqConfig)
if err != nil {
return err
}
// 客户端生产者
producer, err := sarama.NewSyncProducerFromClient(kafkaClient)
if err != nil {
return err
}
kp.producer = producer
atomic.StoreInt32(&kp.close, 0)
return nil
}
func (kp *KafkaProducer) Close() error {
if n := atomic.LoadInt32(&kp.close); n > 0 {
return nil
}
if kp.producer != nil {
if err := kp.producer.Close(); err != nil {
kp.producer = nil
atomic.StoreInt32(&kp.close, 1)
return errors.Wrap(err, "关闭消息生产者出错")
}
kp.producer = nil
}
atomic.StoreInt32(&kp.close, 1)
return nil
}
// SendMessage 生产消息
func (kp *KafkaProducer) SendMessage(content, topic string, partitionKey ...string) (partition int32, offset int64, err error) {
if kp.producer == nil {
return 0, 0, errors.New("未初始化")
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(content),
Timestamp: time.Now(),
}
if len(partitionKey) > 0 && partitionKey[0] != "" {
msg.Key = sarama.StringEncoder(partitionKey[0])
}
partition, offset, err = kp.producer.SendMessage(msg)
return
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。