1 Star 0 Fork 0

天雨流芳/go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
producer.go 1.91 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-07-26 16:14 . 增加kafka调用
package kafka
import (
"context"
log "gitee.com/tylf2018/go-micro-framework/pkg/logger"
"github.com/segmentio/kafka-go"
"time"
)
type Producer struct {
logger log.LoggerHelper
Addr string
Topic string
producer *kafka.Writer
}
func NewKafkaProducerClient(addr, topic string) *Producer {
producerClient := &Producer{
logger: log.NewNameLogWithDefaultLog("kafka-producer"),
Addr: addr,
Topic: topic,
}
producerClient.producer = &kafka.Writer{
Addr: kafka.TCP(addr), //TCP函数参数为不定长参数,可以传多个地址组成集群
Topic: topic,
Balancer: &kafka.Hash{}, // 用于对key进行hash,决定消息发送到哪个分区
MaxAttempts: 0,
WriteBackoffMin: 0,
WriteBackoffMax: 0,
BatchSize: 0,
BatchBytes: 0,
BatchTimeout: 0,
ReadTimeout: 0,
WriteTimeout: time.Second, // kafka有时候可能负载很高,写不进去,那么超时后可以放弃写入,用于可以丢消息的场景
RequiredAcks: kafka.RequireNone, // 不需要任何节点确认就返回
Async: false,
Completion: nil,
Compression: 0,
Logger: nil,
ErrorLogger: nil,
Transport: nil,
AllowAutoTopicCreation: false, // 第一次发消息的时候,如果topic不存在,就自动创建topic,工作中禁止使用
}
return producerClient
}
func (p *Producer) SendMessage(ctx context.Context, message string) error {
msg := kafka.Message{
Topic: "",
Partition: 0,
Offset: 0,
HighWaterMark: 0,
Value: []byte(message),
Headers: nil,
WriterData: nil,
Time: time.Time{},
}
err := p.producer.WriteMessages(ctx, msg)
if err != nil {
p.logger.ErrorF("kafka send message error[%s]", err.Error())
}
return err
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
157c77bd6e0e

搜索帮助

0d507c66 1850385 C8b1a773 1850385