1 Star 0 Fork 0

SasukeBo/go-micro

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
nats.go 2.61 KB
一键复制 编辑 原始数据 按行查看 历史
汪波 提交于 2023-02-23 10:27 . fix: 替换包名
package nats
import (
"fmt"
"net"
"strings"
"time"
"gitee.com/sasukebo/go-micro/v4/config/source"
log "gitee.com/sasukebo/go-micro/v4/logger"
natsgo "github.com/nats-io/nats.go"
)
type nats struct {
url string
bucket string
key string
kv natsgo.KeyValue
opts source.Options
}
// DefaultBucket is the bucket that nats keys will be assumed to have if you
// haven't specified one
var (
DefaultBucket = "default"
DefaultKey = "micro_config"
)
func (n *nats) Read() (*source.ChangeSet, error) {
e, err := n.kv.Get(n.key)
if err != nil {
if err == natsgo.ErrKeyNotFound {
return nil, nil
}
return nil, err
}
if e.Value() == nil || len(e.Value()) == 0 {
return nil, fmt.Errorf("source not found: %s", n.key)
}
cs := &source.ChangeSet{
Data: e.Value(),
Format: n.opts.Encoder.String(),
Source: n.String(),
Timestamp: time.Now(),
}
cs.Checksum = cs.Sum()
return cs, nil
}
func (n *nats) Write(cs *source.ChangeSet) error {
_, err := n.kv.Put(n.key, cs.Data)
if err != nil {
return err
}
return nil
}
func (n *nats) String() string {
return "nats"
}
func (n *nats) Watch() (source.Watcher, error) {
return newWatcher(n.kv, n.bucket, n.key, n.String(), n.opts.Encoder)
}
func NewSource(opts ...source.Option) source.Source {
options := source.NewOptions(opts...)
config := natsgo.DefaultOptions
urls, ok := options.Context.Value(urlKey{}).([]string)
endpoints := []string{}
if ok {
for _, u := range urls {
addr, port, err := net.SplitHostPort(u)
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "4222"
addr = u
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
} else if err == nil {
endpoints = append(endpoints, fmt.Sprintf("%s:%s", addr, port))
}
}
}
if len(endpoints) == 0 {
endpoints = append(endpoints, "127.0.0.1:4222")
}
bucket, ok := options.Context.Value(bucketKey{}).(string)
if !ok {
bucket = DefaultBucket
}
key, ok := options.Context.Value(keyKey{}).(string)
if !ok {
key = DefaultKey
}
config.Url = strings.Join(endpoints, ",")
nc, err := natsgo.Connect(config.Url)
if err != nil {
log.Error(err)
}
js, err := nc.JetStream(natsgo.MaxWait(10 * time.Second))
if err != nil {
log.Error(err)
}
kv, err := js.KeyValue(bucket)
if err == natsgo.ErrBucketNotFound || err == natsgo.ErrKeyNotFound {
kv, err = js.CreateKeyValue(&natsgo.KeyValueConfig{Bucket: bucket})
if err != nil {
log.Error(err)
}
}
if err != nil {
log.Error(err)
}
return &nats{
url: config.Url,
bucket: bucket,
key: key,
kv: kv,
opts: options,
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sasukebo/go-micro.git
git@gitee.com:sasukebo/go-micro.git
sasukebo
go-micro
go-micro
6e18eb58b836

搜索帮助