1 Star 0 Fork 0

天雨流芳 / go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
registry.go 4.64 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-03-27 17:19 . 修改创建的时候的配置
package etcd
import (
"context"
"encoding/json"
"fmt"
log "gitee.com/tylf2018/go-micro-framework/pkg/logger"
"gitee.com/tylf2018/go-micro-framework/registry"
regOps "gitee.com/tylf2018/go-micro-framework/registry/registry"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"math/rand"
"time"
)
// Option mutates the registry options; values of this type are applied in
// order by NewEtcdRegistry on top of its defaults.
type Option func(o *options)

// options collects the tunable settings of an EtcdRegistry.
type options struct {
	// ctx is the context Register hands to the heartbeat goroutine.
	ctx context.Context
	// namespace is the leading segment of every key: namespace/name/id.
	namespace string
	// ttl is the lease duration; it is converted to whole seconds when the
	// lease is granted.
	ttl time.Duration
	// maxRetry bounds the number of re-registration attempts made by the
	// heartbeat loop.
	maxRetry int
}
// EtcdRegistry is a service registry backed by etcd. Instances are built by
// NewEtcdRegistry.
type EtcdRegistry struct {
// opts holds the defaults set in NewEtcdRegistry after the caller's Option
// values have been applied.
opts *options
// client is the etcd v3 client created in NewEtcdRegistry.
client *clientv3.Client
// kv is obtained from client with clientv3.NewKV in NewEtcdRegistry.
kv clientv3.KV
// lease is nil until Register is called; Register closes any previous value
// and replaces it, and Deregister closes it.
lease clientv3.Lease
}
// NewEtcdRegistry builds an EtcdRegistry that talks to the single endpoint
// given by registryOptions.Address.
//
// Defaults (background context, "/microservices" namespace, 15s TTL, 5
// retries) are applied first and then overridden by opts in order. The etcd
// client is dialed in blocking mode with a one second timeout; if the client
// cannot be created the error is logged and the function panics.
func NewEtcdRegistry(registryOptions *regOps.RegistryOptions, opts ...Option) (r *EtcdRegistry) {
	settings := &options{
		ctx:       context.Background(),
		namespace: "/microservices",
		ttl:       15 * time.Second,
		maxRetry:  5,
	}
	for i := range opts {
		opts[i](settings)
	}

	cfg := clientv3.Config{
		Endpoints:   []string{registryOptions.Address},
		DialTimeout: time.Second,
		DialOptions: []grpc.DialOption{grpc.WithBlock()},
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		log.ErrorF("create etcd client failed.error=%v", err.Error())
		panic(err)
	}

	r = &EtcdRegistry{
		opts:   settings,
		client: cli,
		kv:     clientv3.NewKV(cli),
	}
	return r
}
// Register stores the JSON-encoded service under namespace/name/id, bound to
// a freshly granted lease, and starts a background heartbeat for that lease.
//
// Any lease handle left over from an earlier call is closed and replaced.
// The heartbeat goroutine runs on the context held in the registry options,
// not on ctx. An encoding or registration failure is returned as-is and no
// heartbeat is started.
func (r *EtcdRegistry) Register(ctx context.Context, service *registry.ServiceInstance) error {
	path := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
	payload, err := marshal(service)
	if err != nil {
		return err
	}

	if prev := r.lease; prev != nil {
		prev.Close()
	}
	r.lease = clientv3.NewLease(r.client)

	id, err := r.registerWithKV(ctx, path, payload)
	if err == nil {
		go r.heartBeat(r.opts.ctx, id, path, payload)
	}
	return err
}
// Deregister deletes the key namespace/name/id for service and returns the
// error from that delete, if any. On the way out it closes the registry's
// lease handle when one has been set.
func (r *EtcdRegistry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
	defer func() {
		if r.lease == nil {
			return
		}
		r.lease.Close()
	}()

	path := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID)
	if _, err := r.client.Delete(ctx, path); err != nil {
		return err
	}
	return nil
}
// registerWithKV grants a new lease whose TTL is the configured ttl in whole
// seconds, writes key=value attached to that lease, and returns the lease ID.
// If either the grant or the put fails, the zero lease ID and the error are
// returned.
func (r *EtcdRegistry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) {
	ttlSeconds := int64(r.opts.ttl.Seconds())

	granted, err := r.lease.Grant(ctx, ttlSeconds)
	if err != nil {
		return 0, err
	}

	if _, err := r.client.Put(ctx, key, value, clientv3.WithLease(granted.ID)); err != nil {
		return 0, err
	}
	return granted.ID, nil
}
// heartBeat keeps the lease behind key alive and, when the keep-alive stream
// ends while ctx is still live, re-registers key=value under a new lease.
//
// It is meant to run in its own goroutine and returns when ctx or the
// registry's option context is cancelled, or when re-registration has been
// attempted maxRetry times without producing a working keep-alive stream.
//
// Each re-registration attempt is given at most three seconds. When the
// registration succeeds but the keep-alive cannot be started, the loop backs
// off for a randomly chosen power-of-two number of seconds (drawn from the
// values accumulated so far) before trying again; the wait is abandoned as
// soon as ctx is cancelled.
func (r *EtcdRegistry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) {
	curLeaseID := leaseID
	kac, err := r.client.KeepAlive(ctx, leaseID)
	if err != nil {
		// A zero lease ID sends the loop straight into re-registration.
		curLeaseID = 0
	}
	// Use a goroutine-local source for the backoff jitter instead of
	// reseeding the process-wide math/rand generator.
	jitter := rand.New(rand.NewSource(time.Now().Unix()))

	for {
		if curLeaseID == 0 {
			// try to registerWithKV
			var retreat []int
			for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ {
				if ctx.Err() != nil {
					return
				}
				// prevent infinite blocking: the registration runs in a helper
				// goroutine and reports through buffered channels, so it can
				// always finish even if nobody is listening any more.
				idChan := make(chan clientv3.LeaseID, 1)
				errChan := make(chan error, 1)
				cancelCtx, cancel := context.WithCancel(ctx)
				go func() {
					defer cancel()
					id, registerErr := r.registerWithKV(cancelCtx, key, value)
					if registerErr != nil {
						errChan <- registerErr
					} else {
						idChan <- id
					}
				}()

				select {
				case <-time.After(3 * time.Second):
					cancel()
					continue
				case <-errChan:
					continue
				case curLeaseID = <-idChan:
				}

				kac, err = r.client.KeepAlive(ctx, curLeaseID)
				if err == nil {
					break
				}
				retreat = append(retreat, 1<<retryCnt)
				backoff := time.Duration(retreat[jitter.Intn(len(retreat))]) * time.Second
				// Wait out the backoff, but stop immediately on cancellation
				// so shutdown is not held up by a long sleep.
				select {
				case <-time.After(backoff):
				case <-ctx.Done():
					return
				}
			}
			// Receiving from a nil channel blocks forever; give up instead if
			// no keep-alive channel is available after the retries.
			if kac == nil {
				return
			}
			if _, ok := <-kac; !ok {
				// retry failed
				return
			}
		}

		select {
		case _, ok := <-kac:
			if !ok {
				if ctx.Err() != nil {
					// channel closed due to context cancel
					return
				}
				// need to retry registration
				curLeaseID = 0
				continue
			}
		case <-r.opts.ctx.Done():
			return
		}
	}
}
// WithContext returns an Option that replaces the context stored in the
// registry options, which is the context Register passes to the heartbeat.
func WithContext(ctx context.Context) Option {
	return func(o *options) {
		o.ctx = ctx
	}
}
// WithNamespace returns an Option that sets the key prefix under which
// services are stored (keys have the form namespace/name/id).
func WithNamespace(ns string) Option {
	return func(o *options) {
		o.namespace = ns
	}
}
// WithRegisterTTL returns an Option that sets the lease TTL used when a
// service is registered. The value is converted to whole seconds at grant
// time.
func WithRegisterTTL(ttl time.Duration) Option {
	return func(o *options) {
		o.ttl = ttl
	}
}
// WithMaxRetry returns an Option that sets how many re-registration attempts
// the heartbeat loop makes before giving up.
func WithMaxRetry(num int) Option {
	return func(o *options) {
		o.maxRetry = num
	}
}
// marshal encodes si as JSON and returns it as a string. On an encoding
// failure it returns the empty string together with the error.
func marshal(si *registry.ServiceInstance) (string, error) {
	encoded, err := json.Marshal(si)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}
// unmarshal decodes JSON data into a service instance. Both results are
// returned exactly as json.Unmarshal left them, so si may be non-nil even
// when err is set.
func unmarshal(data []byte) (si *registry.ServiceInstance, err error) {
	err = json.Unmarshal(data, &si)
	return si, err
}
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
bd95c43b90bc

搜索帮助