1 Star 1 Fork 0

jawide/go-micro-nacos

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
watcher.go 4.33 KB
一键复制 编辑 原始数据 按行查看 历史
jawide 提交于 2021-11-24 01:58 . init
package nacos
import (
"fmt"
"net"
"reflect"
"sync"
"github.com/asim/go-micro/v3/logger"
"github.com/asim/go-micro/v3/registry"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
)
// nacosWatcher implements a registry watcher on top of Nacos subscriptions.
// Subscription callbacks turn instance lists into registry results that are
// queued on next and handed out by Next.
type nacosWatcher struct {
// n is the registry whose Nacos client is used to subscribe and unsubscribe.
n *nacosRegistry
// wo holds the watch options supplied by the caller.
wo registry.WatchOptions
// next queues the results produced by the subscription callback.
next chan *registry.Result
// exit is closed by Stop to signal that the watcher has been stopped.
exit chan bool
// The embedded mutex is taken around writes to cacheServices.
sync.RWMutex
// services is initialised by the constructor; it is not otherwise used in
// the code visible here.
services map[string][]*registry.Service
// cacheServices remembers, per service name, the instances last reported
// by Nacos so that a callback can be diffed against the previous state.
cacheServices map[string][]model.SubscribeService
// param is the subscription parameter used when one is supplied through
// the watch context.
param *vo.SubscribeParam
// Doms lists the service names subscribed to when watching all services.
Doms []string
}
// NewNacosWatcher creates a watcher backed by Nacos subscriptions.
//
// If the watch context carries a vo.SubscribeParam under the key
// "subscribe_param", only that subscription is watched (its SubscribeCallback
// is replaced by the watcher's own handler). Otherwise every service currently
// known to Nacos is listed and subscribed to individually.
//
// It returns an error when the service list cannot be fetched. Failures of
// individual Subscribe calls happen asynchronously and are logged.
func NewNacosWatcher(nr *nacosRegistry, opts ...registry.WatchOption) (registry.Watcher, error) {
	var wo registry.WatchOptions
	for _, o := range opts {
		o(&wo)
	}

	nw := &nacosWatcher{
		n:             nr,
		wo:            wo,
		exit:          make(chan bool),
		next:          make(chan *registry.Result, 10),
		services:      make(map[string][]*registry.Service),
		cacheServices: make(map[string][]model.SubscribeService),
		param:         new(vo.SubscribeParam),
		Doms:          make([]string, 0),
	}

	// Single-service mode: the caller supplied the subscription parameter.
	if wo.Context != nil {
		if p, ok := wo.Context.Value("subscribe_param").(vo.SubscribeParam); ok {
			p.SubscribeCallback = nw.callBackHandle
			nw.param = &p
			go nw.subscribe(nw.param)
			return nw, nil
		}
	}

	// All-services mode: the first request reports the total number of
	// services; a second request is only needed when that response did not
	// already contain all of them.
	listParam := vo.GetAllServiceInfoParam{}
	services, err := nr.client.GetAllServicesInfo(listParam)
	if err != nil {
		return nil, err
	}
	if int64(len(services.Doms)) < int64(services.Count) {
		listParam.PageNo = 1
		listParam.PageSize = uint32(services.Count)
		services, err = nr.client.GetAllServicesInfo(listParam)
		if err != nil {
			return nil, err
		}
	}

	nw.Doms = services.Doms
	for _, name := range nw.Doms {
		go nw.subscribe(&vo.SubscribeParam{
			ServiceName:       name,
			SubscribeCallback: nw.callBackHandle,
		})
	}
	return nw, nil
}

