代码拉取完成,页面将自动刷新
package messenger
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"gitee.com/anesec/chameleon/apis"
"gitee.com/anesec/chameleon/pkg/errdefs"
"gitee.com/anesec/chameleon/pkg/utils"
"gitee.com/anesec/mobius/goroutine"
"github.com/klauspost/compress/zstd"
natsgo "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
log "github.com/sirupsen/logrus"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
DefaultStream = "Chameleon"
EnforcePolicyPrefix = "policy.enforce"
HeartbeatPrefix = "fort_heartbeat"
GenericPrefix = "fort_common"
defaultHost = "nats://127.0.0.1:4222"
defaultUser = "diss"
defaultPassword = "uWfRpVbazHEjx@76"
defaultReconnectWait = time.Second * 5
defaultPingInterval = time.Second * 15
defaultTimeout = time.Second * 5
defaultFetchInterval = time.Second * 30
defaultMaxPingOutstanding = 2
)
func newNats(ctx context.Context, options *messengerOptions) (Messenger, error) {
msgr := &nats{}
if options.stateChange != nil {
msgr.stateChange.Store(&options.stateChange)
}
if err := msgr.parse(options); err != nil {
return nil, err
}
var err error
retries := 5
for {
if err = msgr.connect(); err == nil {
break
}
retries--
if retries == 0 {
_ = msgr.Close()
return nil, err
}
log.Warnf("Connecting to %s failed, %v", msgr.url(), err)
log.Infof("Try re-connect %s in 30 seconds", msgr.url())
select {
case <-time.After(time.Second * 30):
case <-ctx.Done():
return nil, ctx.Err()
}
}
goroutine.Run(context.TODO(), func(_ context.Context) {
msgr.sentinel()
}, "nats::sentinel")
return msgr, nil
}
type nats struct {
*natsgo.Conn
streams []*jetstream.StreamConfig
jsc jetstream.JetStream
host string
user string
password string
cluster string
server string
stateChange atomic.Pointer[func(State)]
closed int32
rwl sync.RWMutex
}
func (msgr *nats) Name() string {
return Nats
}
func (msgr *nats) Messages(ctx context.Context, options ...FetchOption) (<-chan *apis.Message, <-chan error) {
// 消费数据模式,订阅|拉取
// 如是拉取需要处理周期和批次
var (
option fetchOptions
messages chan *apis.Message
errs chan error
err error
)
errs = make(chan error, 16)
if atomic.LoadInt32(&msgr.closed) == 1 {
errs <- fmt.Errorf("nats: %w", errdefs.ErrClosed)
close(errs)
return nil, errs
}
for _, o := range options {
o.apply(&option)
}
if !msgr.isConnected() {
log.Errorf("Nats connection(%s) is not ready, %v", msgr.url(), err)
errs <- err
close(errs)
return nil, errs
}
messages = make(chan *apis.Message, 16)
switch option.mode {
case Push:
goroutine.Run(ctx, func(ctx context.Context) {
defer func() {
close(messages)
close(errs)
}()
if err = msgr.consume(ctx, option, messages); err != nil {
log.Errorf("NATS consume topic %s failed, %v", option.topic, err)
errs <- err
}
}, fmt.Sprintf("consumer.%s", option.topic))
case Pull:
goroutine.Run(ctx, func(ctx context.Context) {
defer func() {
close(messages)
close(errs)
}()
if err = msgr.fetch(ctx, option, messages); err != nil {
log.Errorf("NATS fetch topic %s topic failed, %v", option.topic, err)
errs <- err
}
}, fmt.Sprintf("fetcher.%s", option.topic))
default:
errs <- fmt.Errorf("invalid nats work mode %q", option.mode)
close(errs)
close(messages)
messages = nil
}
return messages, errs
}
func (msgr *nats) Write(ctx context.Context, msg *apis.Message, options ...WriteOption) error {
var (
data []byte
err error
)
if msg == nil {
return nil
}
if atomic.LoadInt32(&msgr.closed) == 1 {
return fmt.Errorf("nats: %w", errdefs.ErrClosed)
}
if !msgr.isConnected() {
return fmt.Errorf("nats connection(%s) is not ready", msgr.url())
}
opts := writeOptions{stream: true}
for _, option := range options {
option.apply(&opts)
}
if opts.topic == "" {
return fmt.Errorf("unable to write message, topic is empty")
}
if !opts.rawData {
data, err = json.Marshal(msg)
} else {
data, err = json.Marshal(msg.Data)
}
if err != nil {
return err
}
buf := utils.Acquire(utils.BufferSize(len(data)))
defer utils.Recycle(buf)
if err = msgr.compress(data, buf); err != nil {
return err
}
log.Debugf("topic: %s", opts.topic)
log.Debugf("%s", data)
msgr.rwl.RLock()
if opts.stream && msgr.jsc != nil {
_, err = msgr.jsc.Publish(ctx, opts.topic, buf.Bytes())
} else if !opts.stream && msgr.Conn != nil {
err = msgr.Publish(opts.topic, buf.Bytes())
} else if atomic.LoadInt32(&msgr.closed) == 1 {
err = fmt.Errorf("nats: %w", errdefs.ErrClosed)
} else {
err = fmt.Errorf("nats: %w", errdefs.ErrNotInitialized)
}
msgr.rwl.RUnlock()
return err
}
func (msgr *nats) Close() error {
// 关闭所有链接
if !atomic.CompareAndSwapInt32(&msgr.closed, 0, 1) {
return nil
}
msgr.rwl.Lock()
if msgr.Conn != nil {
msgr.Conn.Close()
msgr.Conn = nil
msgr.jsc = nil
}
msgr.rwl.Unlock()
return nil
}
func (msgr *nats) parse(options *messengerOptions) error {
if options != nil {
if options.Endpoint != "" {
if !strings.HasPrefix(options.Endpoint, "nats://") {
options.Endpoint = fmt.Sprintf("nats://%s", options.Endpoint)
}
msgr.host = options.Endpoint
}
if options.User != "" && options.Credential != "" {
msgr.user = options.User
msgr.password = options.Credential
}
}
if msgr.host == "" {
msgr.host = defaultHost
}
addr, err := url.Parse(msgr.host)
if err != nil {
return fmt.Errorf("invalid nats host, %s", msgr.host)
}
if addr.Scheme == "" {
addr.Scheme = "nats"
}
if addr.Port() == "" {
addr.Host += ":4222"
}
msgr.server = addr.Host
msgr.host = fmt.Sprintf("%s://%s", addr.Scheme, addr.Host)
if addr.User != nil {
msgr.user = addr.User.Username()
msgr.password, _ = addr.User.Password()
}
if msgr.user == "" {
msgr.user = defaultUser
}
if msgr.password == "" {
msgr.password = defaultPassword
}
return nil
}
func (msgr *nats) connect() error {
var err error
msgr.rwl.Lock()
defer msgr.rwl.Unlock()
log.Infof("Connecting to %s", msgr.url())
msgr.Conn, err = natsgo.Connect(msgr.host,
natsgo.Name("Chameleon Messenger"),
natsgo.RetryOnFailedConnect(false),
natsgo.MaxReconnects(-1),
natsgo.Timeout(defaultTimeout),
natsgo.ReconnectWait(defaultReconnectWait),
natsgo.PingInterval(defaultPingInterval),
natsgo.MaxPingsOutstanding(defaultMaxPingOutstanding),
natsgo.UserInfo(msgr.user, msgr.password),
natsgo.Secure(&tls.Config{
InsecureSkipVerify: true,
MinVersion: tls.VersionTLS12,
}),
natsgo.ReconnectHandler(func(conn *natsgo.Conn) {
msgr.server = conn.ConnectedAddr()
msgr.cluster = conn.ConnectedClusterName()
if err = msgr.createOrUpdateStream(); err != nil {
log.Warnf("NATS initializes required streams failed, %v", err)
log.Infof("NATS try reconnect to %s in 30 seconds", msgr.url())
go func() {
time.Sleep(30 * time.Second)
_ = conn.ForceReconnect()
}()
return
}
msgr.notifyStateChanged(Online)
log.Infof("NATS connected to %s", msgr.url())
}),
natsgo.DisconnectErrHandler(func(conn *natsgo.Conn, err error) {
msgr.notifyStateChanged(Offline)
log.Errorf("NATS connection from %s lost, reason: %v, try reconnect in %s", msgr.url(), err, defaultReconnectWait)
}),
)
if err != nil {
return err
}
if msgr.jsc, err = jetstream.New(msgr.Conn); err != nil {
msgr.Conn.Close()
msgr.Conn = nil
return err
}
msgr.server = msgr.ConnectedAddr()
msgr.cluster = msgr.ConnectedClusterName()
if err = msgr.createOrUpdateStream(); err != nil {
msgr.Conn.Close()
msgr.jsc = nil
msgr.Conn = nil
return err
}
msgr.notifyStateChanged(Online)
log.Infof("NATS connected to %s", msgr.url())
return nil
}
func (msgr *nats) subscribe(ctx context.Context, opts fetchOptions, messages chan *apis.Message) error {
// 直接订阅topic,不绑定stream
if opts.topic == "" {
return fmt.Errorf("unable to subscribe message, topic is empty")
}
var err error
_, err = msgr.Conn.Subscribe(opts.topic, func(raw *natsgo.Msg) {
if raw == nil {
return
}
jsm := &jetStreamMsg{raw}
if msg := msgr.process(jsm); msg != nil {
messages <- msg
}
})
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
}
}
}
// 订阅模式
func (msgr *nats) consume(ctx context.Context, opts fetchOptions, messages chan *apis.Message) error {
var (
consumer jetstream.Consumer
consumerCtx jetstream.ConsumeContext
err error
)
if opts.stream == "" {
if err = msgr.subscribe(ctx, opts, messages); err != nil {
return err
}
}
if consumer, err = msgr.jsc.CreateOrUpdateConsumer(ctx, opts.stream, jetstream.ConsumerConfig{
Durable: strings.TrimPrefix(opts.topic, "chameleon."),
AckPolicy: jetstream.AckNonePolicy,
//MaxRequestExpires: time.Second * 15,
FilterSubjects: []string{opts.topic},
AckWait: time.Second * 5,
MaxAckPending: 10000,
}); err != nil {
return err
}
if consumerCtx, err = consumer.Consume(func(raw jetstream.Msg) {
if msg := msgr.process(raw); msg != nil {
messages <- msg
}
}); err != nil {
return err
}
for {
select {
case <-ctx.Done():
consumerCtx.Stop()
return ctx.Err()
case <-consumerCtx.Closed():
return fmt.Errorf("consumer [%s] has been closed", opts.topic)
}
}
}
// 拉取模式
func (msgr *nats) fetch(ctx context.Context, opts fetchOptions, messages chan *apis.Message) error {
if opts.topic == "" {
return fmt.Errorf("topic is empty")
}
if opts.stream == "" {
return fmt.Errorf("fetch message should bind stream")
}
var (
consumer jetstream.Consumer
err error
)
if consumer, err = msgr.jsc.CreateOrUpdateConsumer(ctx, opts.stream, jetstream.ConsumerConfig{
Durable: strings.TrimPrefix(opts.topic, "chameleon."),
AckPolicy: jetstream.AckExplicitPolicy,
MaxRequestExpires: time.Second * 15,
FilterSubject: opts.topic,
AckWait: time.Second * 5,
MaxAckPending: 10000,
}); err != nil {
return err
}
var (
msgs jetstream.MessageBatch
interval time.Duration
)
interval = defaultFetchInterval
if opts.interval > 0 {
interval = opts.interval
}
ticker := time.NewTicker(interval)
consumerTicker := time.NewTicker(time.Minute)
defer ticker.Stop()
defer consumerTicker.Stop()
for {
select {
case <-ticker.C:
log.Debugf("Start fetch [%s] message", opts.topic)
if msgs, err = consumer.Fetch(opts.batch, jetstream.FetchMaxWait(time.Second*5)); err != nil {
log.Errorf("Consumer [%s] fetch message failed, %v", opts.topic, err)
break
}
if err = msgs.Error(); err != nil {
log.Errorf("Consumer [%s] fetch message failed, %v", opts.topic, err)
break
}
for raw := range msgs.Messages() {
if msg := msgr.process(raw); msg != nil {
messages <- msg
}
if err = raw.Ack(); err != nil {
log.Warnf("Consumer [%s] acknowlege message failed, %v", opts.topic, err)
}
}
case <-consumerTicker.C:
_, err = msgr.jsc.Consumer(ctx, opts.stream, opts.topic)
if err != nil {
log.Debugf("Attempt to recreate the consumer[%s]", opts.topic)
if consumer, err = msgr.jsc.CreateOrUpdateConsumer(ctx, opts.stream, jetstream.ConsumerConfig{
Durable: strings.TrimPrefix(opts.topic, "chameleon."),
AckPolicy: jetstream.AckExplicitPolicy,
MaxRequestExpires: time.Second * 15,
FilterSubject: opts.topic,
AckWait: time.Second * 5,
MaxAckPending: 10000,
}); err != nil {
return err
}
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func (msgr *nats) process(raw jetstream.Msg) (msg *apis.Message) {
var err error
topic := raw.Subject()
buf := utils.Acquire(utils.BufferSize(len(raw.Data()) * 3))
defer utils.Recycle(buf)
if err = msgr.decompress(raw.Data(), buf); err != nil {
log.Errorf("Consumer [%s] received malfored data failed, %v", topic, err)
return
}
if buf.Len() == 0 {
log.Warnf("Consumer [%s] received empty message", topic)
return nil
}
log.Debugf("%s", buf.Bytes())
msg = &apis.Message{}
if err = json.Unmarshal(buf.Bytes(), msg); err != nil {
log.Errorf("Consumer [%s] received malfored data failed, %v", topic, err)
return nil
}
msg.Topic = raw.Reply()
return
}
func (msgr *nats) sentinel() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
// check nats message queue state every minute
case <-ticker.C:
if !msgr.isConnected() {
return
}
msgr.rwl.Lock()
// 检查是否需要重新创建流
for _, stream := range msgr.streams {
_, err := msgr.jsc.Stream(context.TODO(), stream.Name)
if err == nil || !errors.Is(err, jetstream.ErrStreamNotFound) {
continue
}
if _, err = msgr.jsc.CreateOrUpdateStream(context.TODO(), *stream); err != nil && !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) {
log.Warnf("Add stram '%s' failed, %v", stream.Name, err)
}
}
msgr.rwl.Unlock()
}
}
}
func (msgr *nats) isConnected() (connected bool) {
msgr.rwl.RLock()
connected = msgr != nil && msgr.Conn.IsConnected()
msgr.rwl.RUnlock()
return
}
func (msgr *nats) notifyStateChanged(state State) {
if stateChange := msgr.stateChange.Load(); stateChange != nil {
(*stateChange)(state)
}
}
func (msgr *nats) createOrUpdateStream() (err error) {
if len(msgr.streams) == 0 {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10*time.Duration(len(msgr.streams)))
defer cancel()
for _, stream := range msgr.streams {
if _, err = msgr.jsc.CreateOrUpdateStream(ctx, *stream); err != nil {
return err
}
}
return
}
func (msgr *nats) compress(data []byte, out *bytes.Buffer) error {
encoder, err := zstd.NewWriter(out, zstd.WithEncoderLevel(zstd.SpeedDefault))
if err != nil {
return err
}
if _, err = encoder.Write(data); err != nil {
_ = encoder.Close()
return err
}
return encoder.Close()
}
func (msgr *nats) decompress(data []byte, out *bytes.Buffer) error {
decoder, err := zstd.NewReader(bytes.NewBuffer(data))
if err != nil {
return err
}
defer decoder.Close()
_, err = io.Copy(out, decoder)
return err
}
func (msgr *nats) url() string {
if msgr.cluster != "" {
return fmt.Sprintf("nats://%s [cluster=%s]", msgr.server, msgr.cluster)
}
return fmt.Sprintf("nats://%s", msgr.server)
}
var errIllegalJetStreamMsg = errors.New("illegal jetstream message")
type jetStreamMsg struct {
msg *natsgo.Msg
}
func (jsm *jetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) {
if jsm.msg == nil {
return nil, errIllegalJetStreamMsg
}
metadata, err := jsm.msg.Metadata()
if err != nil {
return nil, err
}
return &jetstream.MsgMetadata{
Stream: metadata.Stream,
Consumer: metadata.Consumer,
NumDelivered: metadata.NumDelivered,
NumPending: metadata.NumPending,
Sequence: jetstream.SequencePair{
Stream: metadata.Sequence.Stream,
Consumer: metadata.Sequence.Consumer,
},
Domain: metadata.Domain,
Timestamp: metadata.Timestamp,
}, nil
}
func (jsm *jetStreamMsg) Data() []byte {
if jsm.msg == nil {
return nil
}
return jsm.msg.Data
}
func (jsm *jetStreamMsg) Headers() natsgo.Header {
if jsm.msg == nil {
return nil
}
return jsm.msg.Header
}
func (jsm *jetStreamMsg) Subject() string {
if jsm.msg == nil {
return ""
}
return jsm.msg.Subject
}
func (jsm *jetStreamMsg) Reply() string {
if jsm.msg == nil {
return ""
}
return jsm.msg.Reply
}
func (jsm *jetStreamMsg) Ack() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Ack()
}
func (jsm *jetStreamMsg) DoubleAck(ctx context.Context) error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.AckSync(natsgo.Context(ctx))
}
func (jsm *jetStreamMsg) Nak() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Nak()
}
func (jsm *jetStreamMsg) NakWithDelay(delay time.Duration) error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.NakWithDelay(delay)
}
func (jsm *jetStreamMsg) InProgress() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.InProgress()
}
func (jsm *jetStreamMsg) Term() error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Term()
}
func (jsm *jetStreamMsg) TermWithReason(_ string) error {
if jsm.msg == nil {
return errIllegalJetStreamMsg
}
return jsm.msg.Term()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。