1 Star 1 Fork 0

凡卡/libp2parea

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
manager.go 17.30 KB
一键复制 编辑 原始数据 按行查看 历史
凡卡 提交于 2023-11-29 11:01 . first commit
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709
package virtual_node
import (
"bytes"
"context"
"sync"
"gitee.com/prestonTao/libp2parea/config"
"gitee.com/prestonTao/libp2parea/nodeStore"
"gitee.com/prestonTao/utils"
)
type VnodeManager struct {
lock *sync.RWMutex //
DiscoverVnodes *Vnode //每个节点启动一个发现者虚拟节点,用于保存逻辑节点
VnodeMap map[uint64]*Vnode //保存自己的虚拟节点key:uint64=index;value:*Vnode=自己的虚拟节点。
clientVnode *sync.Map //所有被连接的逻辑节点 key:string=AddressNetExtend;value:Vnodeinfo=;
findNearVnodeChan chan *[]*FindVnodeVO //需要查找的虚拟节点
nodeManager *nodeStore.NodeManager //
contextRoot context.Context //
autonomyFinish AutonomyFinish
isClose bool //
closeVnodeChan chan *Vnodeinfo //删除虚拟逻辑节点的广播管道
}
func NewVnodeManager(nodeManager *nodeStore.NodeManager, c context.Context, af AutonomyFinish) *VnodeManager {
vm := &VnodeManager{
lock: new(sync.RWMutex),
VnodeMap: make(map[uint64]*Vnode),
clientVnode: new(sync.Map),
findNearVnodeChan: make(chan *[]*FindVnodeVO, 1000),
nodeManager: nodeManager,
contextRoot: c,
autonomyFinish: af,
closeVnodeChan: make(chan *Vnodeinfo, 100),
}
//添加一个发现者虚拟节点,用于发现其他虚拟节点,以及保存逻辑节点
vm.DiscoverVnodes = NewVnode(0, nodeManager.NodeSelf.IdInfo.Id, vm.findNearVnodeChan, vm.contextRoot, af, vm.closeVnodeChan)
vm.VnodeMap[0] = vm.DiscoverVnodes
return vm
}
/*
关闭虚拟节点管理功能
*/
func (this *VnodeManager) IsClose() {
this.isClose = true
}
/*
打开虚拟节点管理功能
*/
func (this *VnodeManager) IsOpen() {
this.isClose = false
}
/*
虚拟节点管理读锁 锁操作
*/
func (this *VnodeManager) RLock() {
this.lock.RLock()
}
/*
虚拟节点管理读锁 解锁操作
*/
func (this *VnodeManager) RUnlock() {
this.lock.RUnlock()
}
/*
等待虚拟节点网络自治完成
*/
func (this *VnodeManager) WaitAutonomyFinish() {
if this.isClose {
return
}
this.DiscoverVnodes.WaitAutonomyFinish()
this.lock.RLock()
vnodesSelf := make([]*Vnode, 0, len(this.VnodeMap))
for _, v := range this.VnodeMap {
vnodesSelf = append(vnodesSelf, v)
}
this.lock.RUnlock()
for _, one := range vnodesSelf {
one.WaitAutonomyFinish()
}
}
/*
获取查询vnode管道
*/
func (this *VnodeManager) GetFindVnodeChan() chan *[]*FindVnodeVO {
return this.findNearVnodeChan
}
/*
添加一个发现者虚拟节点,用于发现其他虚拟节点,以及保存逻辑节点
*/
func (this *VnodeManager) GetVnodeDiscover() (discoverVnode *Vnode) {
this.lock.Lock()
discoverVnode = this.DiscoverVnodes
this.lock.Unlock()
return
}
/*
添加一个虚拟节点
*/
func (this *VnodeManager) AddVnode() Vnodeinfo {
this.lock.Lock()
defer this.lock.Unlock()
//找到不连续的index添加
index := uint64(0)
ok := true
for ok {
index++
_, ok = this.VnodeMap[index]
}
nodeOne := NewVnode(index, this.nodeManager.NodeSelf.IdInfo.Id, this.findNearVnodeChan, this.contextRoot, this.autonomyFinish, this.closeVnodeChan)
// this.Vnodes = append(this.Vnodes, *nodeOne)
this.VnodeMap[index] = nodeOne
return nodeOne.Vnode
}
/*
添加一个指定下标的虚拟节点
*/
func (this *VnodeManager) AddVnodeByIndex(index uint64) *Vnodeinfo {
this.lock.Lock()
defer this.lock.Unlock()
if vNode, ok := this.VnodeMap[index]; ok && vNode != nil {
return &vNode.Vnode
}
nodeOne := NewVnode(index, this.nodeManager.NodeSelf.IdInfo.Id, this.findNearVnodeChan, this.contextRoot, this.autonomyFinish, this.closeVnodeChan)
// this.Vnodes = append(this.Vnodes, *nodeOne)
this.VnodeMap[index] = nodeOne
//添加虚拟节点后,触发它寻找逻辑节点
nCurVnodeLen := len(this.VnodeMap)
if nCurVnodeLen > 1 {
vnodes := make([]Vnodeinfo, 0, len(this.VnodeMap)-1)
for _, vnode := range this.VnodeMap {
if bytes.Equal(vnode.Vnode.Vid, nodeOne.Vnode.Vid) {
continue
}
vnodes = append(vnodes, vnode.Vnode)
}
nodeOne.AddLogicVnodeinfosCheckChange(&vnodes)
}
return &nodeOne.Vnode
}
/*
删除一个指定下标的虚拟节点
*/
func (this *VnodeManager) DelVnodeByIndex(index uint64) {
this.lock.Lock()
defer this.lock.Unlock()
// nodeOne := NewVnode(index, this.nodeManager.NodeSelf.IdInfo.Id, this.findNearVnodeChan, this.contextRoot)
// this.Vnodes = append(this.Vnodes, *nodeOne)
if vnode, ok := this.VnodeMap[index]; ok {
vnode.Destroy()
}
delete(this.VnodeMap, index)
}
/*
* 指定删除一个自己创建的虚拟地址
* 同时删除本节点其他vnode中存此地址的信息
*/
func (this *VnodeManager) DelSelfVnodeByAddr(vid nodeStore.AddressNet) {
this.lock.Lock()
defer this.lock.Unlock()
for i, _ := range this.VnodeMap {
if bytes.Equal(this.VnodeMap[i].Vnode.Vid, vid) && this.VnodeMap[i].Vnode.Index != 0 {
this.VnodeMap[i].Destroy()
delete(this.VnodeMap, i)
} else {
this.VnodeMap[i].VnodeDelAddr(AddressNetExtend(vid))
}
}
}
/*
* 指定删除一个在up down排序中的别人的虚拟地址
*/
func (this *VnodeManager) DelUpdownVnodeByAddr(vid AddressNetExtend) {
//检查,不许删除自己创建的虚拟地址
if this.FindInVnodeSelf(vid) {
return
}
//删发现节点的逻辑节点
this.DiscoverVnodes.LogicalNode.Range(func(k, v interface{}) bool {
vnodeinfo, ok := v.(Vnodeinfo)
if !ok {
return false
}
if bytes.Equal(vnodeinfo.Vid, vid) {
this.DiscoverVnodes.LogicalNode.Delete(utils.Bytes2string(vid))
}
return true
})
//删clientVnode中的vid信息
this.clientVnode.Delete(utils.Bytes2string(vid))
this.lock.RLock()
defer this.lock.RUnlock()
//删本节点其他虚拟节点中的vid信息
for i, _ := range this.VnodeMap {
this.VnodeMap[i].VnodeDelAddr(vid)
}
}
/*
删除一个虚拟节点
*/
func (this *VnodeManager) DelVnode() (vnodeinfo *Vnodeinfo) {
this.lock.Lock()
defer this.lock.Unlock()
//找到不连续的index添加
index := uint64(0)
ok := true
for ok {
index++
_, ok = this.VnodeMap[index]
}
if vnode, ok := this.VnodeMap[index]; ok {
vnodeinfo = &vnode.Vnode
vnode.Destroy()
}
delete(this.VnodeMap, index)
// index := uint64(len(this.Vnodes))
// newvnodes := make([]Vnode, 0)
// for i, _ := range this.Vnodes {
// if uint64(i+1) >= index {
// nodeinfo = this.Vnodes[i].Vnode
// break
// }
// newvnodes = append(newvnodes, this.Vnodes[i])
// }
// this.Vnodes = newvnodes
return
}
/*
调整云存储大小,多了的就减少,少了的就增加。
*/
func (this *VnodeManager) SetupVnodeNumber(n uint64) {
// if n <= 0 {
// return
// }
this.lock.Lock()
defer this.lock.Unlock()
//空间大小合适,不需要调整
if uint64(len(this.VnodeMap)) == n {
return
}
//空间太大,需要减少空间
if uint64(len(this.VnodeMap)) > n {
count := uint64(len(this.VnodeMap)) - n
//随机删除
for k, _ := range this.VnodeMap {
if count == 0 {
break
}
count--
if vnode, ok := this.VnodeMap[k]; ok {
vnode.Destroy()
}
delete(this.VnodeMap, k)
}
// newvnodes := make([]Vnode, 0)
// for i, _ := range this.Vnodes {
// if uint64(i+1) > n {
// break
// }
// newvnodes = append(newvnodes, this.Vnodes[i])
// delete(this.VnodeMap, this.Vnodes[i].Vnode.Index)
// }
// this.Vnodes = newvnodes
} else {
//空间太小,需要增加空间。
//找到不连续的index添加
for i := uint64(len(this.VnodeMap)); i < n; i++ {
index := uint64(0)
ok := true
for ok {
index++
_, ok = this.VnodeMap[index]
}
nodeOne := NewVnode(index, this.nodeManager.NodeSelf.IdInfo.Id, this.findNearVnodeChan, this.contextRoot, this.autonomyFinish, this.closeVnodeChan)
// this.Vnodes = append(this.Vnodes, *nodeOne)
this.VnodeMap[index] = nodeOne
}
// for i := uint64(len(this.Vnodes)); i < n; i++ {
// nodeOne := NewVnode(i, this.nodeManager.NodeSelf.IdInfo.Id, this.findNearVnodeChan, this.contextRoot)
// this.Vnodes = append(this.Vnodes, *nodeOne)
// this.VnodeMap[i] = nodeOne
// }
}
}
/*
查询扩展的虚拟节点数量
*/
func (this *VnodeManager) GetVnodeNumber() []Vnodeinfo {
this.lock.RLock()
vnodeinfo := make([]Vnodeinfo, 0, len(this.VnodeMap))
for _, one := range this.VnodeMap {
if one.Vnode.Index == 0 {
continue
}
vnodeinfo = append(vnodeinfo, one.Vnode)
}
// vnodeinfo := make([]Vnodeinfo, 0, len(this.Vnodes))
// for _, one := range this.Vnodes {
// vnodeinfo = append(vnodeinfo, one.Vnode)
// }
this.lock.RUnlock()
return vnodeinfo
}
/*
添加虚拟节点的逻辑节点
@return bool 是否有改变true=有改变;false=无改变。
*/
func (this *VnodeManager) AddLogicVnodeinfo(vnodes ...Vnodeinfo) bool {
this.lock.RLock()
defer this.lock.RUnlock()
change := this.DiscoverVnodes.AddLogicVnodeinfosCheckChange(&vnodes)
for _, one := range this.VnodeMap {
//有一个改变,则返回有改变
if one.AddLogicVnodeinfosCheckChange(&vnodes) {
change = true
}
}
return change
}
/*
添加被连接的虚拟节点
*/
func (this *VnodeManager) AddClientVnodeinfo(vnode Vnodeinfo) {
// engine.Log.Info("vnodeinfo:%+v", vnode)
vnodeinfoTimeout := CreateVnodeinfoTimeout(&vnode)
vinfotItr, ok := this.clientVnode.LoadOrStore(utils.Bytes2string(vnode.Vid), vnodeinfoTimeout)
if ok {
vnodeinfoTimeout = vinfotItr.(*VnodeinfoTimeout)
vnodeinfoTimeout.FlashTime()
}
}
/*
添加被连接的虚拟节点
*/
func (this *VnodeManager) GetClientVnodeinfo() []*Vnodeinfo {
vnodes := make([]*Vnodeinfo, 0)
this.clientVnode.Range(func(k, v interface{}) bool {
vnodeinfoOne, ok := v.(*VnodeinfoTimeout)
if !ok {
return false
}
if vnodeinfoOne.CheckTimeout(config.VNODE_heartbeat_timeout) {
return true
}
vnodes = append(vnodes, &vnodeinfoOne.Vnodeinfo)
return true
})
return vnodes
}
/*
* 根据真实节点删除客户端连接信息
*/
func (this *VnodeManager) DelClientVnodeinfo(nid nodeStore.AddressNet) {
this.clientVnode.Range(func(k, v interface{}) bool {
name, ok := k.(string)
if !ok {
return false
}
vnodeinfoOnde := v.(*VnodeinfoTimeout)
if bytes.Equal(vnodeinfoOnde.Nid, nid) {
this.clientVnode.Delete(name)
}
return true
})
}
/*
添加虚拟节点的多个逻辑节点
*/
func (this *VnodeManager) AddLogicVnodeinfos(nodeVid AddressNetExtend, vnodes *[]Vnodeinfo) (change bool) {
this.lock.RLock()
defer this.lock.RUnlock()
if bytes.Equal(this.DiscoverVnodes.Vnode.Nid, nodeVid) {
// engine.Log.Info("给发现节点添加逻辑节点:%s", nodeVid.B58String())
if !this.DiscoverVnodes.AddLogicVnodeinfosCheckChange(vnodes) {
// engine.Log.Info("自治完成:%s", this.DiscoverVnodes.Vnode.Nid.B58String())
//如果添加虚拟逻辑节点,未改变,则自治完成。
this.DiscoverVnodes.SetAutonomyFinish()
}
for _, one := range this.VnodeMap {
one.AddLogicVnodeinfosCheckChange(vnodes)
}
} else {
// engine.Log.Info("给虚拟节点添加逻辑节点:%s", nodeVid.B58String())
this.DiscoverVnodes.AddLogicVnodeinfosCheckChange(vnodes)
for _, one := range this.VnodeMap {
if bytes.Equal(one.Vnode.Vid, nodeVid) {
if !one.AddLogicVnodeinfosCheckChange(vnodes) {
// engine.Log.Info("自治完成:%s", this.DiscoverVnodes.Vnode.Nid.B58String())
//如果添加虚拟逻辑节点,未改变,则自治完成。
one.SetAutonomyFinish()
}
} else {
one.AddLogicVnodeinfosCheckChange(vnodes)
}
}
}
// ok = this.DiscoverVnodes.AddLogicVnodeinfos(vnodes)
// for _, one := range this.VnodeMap {
// if success := one.AddLogicVnodeinfos(vnodes); success {
// ok = true
// }
// }
return
}
/*
获得所有节点,包括自己节点
*/
func (this *VnodeManager) GetVnodeAll() map[string]Vnodeinfo {
vnodeinfoMap := make(map[string]Vnodeinfo)
vnodeinfoMap[utils.Bytes2string(this.DiscoverVnodes.Vnode.Vid)] = this.DiscoverVnodes.Vnode
this.DiscoverVnodes.LogicalNode.Range(func(k, v interface{}) bool {
vnodeinfo, ok := v.(Vnodeinfo)
if !ok {
return false
}
vnodeinfoMap[utils.Bytes2string(vnodeinfo.Vid)] = vnodeinfo
return true
})
this.lock.RLock()
defer this.lock.RUnlock()
for _, one := range this.VnodeMap {
if one.Vnode.Index == 0 {
continue
}
selfOne := one.GetSelfVnodeinfo()
vnodeinfoMap[utils.Bytes2string(selfOne.Vid)] = selfOne
one.LogicalNode.Range(func(k, v interface{}) bool {
vnodeinfo, ok := v.(Vnodeinfo)
if !ok {
return false
}
vnodeinfoMap[utils.Bytes2string(vnodeinfo.Vid)] = vnodeinfo
return true
})
}
//删除自己节点
// for _, one := range this.Vnodes {
// delete(vnodeinfoMap, utils.Bytes2string(one.Vnode.Vid))
// }
return vnodeinfoMap
}
/*
获得自己管理的节点info
*/
func (this *VnodeManager) GetVnodeSelf() []Vnodeinfo {
this.lock.RLock()
defer this.lock.RUnlock()
vnodeinfo := make([]Vnodeinfo, 0, len(this.VnodeMap)+1)
vnodeinfo = append(vnodeinfo, this.DiscoverVnodes.Vnode)
for _, one := range this.VnodeMap {
if one.Vnode.Index == 0 {
continue
}
vnodeinfo = append(vnodeinfo, one.Vnode)
}
return vnodeinfo
}
/*
获得自己管理的节点
*/
func (this *VnodeManager) FindVnodeInSelf(nodeVid AddressNetExtend) *Vnode {
this.lock.RLock()
defer this.lock.RUnlock()
if bytes.Equal(this.DiscoverVnodes.Vnode.Vid, nodeVid) {
return this.DiscoverVnodes
}
for _, vnode := range this.VnodeMap {
if bytes.Equal(vnode.Vnode.Vid, nodeVid) {
return vnode
}
}
return nil
}
/*
在逻辑节点中查找Vnodeinfo
*/
func (this *VnodeManager) FindVnodeinfo(vid AddressNetExtend) *Vnodeinfo {
this.lock.RLock()
defer this.lock.RUnlock()
if bytes.Equal(vid, this.DiscoverVnodes.Vnode.Vid) {
return &this.DiscoverVnodes.Vnode
}
v, ok := this.DiscoverVnodes.LogicalNode.Load(utils.Bytes2string(vid))
if ok {
vnodeinfo := v.(Vnodeinfo)
return &vnodeinfo
}
for _, one := range this.VnodeMap {
vnodeinfo := one.FindVnodeinfo(vid)
if vnodeinfo == nil {
continue
}
return vnodeinfo
}
return nil
}
/*
查找节点id是否是自己的节点
@return bool 是否在
*/
func (this *VnodeManager) FindInVnodeSelf(id AddressNetExtend) bool {
// if bytes.Equal(id, this.DiscoverVnodes.Vnode.Vid) {
// return true
// }
for _, one := range this.GetVnodeSelf() {
if bytes.Equal(one.Vid, id) {
return true
}
}
return false
}
/*
查找自己一个虚拟节点的逻辑节点
*/
func (this *VnodeManager) FindLogicInVnodeSelf(id AddressNetExtend) []Vnodeinfo {
vnodeinfos := make([]Vnodeinfo, 0)
if bytes.Equal(id, this.DiscoverVnodes.Vnode.Vid) {
this.DiscoverVnodes.LogicalNode.Range(func(k, v interface{}) bool {
vnodeinfo, ok := v.(Vnodeinfo)
if !ok {
return false
}
vnodeinfos = append(vnodeinfos, vnodeinfo)
return true
})
return vnodeinfos
}
for _, one := range this.GetVnodeSelf() {
if bytes.Equal(one.Vid, id) {
this.DiscoverVnodes.LogicalNode.Range(func(k, v interface{}) bool {
vnodeinfo, ok := v.(Vnodeinfo)
if !ok {
return false
}
vnodeinfos = append(vnodeinfos, vnodeinfo)
return true
})
return vnodeinfos
}
}
return vnodeinfos
}
/*
查找节点id是否是自己的节点
@return bool 是否在
*/
func (this *VnodeManager) FindInVnodeinfoSelf(id AddressNetExtend) *Vnodeinfo {
// if bytes.Equal(id, this.DiscoverVnodes.Vnode.Vid) {
// return &this.DiscoverVnodes.Vnode
// }
for i, one := range this.GetVnodeSelf() {
if bytes.Equal(one.Vid, id) {
return &this.GetVnodeSelf()[i]
}
}
return nil
}
/*
查找节点id是否是自己的节点
@return bool 是否在
*/
func (this *VnodeManager) FindVnodeInAllSelf(id nodeStore.AddressNet) *Vnode {
// if bytes.Equal(id, this.DiscoverVnodes.Vnode.Vid) {
// return &this.DiscoverVnodes.Vnode
// }
this.lock.Lock()
defer this.lock.Unlock()
for i, one := range this.VnodeMap {
if bytes.Equal(one.Vnode.Vid, id) {
return this.VnodeMap[i]
}
}
return nil
}
/*
关闭所有虚拟节点
*/
func (this *VnodeManager) Close() {
}
/*
获取关闭vnode管道
*/
func (this *VnodeManager) GetCloseVnodeChan() chan *Vnodeinfo {
return this.closeVnodeChan
}
/*
* 检查是否包含指定的真实节点信息
*/
func (this *VnodeManager) CheckNodeinfoExistInSelf(nid nodeStore.AddressNet) bool {
this.lock.RLock()
defer this.lock.RUnlock()
if bytes.Equal(nid, this.DiscoverVnodes.Vnode.Nid) {
return true
}
exist := this.DiscoverVnodes.CheckVnodeinfoExistByNid(nid)
if exist {
return true
}
for _, one := range this.VnodeMap {
exist = one.CheckVnodeinfoExistByNid(nid)
if exist {
return true
}
}
return false
}
/*
* 检查是否是逻辑节点,虚拟节点,自己虚拟节点up down中是否包含
*/
func (this *VnodeManager) IsSelfVnodeNeed(nid nodeStore.AddressNet) bool {
this.lock.RLock()
defer this.lock.RUnlock()
if bytes.Equal(nid, this.DiscoverVnodes.Vnode.Nid) {
return true
}
exist := this.DiscoverVnodes.CheckVnodeinfoExistByNid(nid)
if exist {
return true
}
for _, one := range this.VnodeMap {
exist = one.CheckVnodeinfoExistByNid(nid)
if exist {
return true
}
for _, one := range one.GetOnebyoneVnodeInfo() {
if bytes.Equal(one.Nid, nid) || bytes.Equal(one.Vid, nid) {
return true
}
}
}
return false
}
/*
* 通过一个指定下标获取虚拟节点地址
*
* @param index uint64 下标
* @return vid AddressNetExtend 节点地址
*/
func (this *VnodeManager) GetVnodeIdByIndex(index uint64) *AddressNetExtend {
vnodeInfo := BuildNodeinfo(index, this.nodeManager.NodeSelf.IdInfo.Id)
return &vnodeInfo.Vid
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/prestonTao/libp2parea.git
git@gitee.com:prestonTao/libp2parea.git
prestonTao
libp2parea
libp2parea
3aaa451ef873

搜索帮助