1 Star 0 Fork 2

安易科技(北京)有限公司/chameleon

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
nats.go 16.33 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
package messenger
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"gitee.com/anesec/chameleon/apis"
"gitee.com/anesec/chameleon/pkg/errdefs"
"gitee.com/anesec/chameleon/pkg/utils"
"gitee.com/anesec/mobius/goroutine"
"github.com/klauspost/compress/zstd"
natsgo "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
log "github.com/sirupsen/logrus"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
DefaultStream = "Chameleon"
EnforcePolicyPrefix = "policy.enforce"
HeartbeatPrefix = "fort_heartbeat"
GenericPrefix = "fort_common"
defaultHost = "nats://127.0.0.1:4222"
defaultUser = "diss"
defaultPassword = "uWfRpVbazHEjx@76"
defaultReconnectWait = time.Second * 5
defaultPingInterval = time.Second * 15
defaultTimeout = time.Second * 5
defaultFetchInterval = time.Second * 30
defaultMaxPingOutstanding = 2
)
func newNats(ctx context.Context, options *messengerOptions) (Messenger, error) {
msgr := &nats{}
if options.stateChange != nil {
msgr.stateChange.Store(&options.stateChange)
}
if err := msgr.parse(options); err != nil {
return nil, err
}
var err error
retries := 5
for {
if err = msgr.connect(); err == nil {
break
}
retries--
if retries == 0 {
_ = msgr.Close()
return nil, err
}
log.Warnf("Connecting to %s failed, %v", msgr.url(), err)
log.Infof("Try re-connect %s in 30 seconds", msgr.url())
select {
case <-time.After(time.Second * 30):
case <-ctx.Done():
return nil, ctx.Err()
}
}
goroutine.Run(context.TODO(), func(_ context.Context) {
msgr.sentinel()
}, "nats::sentinel")
return msgr, nil
}
type nats struct {
*natsgo.Conn
streams []*jetstream.StreamConfig
jsc jetstream.JetStream
host string
user string
password string
cluster string
server string
stateChange atomic.Pointer[func(State)]
closed int32
rwl sync.RWMutex
}
func (msgr *nats) Name() string {
return Nats
}
func (msgr *nats) Messages(ctx context.Context, options ...FetchOption) (<-chan *apis.Message, <-chan error) {
// 消费数据模式,订阅|拉取
// 如是拉取需要处理周期和批次
var (
option fetchOptions
messages chan *apis.Message
errs chan error
err error
)
errs = make(chan error, 16)
if atomic.LoadInt32(&msgr.closed) == 1 {
errs <- fmt.Errorf("nats: %w", errdefs.ErrClosed)
close(errs)
return nil, errs
}
for _, o := range options {
o.apply(&option)
}
if !msgr.isConnected() {
log.Errorf("Nats connection(%s) is not ready, %v", msgr.url(), err)
errs <- err
close(errs)
return nil, errs
}
messages = make(chan *apis.Message, 16)
switch option.mode {
case Push:
goroutine.Run(ctx, func(ctx context.Context) {
defer func() {
close(messages)
close(errs)
}()
if err = msgr.consume(ctx, option, messages); err != nil {
log.Errorf("NATS consume topic %s failed, %v", option.topic, err)
errs <- err
}
}, fmt.Sprintf("consumer.%s", option.topic))
case Pull:
goroutine.Run(ctx, func(ctx context.Context) {
defer func() {
close(messages)
close(errs)
}()
if err = msgr.fetch(ctx, option, messages); err != nil {
log.Errorf("NATS fetch topic %s topic failed, %v", option.topic, err)
errs <- err
}
}, fmt.Sprintf("fetcher.%s", option.topic))
default:
errs <- fmt.Errorf("invalid nats work mode %q", option.mode)
close(errs)
close(messages)
messages = nil
}
return messages, errs
}
func (msgr *nats) Write(ctx context.Context, msg *apis.Message, options ...WriteOption) error {
var (
data []byte
err error
)
if msg == nil {
return nil
}
if atomic.LoadInt32(&msgr.closed) == 1 {
return fmt.Errorf("nats: %w", errdefs.ErrClosed)
}
if !msgr.isConnected() {
return fmt.Errorf("nats connection(%s) is not ready", msgr.url())
}
opts := writeOptions{stream: true}
for _, option := range options {
option.apply(&opts)
}
if opts.topic == "" {
return fmt.Errorf("unable to write message, topic is empty")
}
if !opts.rawData {
data, err = json.Marshal(msg)
} else {
data, err = json.Marshal(msg.Data)
}
if err != nil {
return err
}
buf := utils.Acquire(utils.BufferSize(len(data)))
defer utils.Recycle(buf)
if err = msgr.compress(data, buf); err != nil {
return err
}
log.Debugf("topic: %s", opts.topic)
log.Debugf("%s", data)
msgr.rwl.RLock()
if opts.stream && msgr.jsc != nil {
_, err = msgr.jsc.Publish(ctx, opts.topic, buf.Bytes())
} else if !opts.stream && msgr.Conn != nil {
err = msgr.Publish(opts.topic, buf.Bytes())
} else if atomic.LoadInt32(&msgr.closed) == 1 {
err = fmt.Errorf("nats: %w", errdefs.ErrClosed)
} else {
err = fmt.Errorf("nats: %w", errdefs.ErrNotInitialized)
}
msgr.rwl.RUnlock()
return err
}
func (msgr *nats) Close() error {
// 关闭所有链接
if !atomic.CompareAndSwapInt32(&msgr.closed, 0, 1) {
return nil
}
msgr.rwl.Lock()
if msgr.Conn != nil {
msgr.Conn.Close()
msgr.Conn = nil
msgr.jsc = nil
}
msgr.rwl.Unlock()
return nil
}
func (msgr *nats) parse(options *messengerOptions) error {
if options != nil {
if options.Endpoint != "" {
if !strings.HasPrefix(options.Endpoint, "nats://") {
options.Endpoint = fmt.Sprintf("nats://%s", options.Endpoint)
}
msgr.host = options.Endpoint
}
if options.User != "" && options.Credential != "" {
msgr.user = options.User
msgr.password = options.Credential
}
}
if msgr.host == "" {
msgr.host = defaultHost
}
addr, err := url.Parse(msgr.host)
if err != nil {
return fmt.Errorf("invalid nats host, %s", msgr.host)
}
if addr.Scheme == "" {
addr.Scheme = "nats"
}
if addr.Port() == "" {
addr.Host += ":4222"
}
msgr.server = addr.Host
msgr.host = fmt.Sprintf("%s://%s", addr.Scheme, addr.Host)
if addr.User != nil {
msgr.user = addr.User.Username()
msgr.password, _ = addr.User.Password()
}
if msgr.user == "" {
msgr.user = defaultUser
}
if msgr.password == "" {
msgr.password = defaultPassword
}
return nil
}
func (msgr *nats) connect() error {
var err error
msgr.rwl.Lock()
defer msgr.rwl.Unlock()
log.Infof("Connecting to %s", msgr.url())
msgr.Conn, err = natsgo.Connect(msgr.host,
natsgo.Name("Chameleon Messenger"),
natsgo.RetryOnFailedConnect(false),
natsgo.MaxReconnects(-1),
natsgo.Timeout(defaultTimeout),
natsgo.ReconnectWait(defaultReconnectWait),
natsgo.PingInterval(defaultPingInterval),
natsgo.MaxPingsOutstanding(defaultMaxPingOutstanding),
natsgo.UserInfo(msgr.user, msgr.password),
natsgo.Secure(&tls.Config{
InsecureSkipVerify: true,
MinVersion: tls.VersionTLS12,
}),
natsgo.ReconnectHandler(func(conn *natsgo.Conn) {
msgr.server = conn.ConnectedAddr()
msgr.cluster = conn.ConnectedClusterName()
if err = msgr.createOrUpdateStream(); err != nil {
log.Warnf("NATS initializes required streams failed, %v", err)
log.Infof("NATS try reconnect to %s in 30 seconds", msgr.url())
go func() {
time.Sleep(30 * time.Second)
_ = conn.ForceReconnect()
}()
return
}
msgr.notifyStateChanged(Online)
log.Infof("NATS connected to %s", msgr.url())
}),
natsgo.DisconnectErrHandler(func(conn *natsgo.Conn, err error) {
msgr.notifyStateChanged(Offline)
log.Errorf("NATS connection from %s lost, reason: %v, try reconnect in %s", msgr.url(), err, defaultReconnectWait)
}),
)
if err != nil {
return err
}
if msgr.jsc, err = jetstream.New(msgr.Conn); err != nil {
msgr.Conn.Close()
msgr.Conn = nil
return err
}
msgr.server = msgr.ConnectedAddr()
msgr.cluster = msgr.ConnectedClusterName()
if err = msgr.createOrUpdateStream(); err != nil {
msgr.Conn.Close()
msgr.jsc = nil
msgr.Conn = nil
return err
}
msgr.notifyStateChanged(Online)
log.Infof("NATS connected to %s", msgr.url())
return nil
}
func (msgr *nats) subscribe(ctx context.Context, opts fetchOptions, messages chan *apis.Message) error {
// 直接订阅topic,不绑定stream
if opts.topic == "" {
return fmt.Errorf("unable to subscribe message, topic is empty")
}
var err error
_, err = msgr.Conn.Subscribe(opts.topic, func(raw *natsgo.Msg) {
if raw == nil {
return
}
jsm := &jetStreamMsg{raw}
if msg := msgr.process(jsm); msg != nil {
messages <- msg
}
})
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
}
}
}
// 订阅模式
func (msgr *nats) consume(ctx context.Context, opts fetchOptions, messages chan *apis.Message) error {
var (
consumer jetstream.Consumer
consumerCtx jetstream.ConsumeContext
err error
)
if opts.stream == "" {
if err = msgr.subscribe(ctx, opts, messages); err != nil {
return err
}
}
if consumer, err = msgr.jsc.CreateOrUpdateConsumer(ctx, opts.stream, jetstream.ConsumerConfig{
Durable: strings.TrimPrefix(opts.topic, "chameleon."),
AckPolicy: jetstream.AckNonePolicy,
//MaxRequestExpires: time.Second * 15,
FilterSubjects: []string{opts.topic},
AckWait: time.Second * 5,
MaxAckPending: 10000,
}); err != nil {
return err
}
if consumerCtx, err = consumer.Consume(func(raw jetstream.Msg) {
if msg := msgr.process(raw); msg != nil {
messages <- msg
}
}); err != nil {
return err
}
for {
select {
case <-ctx.Done():
consumerCtx.Stop()
return ctx.Err()
case <-consumerCtx.Closed():
return fmt.Errorf("consumer [%s] has been closed", opts.topic)
}
}
}
// 拉取模式
func (msgr *nats) fetch(ctx context.Context, opts fetchOptions, messages chan *apis.Message) error {
if opts.topic == "" {
return fmt.Errorf("topic is empty")
}
if opts.stream == "" {
return fmt.Errorf("fetch message should bind stream")
}
var (
consumer jetstream.Consumer
err error
)
if consumer, err = msgr.jsc.CreateOrUpdateConsumer(ctx, opts.stream, jetstream.ConsumerConfig{
Durable: strings.TrimPrefix(opts.topic, "chameleon."),
AckPolicy: jetstream.AckExplicitPolicy,
MaxRequestExpires: time.Second * 15,
FilterSubject: opts.topic,
AckWait: time.Second * 5,
MaxAckPending: 10000,
}); err != nil {
return err
}
var (
msgs jetstream.MessageBatch
interval time.Duration
)
interval = defaultFetchInterval
if opts.interval > 0 {
interval = opts.interval
}
ticker := time.NewTicker(interval)
consumerTicker := time.NewTicker(time.Minute)
defer ticker.Stop()
defer consumerTicker.Stop()
for {
select {
case <-ticker.C:
log.Debugf("Start fetch [%s] message", opts.topic)
if msgs, err = consumer.Fetch(opts.batch, jetstream.FetchMaxWait(time.Second*5)); err != nil {
log.Errorf("Consumer [%s] fetch message failed, %v", opts.topic, err)
break
}
if err = msgs.Error(); err != nil {
log.Errorf("Consumer [%s] fetch message failed, %v", opts.topic, err)
break
}
for raw := range msgs.Messages() {
if msg := msgr.process(raw); msg != nil {
messages <- msg
}
if err = raw.Ack(); err != nil {
log.Warnf("Consumer [%s] acknowlege message failed, %v", opts.topic, err)
}
}
case <-consumerTicker.C:
_, err = msgr.jsc.Consumer(ctx, opts.stream, opts.topic)
if err != nil {
log.Debugf("Attempt to recreate the consumer[%s]", opts.topic)
if consumer, err = msgr.jsc.CreateOrUpdateConsumer(ctx, opts.stream, jetstream.ConsumerConfig{
Durable: strings.TrimPrefix(opts.topic, "chameleon."),
AckPolicy: jetstream.AckExplicitPolicy,
MaxRequestExpires: time.Second * 15,
FilterSubject: opts.topic,
AckWait: time.Second * 5,
MaxAckPending: 10000,
}); err != nil {
return err
}
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (msgr *nats) process(raw jetstream.Msg) (msg *apis.Message) {
var err error
topic := raw.Subject()
buf := utils.Acquire(utils.BufferSize(len(raw.Data()) * 3))
defer utils.Recycle(buf)
if err = msgr.decompress(raw.Data(), buf); err != nil {
log.Errorf("Consumer [%s] received malfored data failed, %v", topic, err)
return
}
if buf.Len() == 0 {
log.Warnf("Consumer [%s] received empty message", topic)
return nil
}
log.Debugf("%s", buf.Bytes())
msg = &apis.Message{}
if err = json.Unmarshal(buf.Bytes(), msg); err != nil {
log.Errorf("Consumer [%s] received malfored data failed, %v", topic, err)
return nil
}
msg.Topic = raw.Reply()
return
}
func (msgr *nats) sentinel() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
// check nats message queue state every minute
case <-ticker.C:
if !msgr.isConnected() {
return
}
msgr.rwl.Lock()
// 检查是否需要重新创建流
for _, stream := range msgr.streams {
_, err := msgr.jsc.Stream(context.TODO(), stream.Name)
if err == nil || !errors.Is(err, jetstream.ErrStreamNotFound) {
continue
}
if _, err = msgr.jsc.CreateOrUpdateStream(context.TODO(), *stream); err != nil && !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) {
log.Warnf("Add stram '%s' failed, %v", stream.Name, err)
}
}
msgr.rwl.Unlock()
}
}
}
func (msgr *nats) isConnected() (connected bool) {
msgr.rwl.RLock()
connected = msgr != nil && msgr.Conn.IsConnected()
msgr.rwl.RUnlock()
return
}
func (msgr *nats) notifyStateChanged(state State) {
if stateChange := msgr.stateChange.Load(); stateChange != nil {
(*stateChange)(state)
}
}
func (msgr *nats) createOrUpdateStream() (err error) {
if len(msgr.streams) == 0 {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10*time.Duration(len(msgr.streams)))
defer cancel()
for _, stream := range msgr.streams {
if _, err = msgr.jsc.CreateOrUpdateStream(ctx, *stream); err != nil {
return err
}
}
return
}
func (msgr *nats) compress(data []byte, out *bytes.Buffer) error {
encoder, err := zstd.NewWriter(out, zstd.WithEncoderLevel(zstd.SpeedDefault))
if err != nil {
return err
}
if _, err = encoder.Write(data); err != nil {
_ = encoder.Close()
return err
}
return encoder.Close()
}
func (msgr *nats) decompress(data []byte, out *bytes.Buffer) error {
decoder, err := zstd.NewReader(bytes.NewBuffer(data))
if err != nil {
return err
}
defer decoder.Close()
_, err = io.Copy(out, decoder)
return err
}
func (msgr *nats) url() string {
if msgr.cluster != "" {
return fmt.Sprintf("nats://%s [cluster=%s]", msgr.server, msgr.cluster)
}
return fmt.Sprintf("nats://%s", msgr.server)
}
var errIllegalJetStreamMsg = errors.New("illegal jetstream message")
type jetStreamMsg struct {
msg *natsgo.Msg
}
func (jsm *jetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) {
if jsm.msg == nil {
return nil, errIllegalJetStreamMsg
}
metadata, err := jsm.msg.Metadata()
if err != nil {
return nil, err
}
return &jetstream.MsgMetadata{
Stream: metadata.Stream,
Consumer: metadata.Consumer,
NumDelivered: metadata.NumDelivered,
NumPending: metadata.NumPending,
Sequence: jetstream.SequencePair{
Stream: metadata.Sequence.Stream,
Consumer: metadata.Sequence.Consumer,
},
Domain: metadata.Domain,
Timestamp: metadata.Timestamp,
}, nil
}
func (jsm *jetStreamMsg) Data() []byte {
if jsm.msg == nil {
return nil
}
return jsm.msg.Data
}
func (jsm *jetStreamMsg) Headers() natsgo.Header {
if jsm.msg == nil {
return nil
}
return jsm.msg.Header
}
func (jsm *jetStreamMsg) Subject() string {
if jsm.msg == nil {
return ""
}
return jsm.msg.Subject
}
func (jsm *jetStreamMsg) Reply() string {
if jsm.msg == nil {
return ""
}
return jsm.msg.Reply
}
func (jsm *jetStreamMsg) Ack() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Ack()
}
func (jsm *jetStreamMsg) DoubleAck(ctx context.Context) error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.AckSync(natsgo.Context(ctx))
}
func (jsm *jetStreamMsg) Nak() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Nak()
}
func (jsm *jetStreamMsg) NakWithDelay(delay time.Duration) error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.NakWithDelay(delay)
}
func (jsm *jetStreamMsg) InProgress() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.InProgress()
}
func (jsm *jetStreamMsg) Term() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Term()
}
func (jsm *jetStreamMsg) TermWithReason(_ string) error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Term()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/anesec/chameleon.git
git@gitee.com:anesec/chameleon.git
anesec
chameleon
chameleon
205da4a0ed50

搜索帮助