代码拉取完成,页面将自动刷新
package etcd
import (
"context"
"fmt"
"gitee.com/monobytes/gcore/gencoding/json"
"gitee.com/monobytes/gcore/gregistry"
"go.etcd.io/etcd/client/v3"
"sync"
)
const name = "etcd"
var _ gregistry.Registry = &Registry{}
type Registry struct {
err error
ctx context.Context
cancel context.CancelFunc
opts *options
builtin bool
watchers sync.Map
registrars sync.Map
}
func NewRegistry(opts ...Option) *Registry {
o := defaultOptions()
for _, opt := range opts {
opt(o)
}
r := &Registry{}
r.opts = o
r.ctx, r.cancel = context.WithCancel(o.ctx)
if o.client == nil {
r.builtin = true
o.client, r.err = clientv3.New(clientv3.Config{
Endpoints: o.addrs,
DialTimeout: o.dialTimeout,
})
}
return r
}
// Name 获取服务注册发现组件名
func (r *Registry) Name() string {
return name
}
// Register 注册服务实例
func (r *Registry) Register(ctx context.Context, ins *gregistry.ServiceInstance) error {
if r.err != nil {
return r.err
}
v, ok := r.registrars.Load(ins.ID)
if ok {
return v.(*registrar).register(ctx, ins)
}
reg := newRegistrar(r)
if err := reg.register(ctx, ins); err != nil {
return err
}
r.registrars.Store(ins.ID, reg)
return nil
}
// Deregister 解注册服务实例
func (r *Registry) Deregister(ctx context.Context, ins *gregistry.ServiceInstance) error {
if r.err != nil {
return r.err
}
v, ok := r.registrars.Load(ins.ID)
if ok {
return v.(*registrar).deregister(ctx, ins)
}
key := fmt.Sprintf("/%s/%s/%s", r.opts.namespace, ins.Name, ins.ID)
_, err := r.opts.client.Delete(ctx, key)
return err
}
// Watch 监听相同服务名的服务实例变化
func (r *Registry) Watch(ctx context.Context, serviceName string) (gregistry.Watcher, error) {
if r.err != nil {
return nil, r.err
}
v, ok := r.watchers.Load(serviceName)
if ok {
return v.(*watcherMgr).fork(), nil
}
w, err := newWatcherMgr(r, ctx, serviceName)
if err != nil {
return nil, err
}
r.watchers.Store(serviceName, w)
return w.fork(), nil
}
// Services 获取服务实例列表
func (r *Registry) Services(ctx context.Context, serviceName string) ([]*gregistry.ServiceInstance, error) {
if r.err != nil {
return nil, r.err
}
v, ok := r.watchers.Load(serviceName)
if ok {
return v.(*watcherMgr).services(), nil
} else {
return r.services(ctx, serviceName)
}
}
// Close 关闭服务注册发现
func (r *Registry) Close() error {
if r.err != nil {
return r.err
}
r.cancel()
if r.builtin {
return r.opts.client.Close()
}
return nil
}
// 获取服务实例列表
func (r *Registry) services(ctx context.Context, serviceName string) ([]*gregistry.ServiceInstance, error) {
res, err := r.opts.client.Get(ctx, buildPrefixKey(r.opts.namespace, serviceName), clientv3.WithPrefix())
if err != nil {
return nil, err
}
services := make([]*gregistry.ServiceInstance, 0, len(res.Kvs))
for _, kv := range res.Kvs {
service, err := unmarshal(kv.Value)
if err != nil {
return nil, err
}
services = append(services, service)
}
return services, nil
}
func marshal(ins *gregistry.ServiceInstance) (string, error) {
buf, err := json.Marshal(ins)
if err != nil {
return "", err
}
return string(buf), nil
}
func unmarshal(data []byte) (*gregistry.ServiceInstance, error) {
ins := &gregistry.ServiceInstance{}
if err := json.Unmarshal(data, ins); err != nil {
return nil, err
}
return ins, nil
}
func buildPrefixKey(namespace, serviceName string) string {
return fmt.Sprintf("/%s/%s", namespace, serviceName)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。