1 Star 0 Fork 0

evan / newtbig

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mq_nats_stream.go 4.40 KB
一键复制 编辑 原始数据 按行查看 历史
evan 提交于 2024-05-09 01:57 . v0.2.0
// Copyright 2020 newtbig Author. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mq
import (
"context"
"fmt"
"math"
"time"
log "gitee.com/lakertt/newtbig/logging"
"gitee.com/lakertt/newtbig/msg/framepb"
"gitee.com/lakertt/newtbig/utils"
uuid "github.com/satori/go.uuid"
"google.golang.org/protobuf/proto"
"github.com/nats-io/nats.go"
)
func (nc *NatsClient) AddStream(stream string, subjects []string) *nats.StreamInfo {
nctx := nats.Context(context.Background())
info, errSI := nc.nats_js.StreamInfo(stream)
if errSI != nil {
log.Logger.Warnf(fmt.Sprintf("Nats StreamInfo err:%s ", errSI.Error()))
}
if nil == info {
_, errAS := nc.nats_js.AddStream(&nats.StreamConfig{
Name: stream,
Subjects: subjects,
Retention: nats.WorkQueuePolicy,
Replicas: 1,
Discard: nats.DiscardOld,
Duplicates: 30 * time.Second,
}, nctx)
if errAS != nil {
log.Logger.Error(fmt.Sprintf("Nats AddStream err:%s ", errAS.Error()))
return nil
}
}
return info
}
func (nc *NatsClient) InitStreamWithSubjects() {
stream := nc.AddStream(nc.opts.AppName, nc.streamSubs)
if stream == nil {
log.Error("InitStreamWithSubjects AddStream err ! ")
return
}
nctx := nats.Context(context.Background())
tctx, cancel := context.WithTimeout(nctx, 240000*time.Hour)
deadlineCtx := nats.Context(tctx)
results := make(chan int64)
var totalTime int64
var totalMessages int64
log.Logger.Infof("InitStreamWithSubjects sub:%s ", nc.key)
utils.SafeGO(func() {
for !nc.isClose {
select {
case msg := <-nc.msg_send_chan:
now := time.Now().UnixMicro()
ack, err := nc.nats_js.Publish(msg.Subj, msg.Data)
results <- time.Now().UnixMicro() - now
if ack != nil {
log.Logger.Info("InitStreamWithSubjects ack:", ack.Domain, ack.Duplicate, ack.Sequence, ack.Stream)
}
if err != nil {
log.Logger.Error("InitStreamWithSubjects Publish err:", err.Error())
}
}
}
})
utils.SafeGO(func() {
for !nc.isClose {
sub, err := nc.nats_js.PullSubscribe(nc.pullKey, "group")
if err != nil {
log.Logger.Error("InitStreamWithSubjects PullSubscribe err:", err.Error())
return
}
for {
msgs, _ := sub.Fetch(1, deadlineCtx)
m := msgs[0]
m.Ack(nats.Context(nctx))
if m.Data == nil {
log.Logger.Errorf("InitStreamWithSubjects sub:%s data is nil", m.Subject)
continue
}
err := nc.streamHandler(m.Data)
if err != nil {
log.Logger.Error("InitStreamWithSubjects handler err:", err.Error())
continue
}
}
}
})
for {
select {
case <-deadlineCtx.Done():
cancel()
log.Logger.Infof("sent %d messages with average time of %f", totalMessages, math.Round(float64(totalTime/totalMessages)))
nc.nats_js.DeleteStream(nc.opts.AppName)
return
case usec := <-results:
totalTime += usec
totalMessages++
}
}
}
func (nc *NatsClient) SendMsg(subj string, msg *framepb.Msg) error {
defer func() {
utils.Put(msg)
}()
data, err1 := proto.Marshal(msg)
if err1 != nil {
return err1
}
nc.msg_send_chan <- &StreamMsg{Subj: subj, Data: data}
return nil
}
func (nc *NatsClient) CreatNatsQueueSubscription(subj string, queue string) (*nats.Subscription, error) {
id := uuid.NewV4().String()
sub, err := nc.nats_js.QueueSubscribeSync(subj, queue, nats.Durable(id), nats.DeliverNew())
return sub, err
}
func (nc *NatsClient) CreatNatsSubscription(subj string) (*nats.Subscription, error) {
id := uuid.NewV4().String()
sub, err := nc.nats_js.SubscribeSync(subj, nats.Durable(id), nats.DeliverNew())
return sub, err
}
func (nc *NatsClient) CreatNatsSubscriptionPull(subj string, queue string) (*nats.Subscription, error) {
sub, err := nc.nats_js.PullSubscribe(subj, queue)
return sub, err
}
func (nc *NatsClient) CreatNatsKeyValue(bukName string, ttl time.Duration) (nats.KeyValue, error) {
kv, err := nc.nats_js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: bukName,
TTL: ttl,
})
return kv, err
}
Go
1
https://gitee.com/lakertt/newtbig.git
git@gitee.com:lakertt/newtbig.git
lakertt
newtbig
newtbig
v0.2.0

搜索帮助