Ai
1 Star 0 Fork 0

yangtxiang/mg-fw

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
kafka_producer.go 2.08 KB
一键复制 编辑 原始数据 按行查看 历史
yangtxiang 提交于 2022-08-20 09:42 +08:00 . init
package kafka_producer
import (
"github.com/Shopify/sarama"
"github.com/pkg/errors"
"strings"
"sync/atomic"
"time"
)
//var KafkaProducer sarama.SyncProducer
type KafkaProducer struct {
addressList []string
producer sarama.SyncProducer
close int32
}
func New(addressList string) *KafkaProducer {
addrs := strings.Split(addressList, ",")
result := &KafkaProducer{
addressList: append([]string{}, addrs...),
producer: nil,
close: 0,
}
atomic.StoreInt32(&result.close, 0)
return result
}
func (kp *KafkaProducer) Init() error {
mqConfig := sarama.NewConfig()
// 设置producer
// 发送完数据需要leader和follow都确认
mqConfig.Producer.RequiredAcks = sarama.WaitForAll
// Partition选择随机
mqConfig.Producer.Partitioner = sarama.NewRandomPartitioner
// 成功交付的消息将在success channel返回
mqConfig.Producer.Return.Successes = true
// 配置版本
mqConfig.Version = sarama.V0_9_0_1
kafkaClient, err := sarama.NewClient(kp.addressList, mqConfig)
if err != nil {
return err
}
// 客户端生产者
producer, err := sarama.NewSyncProducerFromClient(kafkaClient)
if err != nil {
return err
}
kp.producer = producer
atomic.StoreInt32(&kp.close, 0)
return nil
}
func (kp *KafkaProducer) Close() error {
if n := atomic.LoadInt32(&kp.close); n > 0 {
return nil
}
if kp.producer != nil {
if err := kp.producer.Close(); err != nil {
kp.producer = nil
atomic.StoreInt32(&kp.close, 1)
return errors.Wrap(err, "关闭消息生产者出错")
}
kp.producer = nil
}
atomic.StoreInt32(&kp.close, 1)
return nil
}
// SendMessage 生产消息
func (kp *KafkaProducer) SendMessage(content, topic string, partitionKey ...string) (partition int32, offset int64, err error) {
if kp.producer == nil {
return 0, 0, errors.New("未初始化")
}
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(content),
Timestamp: time.Now(),
}
if len(partitionKey) > 0 && partitionKey[0] != "" {
msg.Key = sarama.StringEncoder(partitionKey[0])
}
partition, offset, err = kp.producer.SendMessage(msg)
return
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/maglsoft/mg-fw.git
git@gitee.com:maglsoft/mg-fw.git
maglsoft
mg-fw
mg-fw
v0.0.5

搜索帮助