Ai
1 Star 1 Fork 0

凡卡/libp2parea

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sessionStore.go 16.24 KB
一键复制 编辑 原始数据 按行查看 历史
凡卡 提交于 2023-11-29 11:01 +08:00 . first commit
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
package engine
import (
"context"
"crypto/tls"
"sync"
"time"
"gitee.com/prestonTao/utils"
"github.com/quic-go/quic-go"
)
type sessionBase struct {
name string
attrbutes *sync.Map //
machineID string // 机器Id
areaName string // 区域名
setGodTime int64 // 被设置为超级代理的时间
connType uint8 // 连接类型
}
func (this *sessionBase) Set(name string, value interface{}) {
this.attrbutes.Store(name, value)
}
func (this *sessionBase) Get(name string) interface{} {
v, _ := this.attrbutes.Load(name)
return v
}
func (this *sessionBase) GetName() string {
return this.name
}
func (this *sessionBase) SetName(name string) {
this.name = name
}
func (this sessionBase) GetConnType() string {
if this.connType == CONN_TYPE_TCP {
return "TCP"
} else if this.connType == CONN_TYPE_QUIC {
return "QUIC"
}
return "UNKNOWN"
}
func (this *sessionBase) Close() {}
type Session interface {
GetIndex() uint64
Send(msgID uint64, data, datapuls *[]byte, timeout time.Duration) error
Close()
Set(name string, value interface{})
Get(name string) interface{}
GetName() string
SetName(name string)
GetRemoteHost() string
GetMachineID() string
GetAreaName() string
GetSetGodTime() int64
GetConnType() string
}
type sessionStore struct {
lock *sync.RWMutex
indexMax uint64 //
sessionByIndex map[uint64]Session //
customNameStore map[string]*utils.SyncList //
upSessionByIndex map[string]map[uint64]Session
downSessionByIndex map[string]map[uint64]Session
}
/*
* 只添加session到customNameStore和upSession
*/
func (this *sessionStore) addOnlyUpSession(customName string, session Session, areaName []byte) {
this.addUpSession(areaName, session)
this.lock.Lock()
slist, ok := this.customNameStore[customName]
if ok && slist != nil {
slist.Add(session)
} else {
slist := utils.NewSyncList()
slist.Add(session)
this.customNameStore[customName] = slist
}
if _, ok := this.sessionByIndex[session.GetIndex()]; !ok {
this.sessionByIndex[session.GetIndex()] = session
}
this.lock.Unlock()
}
/*
* 只添加session到customNameStore和downSession
*/
func (this *sessionStore) addOnlyDownSession(customName string, session Session, areaName []byte) {
this.addDownSession(areaName, session)
this.lock.Lock()
slist, ok := this.customNameStore[customName]
if ok && slist != nil {
slist.Add(session)
} else {
slist := utils.NewSyncList()
slist.Add(session)
this.customNameStore[customName] = slist
}
if _, ok := this.sessionByIndex[session.GetIndex()]; !ok {
this.sessionByIndex[session.GetIndex()] = session
}
this.lock.Unlock()
}
/*
* 添加session
*/
func (this *sessionStore) addSession(customName string, session Session, ss string, areaName []byte) {
// addrNet := AddressNet([]byte(customName))
// Log.Info("添加session:%d %s", index, addrNet.B58String())
if ss == AddressUp {
this.addUpSession(areaName, session)
} else if ss == AddressDown {
this.addDownSession(areaName, session)
}
index := session.GetIndex()
this.lock.Lock()
_, ok := this.sessionByIndex[index]
if ok {
this.lock.Unlock()
return
}
this.sessionByIndex[index] = session
slist, ok := this.customNameStore[customName]
if ok && slist != nil {
slist.Add(session)
} else {
slist := utils.NewSyncList()
slist.Add(session)
this.customNameStore[customName] = slist
}
this.lock.Unlock()
}
// 管理根据新规则连接的 地址比自己大的session
func (this *sessionStore) addUpSession(areaName []byte, session Session) {
this.lock.Lock()
defer this.lock.Unlock()
index := session.GetIndex()
if _, ok := this.upSessionByIndex[utils.Bytes2string(areaName)][index]; ok {
return
}
if _, ok := this.upSessionByIndex[utils.Bytes2string(areaName)]; ok {
this.upSessionByIndex[utils.Bytes2string(areaName)][index] = session
} else {
tmp := make(map[uint64]Session)
tmp[index] = session
this.upSessionByIndex[utils.Bytes2string(areaName)] = tmp
}
}
// 管理根据新规则连接的 地址比自己小的session
func (this *sessionStore) addDownSession(areaName []byte, session Session) {
this.lock.Lock()
defer this.lock.Unlock()
index := session.GetIndex()
if _, ok := this.downSessionByIndex[utils.Bytes2string(areaName)][index]; ok {
return
}
if _, ok := this.downSessionByIndex[utils.Bytes2string(areaName)]; ok {
this.downSessionByIndex[utils.Bytes2string(areaName)][index] = session
} else {
tmp := make(map[uint64]Session)
tmp[index] = session
this.downSessionByIndex[utils.Bytes2string(areaName)] = tmp
}
}
/*
* 根据customName获取特定session
*/
func (this *sessionStore) getSession(areaName []byte, customName string) (Session, bool) {
// addrNet := AddressNet([]byte(customName))
// Log.Info("获取session:%s", AddressNet([]byte(customName)).B58String())
this.lock.RLock()
slist, ok := this.customNameStore[customName]
if slist == nil || !ok {
// Log.Info("获取session fail:%s", addrNet.B58String())
this.lock.RUnlock()
return nil, false
}
// Log.Info("获取session:%s", addrNet.B58String())
ssItr := slist.GetAll()
for i, _ := range ssItr {
ssOne, ok := ssItr[i].(Session)
if !ok {
continue
}
if ssOne.GetAreaName() != utils.Bytes2string(areaName) {
continue
}
this.lock.RUnlock()
return ssOne, true
}
this.lock.RUnlock()
return nil, false
}
/*
* getSessionAll 根据目标获取所有的连接信息
*
* @param customName string 目标地址
* @return sessions []Session 目标对应的所有连接
* @return success bool 是否存在与目标的连接信息
*/
func (this *sessionStore) getSessionAll(areaName []byte, customName string) ([]Session, bool) {
this.lock.RLock()
slist, ok := this.customNameStore[customName]
if slist == nil || !ok {
this.lock.RUnlock()
return nil, false
}
ssItr := slist.GetAll()
ss := make([]Session, 0, len(ssItr))
for i, _ := range ssItr {
ssOne, ok := ssItr[i].(Session)
if !ok {
continue
}
if ssOne.GetAreaName() != utils.Bytes2string(areaName) {
continue
}
ss = append(ss, ssOne)
}
this.lock.RUnlock()
return ss, true
}
// func (this *sessionStore) getSessionByHost(hostName string) Session {
// this.lock.RLock()
// ss, ok := this.hostNameStore[hostName]
// if !ok {
// this.lock.RUnlock()
// return nil
// }
// this.lock.RUnlock()
// return ss
// }
/*
* 特定session是否存在
*/
func (this *sessionStore) checkInSessionByIndex(ss Session) bool {
if _, ok := this.sessionByIndex[ss.GetIndex()]; ok {
return true
} else {
return false
}
}
/*
* 在真实节点downSession中删除特定session
*/
func (this *sessionStore) delNodeDownSession(areaName []byte, ss Session) {
this.lock.Lock()
defer this.lock.Unlock()
if _, ok := this.downSessionByIndex[utils.Bytes2string(areaName)]; ok {
delete(this.downSessionByIndex[utils.Bytes2string(areaName)], ss.GetIndex())
}
}
/*
* 在真实节点upSession中删除特定session
*/
func (this *sessionStore) delNodeUpSession(areaName []byte, ss Session) {
this.lock.Lock()
defer this.lock.Unlock()
if _, ok := this.upSessionByIndex[utils.Bytes2string(areaName)]; ok {
delete(this.upSessionByIndex[utils.Bytes2string(areaName)], ss.GetIndex())
}
}
/*
* 在真实节点customNameStore中删除特定session
*/
func (this *sessionStore) removeCustomSession(ss Session) {
this.lock.Lock()
defer this.lock.Unlock()
slist, ok := this.customNameStore[ss.GetName()]
if ok && slist != nil {
ssItr := slist.GetAll()
for i, _ := range ssItr {
session := ssItr[i].(Session)
if session.GetIndex() == ss.GetIndex() {
slist.Remove(i)
if len(ssItr) == 1 {
delete(this.customNameStore, ss.GetName())
}
break
}
}
}
}
/*
* sessionStore中删除特定session
*/
func (this *sessionStore) removeSession(areaName string, ss Session) {
// addrNet := AddressNet([]byte(ss.GetName()))
// Log.Info("删除一个连接session:%d %s", ss.GetIndex(), addrNet.B58String())
var hasIndex bool
var session Session
this.lock.Lock()
if ses, ok := this.sessionByIndex[ss.GetIndex()]; ok {
session = ses
hasIndex = ok
}
if ses, ok := this.upSessionByIndex[areaName][ss.GetIndex()]; ok {
session = ses
hasIndex = ok
}
if ses, ok := this.downSessionByIndex[areaName][ss.GetIndex()]; ok {
session = ses
hasIndex = ok
}
if !hasIndex {
this.lock.Unlock()
return
}
slist, ok := this.customNameStore[session.GetName()]
if ok && slist != nil {
ssItr := slist.GetAll()
for i, _ := range ssItr {
session := ssItr[i].(Session)
if session.GetIndex() == ss.GetIndex() {
slist.Remove(i)
break
}
}
}
//如果list长度为0,则删除
if ok && len(slist.GetAll()) == 0 {
delete(this.customNameStore, session.GetName())
}
if _, ok := this.sessionByIndex[ss.GetIndex()]; ok {
delete(this.sessionByIndex, ss.GetIndex())
}
if _, ok := this.upSessionByIndex[areaName][ss.GetIndex()]; ok {
delete(this.upSessionByIndex[areaName], ss.GetIndex())
}
if _, ok := this.downSessionByIndex[areaName][ss.GetIndex()]; ok {
delete(this.downSessionByIndex[areaName], ss.GetIndex())
}
this.lock.Unlock()
}
/*
* sessionStore中重命名session
*/
func (this *sessionStore) renameSession(index uint64, customName string) {
this.lock.Lock()
session, ok := this.sessionByIndex[index]
// session, ok := this.hostNameStore[hostName]
if !ok {
this.lock.Unlock()
return
}
//从原来的集合中删除
oldCustomName := session.GetName()
slist, ok := this.customNameStore[oldCustomName]
if !ok || slist == nil {
this.lock.Unlock()
return
}
//
ssItr := slist.GetAll()
for i, _ := range ssItr {
ss := ssItr[i].(Session)
// hostNameOne := ss.GetRemoteHost()
if ss.GetIndex() == index {
//找到了
slist.Remove(i)
break
}
}
//如果list长度为0,则删除
if len(slist.GetAll()) == 0 {
// Log.Info("removeSessionByHost:%s", hex.EncodeToString([]byte(oldCustomName)))
delete(this.customNameStore, oldCustomName)
}
//保存到新的名称集合中
slist, ok = this.customNameStore[customName]
if ok && slist != nil {
slist.Add(session)
} else {
slist := utils.NewSyncList()
slist.Add(session)
// Log.Info("removeSessionByHost rename:%s", hex.EncodeToString([]byte(customName)))
this.customNameStore[customName] = slist
}
this.lock.Unlock()
}
/*
* 获取特定区域名的所有session
*/
func (this *sessionStore) getAllSession(areaName []byte) []Session {
ss := make([]Session, 0)
strAreaName := utils.Bytes2string(areaName)
this.lock.RLock()
for _, session := range this.sessionByIndex {
if session.GetAreaName() != strAreaName {
continue
}
ss = append(ss, session)
}
this.lock.RUnlock()
return ss
}
/*
* 获取真实节点的所有upSession
*/
func (this *sessionStore) getAllUpSession(areaName []byte) []Session {
ss := make([]Session, 0)
if _, ok := this.upSessionByIndex[utils.Bytes2string(areaName)]; !ok {
return ss
}
this.lock.RLock()
for i := range this.upSessionByIndex[utils.Bytes2string(areaName)] {
ss = append(ss, this.upSessionByIndex[utils.Bytes2string(areaName)][i])
}
this.lock.RUnlock()
return ss
}
/*
* 获取真实节点的所有downSession
*/
func (this *sessionStore) getAllDownSession(areaName []byte) []Session {
ss := make([]Session, 0)
if _, ok := this.downSessionByIndex[utils.Bytes2string(areaName)]; !ok {
return ss
}
this.lock.RLock()
for i := range this.downSessionByIndex[utils.Bytes2string(areaName)] {
ss = append(ss, this.downSessionByIndex[utils.Bytes2string(areaName)][i])
}
this.lock.RUnlock()
return ss
}
/*
* 获取特定区域名的所有session
*/
func (this *sessionStore) getAllSessionName(areaName []byte) []string {
names := make([]string, 0)
nameM := make(map[string]int)
strAreaName := utils.Bytes2string(areaName)
this.lock.RLock()
for _, one := range this.sessionByIndex {
if _, ok := nameM[one.GetName()]; ok {
continue
}
if one.GetAreaName() != strAreaName {
continue
}
nameM[one.GetName()] = 0
names = append(names, one.GetName())
}
this.lock.RUnlock()
return names
}
/*
获得一个未使用的服务器连接
*/
func (this *sessionStore) getClientConn(areaName []byte, engine *Engine) *Client {
this.lock.Lock()
for {
_, ok := this.sessionByIndex[this.indexMax]
if !ok {
break
}
this.indexMax++
}
index := this.indexMax
this.indexMax++
this.lock.Unlock()
contextRoot, canceRoot := context.WithCancel(context.Background())
sessionBase := sessionBase{
attrbutes: new(sync.Map),
areaName: utils.Bytes2string(areaName),
connType: CONN_TYPE_TCP,
}
clientConn := &Client{
sessionBase: sessionBase,
index: index,
engine: engine,
sendQueue: NewSendQueue(SendQueueCacheNum, contextRoot, canceRoot, engine.name),
allowClose: make(chan bool, 1),
heartbeat: make(chan bool, 1),
getDataSign: make(chan bool, 1),
}
return clientConn
}
/*
获得一个未使用的服务器连接
*/
func (this *sessionStore) getServerConn(engine *Engine, areaName string) *ServerConn {
this.lock.Lock()
for {
_, ok := this.sessionByIndex[this.indexMax]
if !ok {
break
}
this.indexMax++
}
index := this.indexMax
this.indexMax++
this.lock.Unlock()
contextRoot, canceRoot := context.WithCancel(context.Background())
//创建一个新的session
sessionBase := sessionBase{
attrbutes: new(sync.Map),
areaName: areaName,
connType: CONN_TYPE_TCP,
}
serverConn := &ServerConn{
sessionBase: sessionBase,
index: index,
engine: engine,
sendQueue: NewSendQueue(SendQueueCacheNum, contextRoot, canceRoot, engine.name),
allowClose: make(chan bool, 1),
}
serverConn.controller = &ControllerImpl{
lock: new(sync.RWMutex),
engine: engine,
attributes: make(map[string]interface{}),
}
return serverConn
}
/*
获得一个未使用的服务器连接
*/
func (this *sessionStore) getClientQuicConn(areaName []byte, engine *Engine) *ClientQuic {
this.lock.Lock()
for {
_, ok := this.sessionByIndex[this.indexMax]
if !ok {
break
}
this.indexMax++
}
index := this.indexMax
this.indexMax++
this.lock.Unlock()
contextRoot, canceRoot := context.WithCancel(context.Background())
sessionBase := sessionBase{
attrbutes: new(sync.Map),
areaName: utils.Bytes2string(areaName),
connType: CONN_TYPE_QUIC,
}
clientConn := &ClientQuic{
sessionBase: sessionBase,
index: index,
engine: engine,
sendQueue: NewSendQueue(SendQueueCacheNum, contextRoot, canceRoot, engine.name),
allowClose: make(chan bool, 1),
heartbeat: make(chan bool, 1),
getDataSign: make(chan bool, 1),
quicConf: &quic.Config{
MaxIdleTimeout: time.Second,
KeepAlivePeriod: 500 * time.Millisecond,
},
tlsConf: &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"quic-p2p-project"},
},
}
return clientConn
}
/*
获得一个未使用的服务器连接
*/
func (this *sessionStore) getServerQuicConn(engine *Engine, areaName string) *ServerQuicConn {
this.lock.Lock()
for {
_, ok := this.sessionByIndex[this.indexMax]
if !ok {
break
}
this.indexMax++
}
index := this.indexMax
this.indexMax++
this.lock.Unlock()
contextRoot, canceRoot := context.WithCancel(context.Background())
//创建一个新的session
sessionBase := sessionBase{
attrbutes: new(sync.Map),
areaName: areaName,
connType: CONN_TYPE_QUIC,
}
serverConn := &ServerQuicConn{
sessionBase: sessionBase,
index: index,
engine: engine,
sendQueue: NewSendQueue(SendQueueCacheNum, contextRoot, canceRoot, engine.name),
allowClose: make(chan bool, 1),
}
serverConn.controller = &ControllerImpl{
lock: new(sync.RWMutex),
engine: engine,
attributes: make(map[string]interface{}),
}
return serverConn
}
func NewSessionStore() *sessionStore {
sessionStore := new(sessionStore)
sessionStore.lock = new(sync.RWMutex)
sessionStore.sessionByIndex = make(map[uint64]Session)
sessionStore.downSessionByIndex = make(map[string]map[uint64]Session)
sessionStore.upSessionByIndex = make(map[string]map[uint64]Session)
sessionStore.customNameStore = make(map[string]*utils.SyncList)
// sessionStore.hostNameStore = make(map[string]Session)
return sessionStore
}
/*
* 获取特定区域名的session总数
*/
func (this *sessionStore) getSessionCnt(areaName []byte) (res int64) {
strAreaName := utils.Bytes2string(areaName)
this.lock.RLock()
for _, one := range this.sessionByIndex {
if one.GetAreaName() != strAreaName {
continue
}
res++
}
this.lock.RUnlock()
return res
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/prestonTao/libp2parea.git
git@gitee.com:prestonTao/libp2parea.git
prestonTao
libp2parea
libp2parea
3aaa451ef873

搜索帮助