1 Star 0 Fork 0

Souki/go-framework

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
service_discovery.go 2.76 KB
一键复制 编辑 原始数据 按行查看 历史
sage 提交于 2022-12-05 17:36 +08:00 . Modify
package etcd
import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
"log"
"strings"
"sync"
"time"
)
// ServiceDiscovery resolves gRPC service addresses from etcd. Its Build and
// Scheme methods let it act as a resolver.Builder, and Build hands back the
// same value as the resolver.Resolver (ResolveNow, Close).
type ServiceDiscovery struct {
	// EtcdV3Wrapper is embedded; methods in this file use its cli and timeout fields.
	*EtcdV3Wrapper

	ch clientv3.WatchChan  // watch channel opened by Build and drained by watch
	cc resolver.ClientConn // connection that receives address updates; set by Build

	mu    sync.Mutex        // guards nodes
	nodes map[string]string // etcd key -> raw registered value
}
// NewServiceDiscovery creates a ServiceDiscovery with an empty node table,
// backed by a new EtcdV3Wrapper built from endpoints and dialTimeout.
//
// An error from NewEtcdV3Wrapper is returned unchanged together with a nil
// *ServiceDiscovery.
func NewServiceDiscovery(endpoints string, dialTimeout int64) (*ServiceDiscovery, error) {
	// NOTE(review): the third argument is fixed at 3; presumably the per-request
	// timeout in seconds that Build reads as sdv.timeout — confirm against
	// NewEtcdV3Wrapper.
	wrapper, err := NewEtcdV3Wrapper(endpoints, dialTimeout, 3)
	if err != nil {
		return nil, err
	}

	sd := &ServiceDiscovery{
		EtcdV3Wrapper: wrapper,
		nodes:         make(map[string]string),
	}
	return sd, nil
}
// Build loads every key stored under "<RegisterPrefix>/<target.Endpoint>/"
// from etcd, pushes the resulting address list to cc, and starts a goroutine
// that keeps cc up to date from an etcd watch on the same prefix.
//
// The receiver itself is returned as the resolver.Resolver. opts is unused.
// An error from the initial Get is returned as-is with a nil resolver.
//
// NOTE(review): cc, ch and nodes live on the shared receiver, so this looks
// safe only when Build is called once per ServiceDiscovery — confirm.
func (sdv *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	prefix := fmt.Sprintf("%s/%s/", RegisterPrefix, target.Endpoint)

	// Only the initial snapshot is bounded by the configured timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(sdv.timeout))
	defer cancel()

	resp, err := sdv.cli.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}

	sdv.cc = cc
	for _, kv := range resp.Kvs {
		sdv.setAddr(string(kv.Key), string(kv.Value))
	}
	sdv.syncCC()

	// Start watching at the revision right after the snapshot so that changes
	// made between the Get and the Watch are replayed instead of being lost.
	// The watch context is never cancelled here; the watch goroutine stops
	// when the channel is closed.
	sdv.ch = sdv.cli.Watch(
		context.TODO(),
		prefix,
		clientv3.WithPrefix(),
		clientv3.WithRev(resp.Header.Revision+1),
	)
	go sdv.watch()

	return sdv, nil
}
// watch applies etcd watch events to the node table and pushes the refreshed
// address list to the client connection after every batch of events. It reads
// the channel stored in sdv.ch when it starts and runs until that channel is
// closed.
func (sdv *ServiceDiscovery) watch() {
	for resp := range sdv.ch {
		// etcd delivers a terminal failure (cancellation, compaction) as a
		// response carrying an error just before it closes the channel;
		// record the reason instead of dropping it.
		if err := resp.Err(); err != nil {
			sdv.log("watch error:%v", err)
			continue
		}

		for _, ev := range resp.Events {
			switch ev.Type {
			case clientv3.EventTypePut:
				sdv.setAddr(string(ev.Kv.Key), string(ev.Kv.Value))
			case clientv3.EventTypeDelete:
				sdv.delAddr(string(ev.Kv.Key))
			}
			sdv.log("event type:%s", ev.Type)
		}
		sdv.syncCC()
	}
	sdv.log("watch chan closed")
}
// syncCC rebuilds the address list from the node table and pushes it to the
// gRPC client connection. The mutex is held for the whole call, including the
// UpdateState call.
//
// Every address carries a "weight" attribute of int64(0). A failure reported
// by UpdateState is logged, since this method has no error return.
func (sdv *ServiceDiscovery) syncCC() {
	sdv.mu.Lock()
	defer sdv.mu.Unlock()

	addresses := make([]resolver.Address, 0, len(sdv.nodes))
	for _, raw := range sdv.nodes {
		// A value that does not parse as a RegisterInfo is used verbatim as
		// the address; otherwise the parsed SrInvoke field is used.
		addr := raw
		info := NewRegisterInfo()
		if err := info.Parse(raw); err == nil {
			addr = info.SrInvoke
		}

		addresses = append(addresses, resolver.Address{
			Addr:       addr,
			Attributes: attributes.New("weight", int64(0)),
		})
	}

	if err := sdv.cc.UpdateState(resolver.State{Addresses: addresses}); err != nil {
		sdv.log("update state error:%v", err)
	}
}
// delAddr removes the node stored under etcd key k while holding the mutex.
// Removing a key that is not present is a no-op.
func (sdv *ServiceDiscovery) delAddr(k string) {
	sdv.mu.Lock()
	delete(sdv.nodes, k)
	sdv.mu.Unlock()
}
// setAddr stores addr under etcd key k in the node table while holding the
// mutex, replacing any value already stored for that key.
func (sdv *ServiceDiscovery) setAddr(k, addr string) {
	guard := &sdv.mu
	guard.Lock()
	defer guard.Unlock()

	sdv.nodes[k] = addr
}
// Scheme returns the scheme name "grpclb". Together with Build it gives
// ServiceDiscovery the method set of a resolver.Builder.
func (sdv *ServiceDiscovery) Scheme() string {
	const scheme = "grpclb"
	return scheme
}
// ResolveNow only logs the options it was called with; it does not query etcd.
// Address updates are pushed by the watch goroutine instead.
func (sdv *ServiceDiscovery) ResolveNow(options resolver.ResolveNowOptions) {
	const format = "connecting... %+v"
	sdv.log(format, options)
}
// Close closes the resolver by closing the underlying etcd client.
//
// Close has no error return, so a failure reported by the client is logged
// rather than discarded.
func (sdv *ServiceDiscovery) Close() {
	if err := sdv.cli.Close(); err != nil {
		sdv.log("cli close error:%v", err)
	}
	sdv.log("cli close now")
}
// log writes a printf-style message through the standard library logger.
// The format string s is given a trailing newline when it does not already
// end with one.
func (sdv *ServiceDiscovery) log(s string, args ...interface{}) {
	// Trimming one trailing newline and re-adding it leaves a format that
	// already ends in "\n" unchanged and appends one otherwise.
	log.Printf(strings.TrimSuffix(s, "\n")+"\n", args...)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/scottq/go-framework.git
git@gitee.com:scottq/go-framework.git
scottq
go-framework
go-framework
v1.1.46

搜索帮助