代码拉取完成,页面将自动刷新
基于golang封装kafka操作,简化kafka生产消费的使用
go get -u gitee.com/Cookie_XiaoD/easykafka
生产者
package main
import (
"gitee.com/Cookie_XiaoD/easykafka"
"gitee.com/Cookie_XiaoD/easykafka/spec"
"log"
"strconv"
"time"
)
var brokers = "127.0.0.1:9092"
func main() {
producer, err := easykafka.NewProducer(
brokers,
easykafka.WithProducerErrorHandler(func(err *easykafka.AsyncProduceError) {
log.Println("异步生产消息时发生错误:", err)
}),
easykafka.WithProducerAckMode(spec.WaitLeader))
if err != nil {
log.Fatalf(err.Error())
}
defer func() {
if err = producer.Close(); err != nil {
log.Println("关闭生产者发生错误:", err)
}
}()
seq := 1
for {
size, err := producer.SyncProduce("topic_example", "key", ExampleData{
Seq: strconv.Itoa(seq),
Content: "消息" + time.Now().Format("2006-01-02 15:04:05.000"),
})
if err != nil {
log.Println("发送消息错误:", err)
} else {
log.Println("发送成功,数据序号:", seq, "数据大小:", size)
seq++
}
time.Sleep(1 * time.Second)
}
}
type ExampleData struct {
Content string `json:"content"`
Seq string `json:"seq"`
}
消费者
package main
import (
"context"
"encoding/json"
"gitee.com/Cookie_XiaoD/easykafka"
"gitee.com/Cookie_XiaoD/easykafka/spec"
"log"
"time"
)
var consumer spec.Consumer
var msgs = make(chan spec.Msg, 10000)
var brokers = "127.0.0.1:9092"
func main() {
startConsumer()
var batch []spec.Msg
//模拟每收到5条数据就进行一次处理,处理完成后批量提交
for {
msg := <-msgs
batch = append(batch, msg)
if len(batch) != 5 {
log.Println("接收到", len(batch), "条数据")
continue
}
log.Println("接收到5条数据,开始处理")
for _, v := range batch {
var data ExampleData
err := json.Unmarshal(v.Data(), &data)
if err != nil {
continue
}
log.Println("处理数据:", data)
}
log.Println("处理完成开始批量提交")
err := consumer.ConfirmBatch(batch)
if err != nil {
log.Println("批量提交失败:", err)
} else {
log.Println("批量提交成功")
}
batch = batch[0:0]
time.Sleep(1 * time.Second)
}
}
func startConsumer() {
go func() {
var err error
consumer, err = easykafka.NewConsumer(
brokers,
[]string{"topic_example"},
"group_example",
handleMsg,
easykafka.WithConsumerErrorHandler(handleErr),
easykafka.WithConsumerAOR(spec.Earliest),
easykafka.WithConsumerManualCommit(true))
if err != nil {
log.Fatalf(err.Error())
}
defer func() {
if err := consumer.Close(); err != nil {
log.Println("关闭消费者发生错误:", err)
}
}()
log.Println("开始接收数据")
consumer.StartBlock(context.Background())
}()
}
func handleMsg(msg spec.Msg) {
log.Println("接收到数据", msg.Topic(), msg.Partition(), msg.Offset())
msgs <- msg
}
func handleErr(err *easykafka.ConsumeError) {
log.Println("发生错误:", err)
}
type ExampleData struct {
Content string `json:"content"`
Seq string `json:"seq"`
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。