// subscribe registers param with the Nacos client and logs a failure instead
// of silently dropping it. It is meant to be run in its own goroutine so that
// the constructor does not wait on the subscription.
func (nw *nacosWatcher) subscribe(param *vo.SubscribeParam) {
	if err := nw.n.client.Subscribe(param); err != nil {
		logger.Errorf("nacos watcher subscribe %s error: %v", param.ServiceName, err)
	}
}
// callBackHandle is the subscription callback registered with Nacos. It
// compares the reported instances of a service with the previously cached
// ones and queues one registry result per difference:
//
//   - "create" for an instance ID that was not cached,
//   - "update" for a cached instance whose data changed,
//   - "delete" for a cached instance that is no longer reported.
//
// The cache is replaced by the reported list, so each change is announced
// exactly once. A non-nil err is logged and the call is ignored; an empty
// list is ignored as well because the service name cannot be derived from it.
func (nw *nacosWatcher) callBackHandle(services []model.SubscribeService, err error) {
	if err != nil {
		logger.Errorf("nacos watcher call back handle error:%v", err)
		return
	}
	if len(services) == 0 {
		return
	}
	serviceName := services[0].ServiceName

	// Keep a private copy so later changes to the caller's slice cannot
	// alter the cached state.
	current := make([]model.SubscribeService, len(services))
	copy(current, services)

	// Swap the cache under the lock; the diff and the channel sends happen
	// outside of it so a slow consumer never blocks other callbacks on the
	// mutex.
	nw.Lock()
	previous := nw.cacheServices[serviceName]
	nw.cacheServices[serviceName] = current
	nw.Unlock()

	known := make(map[string]model.SubscribeService, len(previous))
	for i := range previous {
		known[previous[i].InstanceId] = previous[i]
	}

	var results []*registry.Result
	reported := make(map[string]struct{}, len(current))
	for i := range current {
		inst := &current[i]
		reported[inst.InstanceId] = struct{}{}
		old, cached := known[inst.InstanceId]
		switch {
		case !cached:
			results = append(results, &registry.Result{Action: "create", Service: buildRegistryService(inst)})
		case !reflect.DeepEqual(*inst, old):
			results = append(results, &registry.Result{Action: "update", Service: buildRegistryService(inst)})
		}
	}
	for i := range previous {
		if _, ok := reported[previous[i].InstanceId]; !ok {
			results = append(results, &registry.Result{Action: "delete", Service: buildRegistryService(&previous[i])})
		}
	}

	for _, r := range results {
		select {
		case nw.next <- r:
		case <-nw.exit:
			// The watcher was stopped; nobody will read further results.
			return
		}
	}
}
// buildRegistryService converts one Nacos instance into a registry service
// that carries exactly one node. The node address is the instance's IP and
// port joined as host:port, the version is always "latest", and the instance
// metadata is attached to both the service and the node.
func buildRegistryService(v *model.SubscribeService) (s *registry.Service) {
	port := fmt.Sprintf("%d", v.Port)
	node := &registry.Node{
		Id:       v.InstanceId,
		Address:  net.JoinHostPort(v.Ip, port),
		Metadata: v.Metadata,
	}
	return &registry.Service{
		Name:     v.ServiceName,
		Version:  "latest",
		Metadata: v.Metadata,
		Nodes:    []*registry.Node{node},
	}
}
// Next blocks until a result is available on the watcher's queue and returns
// it. It returns registry.ErrWatcherStopped once the exit channel has been
// closed or if the result queue itself is closed.
func (nw *nacosWatcher) Next() (r *registry.Result, err error) {
	select {
	case res, open := <-nw.next:
		if open {
			return res, nil
		}
	case <-nw.exit:
	}
	return nil, registry.ErrWatcherStopped
}
// Stop marks the watcher as stopped by closing the exit channel and then
// unsubscribes from Nacos. Calling Stop on an already stopped watcher is a
// no-op. Unsubscribe failures are logged because Stop has no error return.
func (nw *nacosWatcher) Stop() {
	select {
	case <-nw.exit:
		// Already stopped.
		return
	default:
	}
	close(nw.exit)

	// All-services mode: one subscription per listed service name.
	if len(nw.Doms) > 0 {
		for _, name := range nw.Doms {
			param := &vo.SubscribeParam{
				ServiceName:       name,
				SubscribeCallback: nw.callBackHandle,
			}
			if err := nw.n.client.Unsubscribe(param); err != nil {
				logger.Errorf("nacos watcher unsubscribe %s error: %v", name, err)
			}
		}
		return
	}

	// Single-service mode. The constructor only sets SubscribeCallback on
	// param when it actually subscribed with it, so a nil callback means
	// there is nothing to unsubscribe.
	if nw.param != nil && nw.param.SubscribeCallback != nil {
		if err := nw.n.client.Unsubscribe(nw.param); err != nil {
			logger.Errorf("nacos watcher unsubscribe %s error: %v", nw.param.ServiceName, err)
		}
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/jawide/go-micro-nacos.git
git@gitee.com:jawide/go-micro-nacos.git
jawide
go-micro-nacos
go-micro-nacos
main

搜索帮助