4 Star 19 Fork 32

少林码僧/pkg

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
test.go 2.43 KB
一键复制 编辑 原始数据 按行查看 历史
webphp 提交于 2022-07-03 15:10 +08:00 . kafka test
package main
import (
"encoding/json"
"fmt"
"gitee.com/phper95/pkg/logger"
"gitee.com/phper95/pkg/mq"
"github.com/Shopify/sarama"
"go.uber.org/zap"
"os"
"os/signal"
"syscall"
"time"
)
// Kafka settings shared by the producer and consumer examples in this file.
var (
	// topic is the Kafka topic the examples publish to and consume from.
	topic = "test"
	// hosts is the list of broker addresses handed to the mq helpers.
	hosts = []string{"127.0.0.1:9092"}
)
// main runs the demo: it publishes one message through the async producer and
// then starts the consumer, which blocks until a termination signal arrives.
func main() {
	produceAsyncMsg()
	// Swap in the synchronous producer example by enabling the call below.
	//produceSyncMsg()
	consumeMsg()
}
// Msg is the JSON payload exchanged over Kafka by the examples in this file.
type Msg struct {
	// ID is serialized under the JSON key "id".
	ID int64 `json:"id"`
	// Name is serialized under the JSON key "name".
	Name string `json:"name"`
	// CreateAt is serialized under the JSON key "create_at"; the producers in
	// this file fill it with a Unix timestamp in seconds.
	CreateAt int64 `json:"create_at"`
}
// produceAsyncMsg initializes the default async Kafka producer, publishes a
// single JSON-encoded Msg to topic, and then pauses for three seconds before
// returning.
//
// Every failure is reported on stdout. If producer initialization or JSON
// encoding fails the function returns immediately, rather than going on to use
// a producer that was never set up or to send an empty payload.
func produceAsyncMsg() {
	if err := mq.InitAsyncKafkaProducer(mq.DefaultKafkaAsyncProducer, hosts, nil); err != nil {
		fmt.Println("InitAsyncKafkaProducer error", err)
		return
	}

	msg := Msg{
		ID:       1,
		Name:     "test name async",
		CreateAt: time.Now().Unix(),
	}
	msgBody, err := json.Marshal(msg)
	if err != nil {
		fmt.Println("Marshal msg error", err)
		return
	}

	err = mq.GetKafkaAsyncProducer(mq.DefaultKafkaAsyncProducer).Send(&sarama.ProducerMessage{
		Topic: topic,
		Value: mq.KafkaMsgValueEncoder(msgBody),
	})
	if err != nil {
		fmt.Println("Send msg error", err)
	} else {
		fmt.Println("Send msg success")
	}

	// The message is submitted asynchronously, so wait before returning to
	// give the submission time to complete.
	time.Sleep(3 * time.Second)
}
// produceSyncMsg initializes the default sync Kafka producer and publishes a
// single JSON-encoded Msg to topic, printing the partition and offset returned
// by the send on success.
//
// Every failure is reported on stdout. If producer initialization or JSON
// encoding fails the function returns immediately, rather than going on to use
// a producer that was never set up or to send an empty payload.
func produceSyncMsg() {
	if err := mq.InitSyncKafkaProducer(mq.DefaultKafkaSyncProducer, hosts, nil); err != nil {
		fmt.Println("InitSyncKafkaProducer error", err)
		return
	}

	msg := Msg{
		ID:       2,
		Name:     "test name sync",
		CreateAt: time.Now().Unix(),
	}
	msgBody, err := json.Marshal(msg)
	if err != nil {
		fmt.Println("Marshal msg error", err)
		return
	}

	partition, offset, err := mq.GetKafkaSyncProducer(mq.DefaultKafkaSyncProducer).Send(&sarama.ProducerMessage{
		Topic: topic,
		Value: mq.KafkaMsgValueEncoder(msgBody),
	})
	if err != nil {
		fmt.Println("Send msg error", err)
		return
	}
	fmt.Println("Send msg success partion ", partition, "offset", offset)
}
// consumeMsg starts a Kafka consumer for topic in the "test-group" consumer
// group with msgHandler as the callback, then blocks until the process
// receives SIGHUP, SIGINT, SIGTERM or SIGQUIT.
//
// If the consumer cannot be started the error is printed and the function
// returns right away; waiting for a signal with no consumer running would only
// hang the program.
func consumeMsg() {
	if _, err := mq.StartKafkaConsumer(hosts, []string{topic}, "test-group", nil, msgHandler); err != nil {
		fmt.Println(err)
		return
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	// Stop relaying signals to the channel once this function is done with it.
	defer signal.Stop(signals)

	s := <-signals
	mq.KafkaStdLogger.Println("kafka test receive system signal:", s)
}
// msgHandler is the callback handed to mq.StartKafkaConsumer. It prints the
// raw message metadata and value, then decodes the value as a JSON Msg and
// prints the decoded struct.
//
// It returns (true, nil) in every case, including when decoding fails.
// NOTE(review): the meaning of the bool result is defined by the mq package;
// presumably it marks the message as handled — confirm against mq.
func msgHandler(message *sarama.ConsumerMessage) (bool, error) {
	fmt.Println("消费消息:", "topic:", message.Topic, "Partition:", message.Partition, "Offset:", message.Offset, "value:", string(message.Value))

	var msg Msg
	if err := json.Unmarshal(message.Value, &msg); err != nil {
		// Open question from the original author: how should messages that
		// cannot be parsed be handled? For now they are logged and the
		// handler returns the same result as for a good message.
		logger.Error("Unmarshal error", zap.Error(err))
		return true, nil
	}

	fmt.Println("msg : ", msg)
	return true, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/phper95/pkg.git
git@gitee.com:phper95/pkg.git
phper95
pkg
pkg
38d933c4cdfa

搜索帮助