代码拉取完成,页面将自动刷新
package kafka
import (
"context"
"gitee.com/night-tc/gobige/common"
"gitee.com/night-tc/gobige/logger"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)
type option func(*KafkaSendMgr)
/*
SetKafkaUser 设置Kafka的用户名和密码
如果用户名或密码为空,则不设置
该函数用于配置Kafka的SASL认证机制,使用PLAIN机制传输用户名和密码
*/
func SetKafkaUser(user, pwd string) option {
return func(mgr *KafkaSendMgr) {
if user == "" || pwd == "" {
return
}
mgr.Writer.Transport = &kafkago.Transport{
SASL: plain.Mechanism{
Username: user,
Password: pwd,
},
}
}
}
/*
SetKafkaScramUser 设置Kafka的SCRAM用户
如果用户名或密码为空,则不设置
该函数用于配置Kafka的SASL认证机制,使用SCRAM机制传输用户名和密码
参数algo指定SCRAM算法类型,如scram.SHA256或scram.SHA512
*/
func SetKafkaScramUser(user, pwd string, algo scram.Algorithm) option {
return func(mgr *KafkaSendMgr) {
if user == "" || pwd == "" {
return
}
saslmd, err := scram.Mechanism(algo, user, pwd)
if err != nil {
return
}
mgr.Writer.Transport = &kafkago.Transport{
SASL: saslmd,
}
}
}
/*
KafkaSendMgr 是 Kafka 消息发送管理器,负责管理 Kafka Writer、上下文、主题和地址等信息。
通过该结构体可以实现 Kafka 消息的异步发送、认证配置、启动与关闭等功能。
*/
type KafkaSendMgr struct {
Writer kafkago.Writer // Kafka Writer 实例
ctx context.Context // 用于控制 Writer 生命周期的上下文
cel context.CancelFunc // 用于取消上下文的函数
topics []string // Kafka 主题列表
addrs []string // Kafka Broker 地址列表
}
/*
NewKafkaMgr 创建并初始化一个 KafkaSendMgr 实例。
参数:
- topics: 主题列表
- addrs: Broker 地址列表
- compf: 消息发送完成回调函数(可为 nil,默认为内部 Completion)
- opts: 其他可选配置(如 SASL 认证)
返回:
- KafkaSendMgr 实例指针
*/
func NewKafkaMgr(topics []string, addrs []string, compf func(messages []kafkago.Message, err error), opts ...option) (result *KafkaSendMgr) {
result = new(KafkaSendMgr)
result.topics = topics
result.addrs = addrs
if compf == nil {
compf = result.Completion
}
result.Writer = kafkago.Writer{
Addr: kafkago.TCP(result.addrs...),
Async: true,
Completion: compf,
Logger: kafkago.LoggerFunc(result.DebugLogger),
ErrorLogger: kafkago.LoggerFunc(result.ErrorLogger),
Balancer: &kafkago.LeastBytes{},
}
for _, opt := range opts {
opt(result)
}
return
}
/*
Send 发送消息到 topics[index] 指定的主题。
参数:
- index: 主题在 topics 列表中的索引
- key: 消息 key
- value: 消息内容
*/
func (this *KafkaSendMgr) Send(index int, key string, value []byte) {
msg := kafkago.Message{
Topic: this.topics[index],
Key: common.StringToBytes(key),
Value: value,
}
err := this.Writer.WriteMessages(this.ctx, msg)
if err != nil {
this.Writer.Completion([]kafkago.Message{msg}, err)
}
}
/*
SendWithTopic 发送消息到指定的 topic。
参数:
- topic: 目标主题
- key: 消息 key
- value: 消息内容
*/
func (this *KafkaSendMgr) SendWithTopic(topic string, key string, value []byte) {
msg := kafkago.Message{
Topic: topic,
Key: common.StringToBytes(key),
Value: value,
}
err := this.Writer.WriteMessages(this.ctx, msg)
if err != nil {
this.Writer.Completion([]kafkago.Message{msg}, err)
}
}
/*
Start 启动 KafkaSendMgr,初始化上下文。
参数:
- ctx: 父级上下文
*/
func (this *KafkaSendMgr) Start(ctx context.Context) {
this.ctx, this.cel = context.WithCancel(ctx)
}
/*
Stop 安全关闭 KafkaSendMgr,关闭 Writer 并取消上下文。
*/
func (this *KafkaSendMgr) Stop() {
this.Writer.Close()
this.cel()
}
/*
Completion 默认的消息发送失败回调。
参数:
- messages: 发送失败的消息
- err: 错误信息
*/
func (this *KafkaSendMgr) Completion(messages []kafkago.Message, err error) {
if err == nil {
return
}
// 可以在此处添加自定义的错误处理逻辑
}
/*
ErrorLogger Kafka Writer 的错误日志回调。
参数:
- format: 日志格式
- param: 日志参数
*/
func (this *KafkaSendMgr) ErrorLogger(format string, param ...interface{}) {
logger.Errorf(format, param...)
}
/*
DebugLogger Kafka Writer 的调试日志回调。
参数:
- format: 日志格式
- param: 日志参数
*/
func (this *KafkaSendMgr) DebugLogger(format string, param ...interface{}) {
logger.Debugf(logger.LogKey_LogKafka, format, param...)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。