4 Star 17 Fork 7

NightTC/Gobige

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
KafkaSendMgr.go 4.57 KB
一键复制 编辑 原始数据 按行查看 历史
package kafka
import (
"context"
"gitee.com/night-tc/gobige/common"
"gitee.com/night-tc/gobige/logger"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
)
type option func(*KafkaSendMgr)
/*
SetKafkaUser 设置Kafka的用户名和密码
如果用户名或密码为空,则不设置
该函数用于配置Kafka的SASL认证机制,使用PLAIN机制传输用户名和密码
*/
func SetKafkaUser(user, pwd string) option {
return func(mgr *KafkaSendMgr) {
if user == "" || pwd == "" {
return
}
mgr.Writer.Transport = &kafkago.Transport{
SASL: plain.Mechanism{
Username: user,
Password: pwd,
},
}
}
}
/*
SetKafkaScramUser 设置Kafka的SCRAM用户
如果用户名或密码为空,则不设置
该函数用于配置Kafka的SASL认证机制,使用SCRAM机制传输用户名和密码
参数algo指定SCRAM算法类型,如scram.SHA256或scram.SHA512
*/
func SetKafkaScramUser(user, pwd string, algo scram.Algorithm) option {
return func(mgr *KafkaSendMgr) {
if user == "" || pwd == "" {
return
}
saslmd, err := scram.Mechanism(algo, user, pwd)
if err != nil {
return
}
mgr.Writer.Transport = &kafkago.Transport{
SASL: saslmd,
}
}
}
/*
KafkaSendMgr 是 Kafka 消息发送管理器,负责管理 Kafka Writer、上下文、主题和地址等信息。
通过该结构体可以实现 Kafka 消息的异步发送、认证配置、启动与关闭等功能。
*/
type KafkaSendMgr struct {
Writer kafkago.Writer // Kafka Writer 实例
ctx context.Context // 用于控制 Writer 生命周期的上下文
cel context.CancelFunc // 用于取消上下文的函数
topics []string // Kafka 主题列表
addrs []string // Kafka Broker 地址列表
}
/*
NewKafkaMgr 创建并初始化一个 KafkaSendMgr 实例。
参数:
- topics: 主题列表
- addrs: Broker 地址列表
- compf: 消息发送完成回调函数(可为 nil,默认为内部 Completion)
- opts: 其他可选配置(如 SASL 认证)
返回:
- KafkaSendMgr 实例指针
*/
func NewKafkaMgr(topics []string, addrs []string, compf func(messages []kafkago.Message, err error), opts ...option) (result *KafkaSendMgr) {
result = new(KafkaSendMgr)
result.topics = topics
result.addrs = addrs
if compf == nil {
compf = result.Completion
}
result.Writer = kafkago.Writer{
Addr: kafkago.TCP(result.addrs...),
Async: true,
Completion: compf,
Logger: kafkago.LoggerFunc(result.DebugLogger),
ErrorLogger: kafkago.LoggerFunc(result.ErrorLogger),
Balancer: &kafkago.LeastBytes{},
}
for _, opt := range opts {
opt(result)
}
return
}
/*
Send 发送消息到 topics[index] 指定的主题。
参数:
- index: 主题在 topics 列表中的索引
- key: 消息 key
- value: 消息内容
*/
func (this *KafkaSendMgr) Send(index int, key string, value []byte) {
msg := kafkago.Message{
Topic: this.topics[index],
Key: common.StringToBytes(key),
Value: value,
}
err := this.Writer.WriteMessages(this.ctx, msg)
if err != nil {
this.Writer.Completion([]kafkago.Message{msg}, err)
}
}
/*
SendWithTopic 发送消息到指定的 topic。
参数:
- topic: 目标主题
- key: 消息 key
- value: 消息内容
*/
func (this *KafkaSendMgr) SendWithTopic(topic string, key string, value []byte) {
msg := kafkago.Message{
Topic: topic,
Key: common.StringToBytes(key),
Value: value,
}
err := this.Writer.WriteMessages(this.ctx, msg)
if err != nil {
this.Writer.Completion([]kafkago.Message{msg}, err)
}
}
/*
Start 启动 KafkaSendMgr,初始化上下文。
参数:
- ctx: 父级上下文
*/
func (this *KafkaSendMgr) Start(ctx context.Context) {
this.ctx, this.cel = context.WithCancel(ctx)
}
/*
Stop 安全关闭 KafkaSendMgr,关闭 Writer 并取消上下文。
*/
func (this *KafkaSendMgr) Stop() {
this.Writer.Close()
this.cel()
}
/*
Completion 默认的消息发送失败回调。
参数:
- messages: 发送失败的消息
- err: 错误信息
*/
func (this *KafkaSendMgr) Completion(messages []kafkago.Message, err error) {
if err == nil {
return
}
// 可以在此处添加自定义的错误处理逻辑
}
/*
ErrorLogger Kafka Writer 的错误日志回调。
参数:
- format: 日志格式
- param: 日志参数
*/
func (this *KafkaSendMgr) ErrorLogger(format string, param ...interface{}) {
logger.Errorf(format, param...)
}
/*
DebugLogger Kafka Writer 的调试日志回调。
参数:
- format: 日志格式
- param: 日志参数
*/
func (this *KafkaSendMgr) DebugLogger(format string, param ...interface{}) {
logger.Debugf(logger.LogKey_LogKafka, format, param...)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/night-tc/gobige.git
git@gitee.com:night-tc/gobige.git
night-tc
gobige
Gobige
2be58df1f1b9

搜索帮助