代码拉取完成,页面将自动刷新
package kafka
import (
"context"
log "gitee.com/tylf2018/go-micro-framework/pkg/logger"
"github.com/segmentio/kafka-go"
"time"
)
type Producer struct {
logger log.LoggerHelper
Addr string
Topic string
producer *kafka.Writer
}
func NewKafkaProducerClient(addr, topic string) *Producer {
producerClient := &Producer{
logger: log.NewNameLogWithDefaultLog("kafka-producer"),
Addr: addr,
Topic: topic,
}
producerClient.producer = &kafka.Writer{
Addr: kafka.TCP(addr), //TCP函数参数为不定长参数,可以传多个地址组成集群
Topic: topic,
Balancer: &kafka.Hash{}, // 用于对key进行hash,决定消息发送到哪个分区
MaxAttempts: 0,
WriteBackoffMin: 0,
WriteBackoffMax: 0,
BatchSize: 0,
BatchBytes: 0,
BatchTimeout: 0,
ReadTimeout: 0,
WriteTimeout: time.Second, // kafka有时候可能负载很高,写不进去,那么超时后可以放弃写入,用于可以丢消息的场景
RequiredAcks: kafka.RequireNone, // 不需要任何节点确认就返回
Async: false,
Completion: nil,
Compression: 0,
Logger: nil,
ErrorLogger: nil,
Transport: nil,
AllowAutoTopicCreation: false, // 第一次发消息的时候,如果topic不存在,就自动创建topic,工作中禁止使用
}
return producerClient
}
func (p *Producer) SendMessage(ctx context.Context, message string) error {
msg := kafka.Message{
Topic: "",
Partition: 0,
Offset: 0,
HighWaterMark: 0,
Value: []byte(message),
Headers: nil,
WriterData: nil,
Time: time.Time{},
}
err := p.producer.WriteMessages(ctx, msg)
if err != nil {
p.logger.ErrorF("kafka send message error[%s]", err.Error())
}
return err
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。