4 Star 5 Fork 4

Plato / Service-Box-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
service_watcher.go 4.16 KB
一键复制 编辑 原始数据 按行查看 历史
CloudGuan 提交于 2022-12-09 09:04 . !4update: http 网关功能实现
package sbox
import (
"fmt"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/log"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/transport"
"gitee.com/dennis-kk/service-box-go/util/errors"
"sync"
)
const (
ServiceAdd = iota
ServiceClose
)
type (
//ServiceEvent 服务变化事件类型 ServiceAdd 添加 ServiceClose 关闭
ServiceEvent int
//ServiceWatcher 是服务监听回调函数类型
ServiceWatcher func(eType ServiceEvent, name string, host string, proxy idlrpc.IProxy)
//ProxyFinder 服务proxy获取接口
ProxyFinder func(uuid uint64, trans transport.ITransport) (idlrpc.IProxy, error)
WatcherList struct {
uuid uint64
watched map[string]ServiceWatcher
}
//WatcherManager 服务变化监听管理
WatcherManager struct {
finder ProxyFinder //proxy 获取函数
wg sync.WaitGroup //等待协程结束
rw sync.RWMutex //读写锁
logger log.ILogger //日志
watchersMap map[string]*WatcherList //watcher列表
}
)
func (w *WatcherManager) init(finder ProxyFinder, logger log.ILogger) {
w.watchersMap = make(map[string]*WatcherList)
w.wg = sync.WaitGroup{}
w.rw = sync.RWMutex{}
w.finder = finder
w.logger = logger
}
func (w *WatcherManager) shutdown() {
w.wg.Wait()
w.watchersMap = nil
}
//addWatcher 添加一个服务的监听事件,如果一个函数被重复添加会返回报错
func (w *WatcherManager) addWatcher(name string, uuid uint64, watcher ServiceWatcher) error {
w.rw.Lock()
defer w.rw.Unlock()
wg, ok := w.watchersMap[name]
if !ok {
wg = &WatcherList{
uuid: uuid,
watched: map[string]ServiceWatcher{},
}
w.watchersMap[name] = wg
}
//校验uuid似否正确
if wg.uuid != uuid {
return errors.RepeatedWatcher
}
//生成handle
handle := fmt.Sprintf("%v", watcher)
//去重
if _, ok := wg.watched[handle]; ok {
return errors.RepeatedWatcher
}
//加入注册
wg.watched[handle] = watcher
return nil
}
//removeWatcher 移除特定服务的特定监听
func (w *WatcherManager) removeWatcher(name string, watcher ServiceWatcher) {
w.rw.Lock()
defer w.rw.Unlock()
wg, ok := w.watchersMap[name]
if !ok {
return
}
//生成handle
handle := fmt.Sprintf("%v", watcher)
if _, ok := wg.watched[handle]; ok {
delete(wg.watched, handle)
}
//如果监听空了,清理对应的handle
if len(wg.watched) == 0 {
delete(w.watchersMap, name)
}
return
}
func (w *WatcherManager) onConnected(name string, trans *BoxChannel) {
//新连接到来,触发所有监听的回调函数,先在主线程做预检查,减少增加协程得开销
var uuid uint64
w.rw.RLock()
wg, ok := w.watchersMap[name]
if ok {
uuid = wg.uuid
}
w.rw.RUnlock()
if !ok {
return
}
w.wg.Add(1)
go w.notifyWatcherAdd(uuid, name, trans)
}
func (w *WatcherManager) onClose(name, host string) {
w.rw.RLock()
_, ok := w.watchersMap[name]
w.rw.RUnlock()
if !ok {
return
}
w.wg.Add(1)
go w.notifyWatcherClose(name, host)
}
//notifyWatcher 启动协程触发watcher 收到新proxy到达得通知
func (w *WatcherManager) notifyWatcherAdd(uuid uint64, name string, trans *BoxChannel) {
// 先获取proxy
proxy, err := w.finder(uuid, trans)
if err != nil {
w.logger.Warn("get service %d,%s proxy error!", uuid, name)
w.wg.Done()
return
}
w.rw.RLock()
defer func() {
//防止用户回调函数里面触发异常
if r := recover(); r != nil {
w.logger.Error("service %s watcher panic %v", name, r)
}
w.rw.RUnlock()
w.wg.Done()
}()
// 触发用户回调
wg, ok := w.watchersMap[name]
if !ok {
return
}
for _, cb := range wg.watched {
cb(ServiceAdd, name, trans.RemoteAddr(), proxy)
}
return
}
//notifyWatcherDelete 通知watcher 有proxy失效
func (w *WatcherManager) notifyWatcherClose(name string, host string) {
w.rw.RLock()
defer func() {
//防止用户回调函数里面触发异常
if r := recover(); r != nil {
w.logger.Error("notify service %s watcher delete panic %v", name, r)
}
w.rw.RUnlock()
w.wg.Done()
}()
// 触发用户回调
wg, ok := w.watchersMap[name]
if !ok {
return
}
for _, cb := range wg.watched {
cb(ServiceClose, name, host, nil)
}
return
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/dennis-kk/service-box-go.git
git@gitee.com:dennis-kk/service-box-go.git
dennis-kk
service-box-go
Service-Box-go
v0.5.17

搜索帮助

344bd9b3 5694891 D2dac590 5694891