Ai
1 Star 0 Fork 0

TGodfather/go-plugins

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
watcher.go 5.58 KB
一键复制 编辑 原始数据 按行查看 历史
TGodfather 提交于 2021-11-28 21:57 +08:00 . update
package nacos
import (
"github.com/micro/go-micro/v2/logger"
mnet "github.com/micro/go-micro/v2/util/net"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
"reflect"
"sync"
"github.com/micro/go-micro/v2/registry"
)
// nacosWatcher implements registry.Watcher on top of nacos naming-client
// subscriptions. Subscription callbacks are converted into registry results
// and queued on next, from which Next hands them to the caller.
type nacosWatcher struct {
// client is the nacos naming client (taken from the registry) used to
// subscribe and unsubscribe.
client naming_client.INamingClient
// wo holds the watch options assembled from the caller's WatchOption funcs.
wo registry.WatchOptions
// next carries pending results from callBackHandle to Next.
next chan *registry.Result
// exit is closed by Stop to signal that the watcher has been stopped.
exit chan bool
// The embedded mutex is taken by callBackHandle while it works on
// cacheServices.
sync.RWMutex
// services is allocated by newNacosWatcher; nothing else in this file
// reads or writes it.
services map[string][]*registry.Service
// cacheServices keeps the instances remembered per service name so that
// callBackHandle can compare them with the next callback.
cacheServices map[string][]model.SubscribeService
// param is the subscribe parameter taken from the watch context, or an
// empty parameter when the context supplied none.
param *vo.SubscribeParam
// Doms lists the service names subscribed to when no subscribe parameter
// was supplied through the context.
Doms []string
}
// newNacosWatcher builds a watcher bound to the registry's naming client.
//
// When the watch context carries a vo.SubscribeParam value under the key
// "subscribe_param", only that subscription is created. Otherwise every
// service reported by GetAllServicesInfo for the registry's default watch
// group and namespace is subscribed to individually.
//
// Subscriptions are started in background goroutines; a failing Subscribe
// call is logged instead of being silently dropped. An error is returned only
// when the service list cannot be fetched.
func newNacosWatcher(nr *nacosRegistry, opts ...registry.WatchOption) (registry.Watcher, error) {
	var wo registry.WatchOptions
	for _, o := range opts {
		o(&wo)
	}
	logger.Info("newNacosWatcher :", wo)

	nw := &nacosWatcher{
		client:        nr.client,
		wo:            wo,
		exit:          make(chan bool),
		next:          make(chan *registry.Result, 10),
		services:      make(map[string][]*registry.Service),
		cacheServices: make(map[string][]model.SubscribeService),
		param:         new(vo.SubscribeParam),
		Doms:          make([]string, 0),
	}

	// subscribe runs in its own goroutine, so the only place its error can
	// surface is the log.
	subscribe := func(p *vo.SubscribeParam) {
		if err := nr.client.Subscribe(p); err != nil {
			logger.Warnf("nacos watcher subscribe [%s] error:%v", p.ServiceName, err)
		}
	}

	if wo.Context != nil {
		logger.Info("newNacosWatcher WatchOption with Context ")
		if p, ok := wo.Context.Value("subscribe_param").(vo.SubscribeParam); ok {
			p.SubscribeCallback = nw.callBackHandle
			// Keep the exact parameter used for subscribing so Stop can hand
			// the same one to Unsubscribe.
			nw.param = &p
			go subscribe(nw.param)
			return nw, nil
		}
	}

	logger.Info("newNacosWatcher WatchOption with no Context ")
	listParam := vo.GetAllServiceInfoParam{
		GroupName: nr.defWatchGroup,
		NameSpace: nr.defNameSpace,
	}
	// The first call is only used to learn the total service count ...
	list, err := nr.client.GetAllServicesInfo(listParam)
	if err != nil {
		return nil, err
	}
	// ... so the second call can request all of them in a single page.
	listParam.PageNo = 1
	listParam.PageSize = uint32(list.Count)
	list, err = nr.client.GetAllServicesInfo(listParam)
	if err != nil {
		return nil, err
	}

	nw.Doms = list.Doms
	for _, name := range nw.Doms {
		go subscribe(&vo.SubscribeParam{
			ServiceName:       name,
			SubscribeCallback: nw.callBackHandle,
			GroupName:         nr.defWatchGroup,
			Clusters:          nr.defWatchClusterList,
		})
	}
	return nw, nil
}
// callBackHandle is the subscription callback handed to the nacos naming
// client. services is treated as the complete current instance list of one
// service (named by the first element). It is diffed against the list
// remembered from the previous callback, and one registry result is queued
// per difference:
//
//   - "create" for instances that were not known before,
//   - "update" for known instances whose data changed,
//   - "delete" for remembered instances that are no longer listed.
//
// Afterwards the remembered list is replaced by a copy of services, so every
// change is reported exactly once. A non-nil err is logged and the callback
// is ignored; an empty services slice is ignored as well because it carries
// no service name to key the cache on.
func (nw *nacosWatcher) callBackHandle(services []model.SubscribeService, err error) {
	if err != nil {
		logger.Warnf("nacos watcher call back handle error:%v", err)
		return
	}
	if len(services) == 0 {
		// Nothing to index: services[0] would panic.
		return
	}
	serviceName := services[0].ServiceName
	logger.Infof("callBackHandle : len[services]:[%d] [%s] ", len(services), serviceName)

	// The lock is held across the sends so that results of concurrent
	// callbacks are queued in a consistent order; emitResult gives up once
	// the watcher is stopped, so this cannot block forever.
	nw.Lock()
	defer nw.Unlock()

	cached := nw.cacheServices[serviceName]
	knownAt := make(map[string]int, len(cached))
	for i := range cached {
		knownAt[cached[i].InstanceId] = i
	}

	current := make(map[string]struct{}, len(services))
	for i := range services {
		id := services[i].InstanceId
		current[id] = struct{}{}

		idx, known := knownAt[id]
		switch {
		case !known:
			if !nw.emitResult("create", &services[i]) {
				return
			}
		case !reflect.DeepEqual(services[i], cached[idx]):
			if !nw.emitResult("update", &services[i]) {
				return
			}
		}
	}

	for i := range cached {
		if _, still := current[cached[i].InstanceId]; still {
			continue
		}
		if !nw.emitResult("delete", &cached[i]) {
			return
		}
	}

	// Store a private copy so later mutation of the caller's slice cannot
	// corrupt the next diff.
	nw.cacheServices[serviceName] = append([]model.SubscribeService(nil), services...)
}

