代码拉取完成,页面将自动刷新
package serverMgr
import (
"context"
"encoding/json"
"fmt"
"time"
"gitee.com/night-tc/gobige/global"
"gitee.com/night-tc/gobige/logger"
"gitee.com/night-tc/gobige/threads"
)
/*
使用ETCD进行服务发现
*/
type DiscoveryMgr struct {
srvinfo IServerInfo //本服务器信息
thgo *threads.ThreadGo //协程管理器
ext IExt //外部对象,一般是Service
//上报状态时间
refreshTime time.Duration
//默认负载均衡器
defmap IServerMap
//监听对应的负载均衡器
watchmap map[string]IServerMap
}
func NewDiscoveryMgr(thgo *threads.ThreadGo, ext IExt) *DiscoveryMgr {
result := new(DiscoveryMgr)
result.refreshTime = 10 * time.Second
result.ext = ext
// result.dsync = new(sync.Once)
// result.serverMapRW = new(sync.RWMutex)
result.defmap = NewServerMapByRand()
result.defmap.SetExt(ext)
result.watchmap = make(map[string]IServerMap)
if thgo == nil {
result.thgo = threads.NewThreadGo()
} else {
result.thgo = threads.NewThreadGoByGo(thgo)
}
return result
}
// 开始注册
func (mgr *DiscoveryMgr) Start() (err error) {
//为etcd设置出问题时的函数
//这个过程 应该在ETCD开始监听之前就要设置好。不然会有安全问题
// if !global.GetCfgInst().GetDebug() {
// etcd.SetEtcdTimeoutFunc(mgr.ClearServerInfoAll)(mgr.ext.GetEtcd())
// }
mgr.ext.GetEtcd().SetReFunc(mgr.register)
err = mgr.register()
if err != nil {
return err
}
mgr.registerLoad()
data, _ := json.Marshal(mgr.srvinfo)
logger.Infof("discover start. %s", string(data))
//load属性的更新,把服务注册方法,写入etcd,让系统可以在租约过期的时候,重新上去
return nil
}
func (mgr *DiscoveryMgr) register() error {
mgr.srvinfo = mgr.ext.GetSelfInfo()
server := mgr.srvinfo
if server == nil {
return nil
}
key := GetPath(server.GetGroupID(), server.GetSrvType(), server.GetServerID())
val, _ := json.Marshal(server)
if err := mgr.ext.GetEtcd().PutNX(key, string(val)); err != nil {
return err
}
return nil
}
// load属性的更新,系统默认会5秒上报一次load
func (mgr *DiscoveryMgr) registerLoad() {
ctrl := global.GetSrvInst()
server := mgr.srvinfo
key := GetPathLoad(server.GetGroupID(), server.GetSrvType(), server.GetServerID())
// keygateway := GetGateWayPathLoad(server.Group, server.ServerID)
mgr.thgo.Go(func(ctx context.Context) {
tk := time.NewTicker(mgr.refreshTime)
defer tk.Stop()
for {
select {
case <-ctx.Done():
return
case <-tk.C:
val := fmt.Sprint(ctrl.GetLoad())
_ = mgr.ext.GetEtcd().Put(key, val)
}
}
})
}
// 主动上报Load
func (mgr *DiscoveryMgr) UpLoad() {
ctrl := global.GetSrvInst()
server := mgr.srvinfo
key := GetPathLoad(server.GetGroupID(), server.GetSrvType(), server.GetServerID())
val := fmt.Sprint(ctrl.GetLoad())
_ = mgr.ext.GetEtcd().Put(key, val)
}
// 更新服务器状态
func (mgr *DiscoveryMgr) UpStatus(status int32) {
server := mgr.srvinfo
key := GetPathStatus(server.GetGroupID(), server.GetSrvType(), server.GetServerID())
val := fmt.Sprint(status)
_ = mgr.ext.GetEtcd().Put(key, val)
}
// 监听指定服务器类型下的指定分组
func (mgr *DiscoveryMgr) Watch(stype, groupid uint32, smap IServerMap) {
//监听一般在服务器启动的时候,开起来就好了,后面不会再有操作了
if smap != nil {
smap.SetExt(mgr.ext)
}
watchkey := GetWatchPath(groupid, stype)
if groupid == global.GroupGlobalID {
watchkey = GetWatchByType(stype)
}
mapkey := GetWatchByType(stype)
if _, ok := mgr.watchmap[mapkey]; ok {
//因为已有自定义监听了,所以不再进行监听了。
if smap != nil {
logger.Errorf("Duplicate watch key:%s", watchkey)
}
return
}
mgr.watchWithKey(smap, watchkey, mapkey)
}
func (mgr *DiscoveryMgr) watchWithKey(smap IServerMap, watchkey, mapkey string) {
if smap == nil {
smap = mgr.defmap
}
mgr.watchmap[mapkey] = smap
mgr.ext.GetEtcd().WatchPrefix(watchkey, smap.WatchCallDelay, smap.LoadWatchAll)
// smap.LoadWatchAll(watchkey)
mgr.ext.GetEtcd().SetReFunc(func() error {
smap.LoadWatchAll(watchkey)
return nil
})
}
// 监听所有group下的指定服务器类型
func (mgr *DiscoveryMgr) WatchByStype(stype global.ServerTypeEnum, smap IServerMap) {
//监听一般在服务器启动的时候,开起来就好了,后面不会再有操作了
if smap != nil {
smap.SetExt(mgr.ext)
}
key := GetWatchByType(stype)
if _, ok := mgr.watchmap[key]; ok {
//因为已有自定义监听了,所以不再进行监听了。
if smap != nil {
logger.Error("Duplicate watch key:", key)
}
return
}
mgr.watchWithKey(smap, key, key)
}
// 根据负载逻辑,获取服务器
func (mgr *DiscoveryMgr) GetServerByType(sGroup uint32, sType global.ServerTypeEnum) (result IServerInfo, err error) {
// key := GetWatchPath(sGroup, sType)
watchkey := GetWatchByType(sType)
if smap, ok := mgr.watchmap[watchkey]; ok {
return smap.GetBalancing(sType, sGroup)
} else {
return mgr.defmap.GetBalancing(sType, sGroup)
}
}
// 根据负载逻辑,获取服务器
func (mgr *DiscoveryMgr) GetServerList(sType global.ServerTypeEnum, sGroup uint32) ([]IServerInfo, error) {
watchkey := GetWatchByType(sType)
// key := GetWatchPath(sGroup, sType)
if smap, ok := mgr.watchmap[watchkey]; ok {
return smap.GetListByTypeAGroup(sType, sGroup)
} else {
return mgr.defmap.GetListByTypeAGroup(sType, sGroup)
}
}
// GetServerById 根据服务器类型和ID取服务器
func (mgr *DiscoveryMgr) GetServerById(groupid uint32, stype global.ServerTypeEnum, serverId uint64) (IServerInfo, error) {
watchkey := GetWatchByType(stype)
// key := GetWatchPath(groupid, stype)
if smap, ok := mgr.watchmap[watchkey]; ok {
return smap.GetServerByID(groupid, stype, serverId)
} else {
return mgr.defmap.GetServerByID(groupid, stype, serverId)
}
}
// 清空服务器信息,因为可能是脏数据
func (mgr *DiscoveryMgr) ClearServerInfoAll() {
logger.Warn("Etcd leaseID timeout. DiscoveryMgr.ClearServerInfoAll.")
mgr.defmap.DeleteAll()
for _, smap := range mgr.watchmap {
smap.DeleteAll()
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。