// emitResult queues a single result for Next. It reports false, without
// queuing, when the watcher has been stopped.
func (nw *nacosWatcher) emitResult(action string, svc *model.SubscribeService) bool {
	select {
	case <-nw.exit:
		return false
	case nw.next <- &registry.Result{Action: action, Service: buildRegistryService(svc)}:
		return true
	}
}
// buildRegistryService converts one nacos instance into a registry.Service
// that contains exactly one node.
//
// The node takes its id, host:port address and metadata from the instance.
// The service version is read from the "version" metadata entry and falls
// back to "unknown" when that entry is absent.
func buildRegistryService(v *model.SubscribeService) (s *registry.Service) {
	node := &registry.Node{
		Id:       v.InstanceId,
		Address:  mnet.HostPort(v.Ip, v.Port),
		Metadata: v.Metadata,
	}

	version, ok := v.Metadata["version"]
	if !ok {
		version = "unknown"
	}

	s = &registry.Service{
		Name:     v.ServiceName,
		Version:  version,
		Metadata: v.Metadata,
		Nodes:    []*registry.Node{node},
	}
	// Use the formatting variant so %v is expanded; this is routine tracing,
	// not an error condition.
	logger.Infof("buildRegistryService : service:[%v] ", s)
	return s
}
// Next blocks until a result is available or the watcher is stopped.
// It returns the next queued result, or registry.ErrWatcherStopped once the
// exit channel has been closed or the result channel has been closed.
func (nw *nacosWatcher) Next() (r *registry.Result, err error) {
	select {
	case res, open := <-nw.next:
		if open {
			return res, nil
		}
	case <-nw.exit:
	}
	// Reached when the watcher was stopped or the result channel was closed.
	return nil, registry.ErrWatcherStopped
}
// Stop marks the watcher as stopped by closing the exit channel and then
// unsubscribes from nacos. Calling Stop again after it has completed is a
// no-op.
//
// When the watcher subscribed to a list of services (Doms is non-empty), each
// of them is unsubscribed; otherwise the single stored subscribe parameter is
// used. Unsubscribe failures are logged, since Stop has no error return.
func (nw *nacosWatcher) Stop() {
	select {
	case <-nw.exit:
		// Already stopped.
		return
	default:
	}
	close(nw.exit)

	if len(nw.Doms) == 0 {
		if err := nw.client.Unsubscribe(nw.param); err != nil {
			logger.Warnf("nacos watcher unsubscribe [%s] error:%v", nw.param.ServiceName, err)
		}
		return
	}

	// NOTE(review): newNacosWatcher subscribes these services with GroupName
	// and Clusters set, while the parameters built here carry neither (the
	// watcher does not store them). Confirm that the SDK still matches the
	// original subscriptions with these parameters.
	for _, name := range nw.Doms {
		param := &vo.SubscribeParam{
			ServiceName:       name,
			SubscribeCallback: nw.callBackHandle,
		}
		if err := nw.client.Unsubscribe(param); err != nil {
			logger.Warnf("nacos watcher unsubscribe [%s] error:%v", name, err)
		}
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/tgodfather/go-plugins.git
git@gitee.com:tgodfather/go-plugins.git
tgodfather
go-plugins
go-plugins
efc0651a8291

搜索帮助