0 Star 1 Fork 0

蒋佳李 / leaf

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
gate.go 4.59 KB
一键复制 编辑 原始数据 按行查看 历史
蒋佳李 提交于 2023-03-08 15:33 . 修复:没有关闭 closeChan 通道
package gate
import (
"gitee.com/jiangjiali/leaf/chanrpc"
"gitee.com/jiangjiali/leaf/conf"
"gitee.com/jiangjiali/leaf/log"
"gitee.com/jiangjiali/leaf/network"
"net"
"reflect"
"sync"
"time"
)
type Gate struct {
MaxConnNum int
PendingWriteNum int
MaxMsgLen uint32
Processor network.Processor
AgentChanRPC *chanrpc.Server
// websocket
WSAddr string
HTTPTimeout time.Duration
CertFile string
KeyFile string
// tcp
TCPAddr string
LenMsgLen int
LittleEndian bool
}
func (gate *Gate) Run(closeSig chan bool) {
var wsServer *network.WSServer
if gate.WSAddr != "" {
wsServer = new(network.WSServer)
wsServer.Addr = gate.WSAddr
wsServer.MaxConnNum = gate.MaxConnNum
wsServer.PendingWriteNum = gate.PendingWriteNum
wsServer.MaxMsgLen = gate.MaxMsgLen
wsServer.HTTPTimeout = gate.HTTPTimeout
wsServer.CertFile = gate.CertFile
wsServer.KeyFile = gate.KeyFile
wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
a := &agent{conn: conn, gate: gate, closeChan: make(chan byte)}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
var tcpServer *network.TCPServer
if gate.TCPAddr != "" {
tcpServer = new(network.TCPServer)
tcpServer.Addr = gate.TCPAddr
tcpServer.MaxConnNum = gate.MaxConnNum
tcpServer.PendingWriteNum = gate.PendingWriteNum
tcpServer.LenMsgLen = gate.LenMsgLen
tcpServer.MaxMsgLen = gate.MaxMsgLen
tcpServer.LittleEndian = gate.LittleEndian
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{conn: conn, gate: gate, closeChan: make(chan byte)}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}
if wsServer != nil {
wsServer.Start()
}
if tcpServer != nil {
tcpServer.Start()
}
<-closeSig
if wsServer != nil {
wsServer.Close()
}
if tcpServer != nil {
tcpServer.Close()
}
}
func (gate *Gate) OnDestroy() {}
type agent struct {
m sync.RWMutex
closeChan chan byte // 心跳关闭通道
lastHeartbeatTime time.Time // 最近一次心跳时间
conn network.Conn // 连接句柄
gate *Gate // 连接门限控制
userData interface{} // 玩家数据
}
func (a *agent) Run() {
var (
err error // 错误信息
data []byte // 传输过来的数据
msg interface{} // 解析后的数据
)
go a.heartbeatChecker()
for {
data, err = a.conn.ReadMsg()
if err != nil {
log.Error("Core::Agent::读取信息错误: %v", err)
goto EXIT
}
if a.gate.Processor != nil {
msg, err = a.gate.Processor.Unmarshal(data)
if err != nil {
log.Error("Core::Agent::解析信息错误: %v", err)
goto EXIT
}
if err = a.gate.Processor.Route(msg, a); err != nil {
log.Error("Core::Agent::路由信息错误: %v", err)
goto EXIT
}
}
}
EXIT:
close(a.closeChan)
}
func (a *agent) OnClose() {
a.m.RLock()
defer a.m.RUnlock()
if a.gate.AgentChanRPC != nil {
err := a.gate.AgentChanRPC.Call0("CloseAgent", a)
if err != nil {
log.Error("Core::Agent::RPC通道错误: %v", err)
}
}
}
func (a *agent) WriteMsg(msg interface{}) {
if a.gate.Processor != nil {
data, err := a.gate.Processor.Marshal(msg)
if err != nil {
log.Error("Core::Agent::编码信息[%v]错误: %v", reflect.TypeOf(msg), err)
return
}
err = a.conn.WriteMsg(data...)
if err != nil {
log.Error("Core::Agent::写信息[%v]错误: %v", reflect.TypeOf(msg), err)
}
}
}
func (a *agent) LocalAddr() net.Addr {
a.m.RLock()
defer a.m.RUnlock()
return a.conn.LocalAddr()
}
func (a *agent) RemoteAddr() net.Addr {
a.m.RLock()
defer a.m.RUnlock()
return a.conn.RemoteAddr()
}
func (a *agent) Close() {
a.conn.Close()
}
func (a *agent) Destroy() {
a.conn.Destroy()
}
func (a *agent) UserData() interface{} {
a.m.RLock()
defer a.m.RUnlock()
return a.userData
}
func (a *agent) SetUserData(data interface{}) {
a.m.Lock()
a.userData = data
a.m.Unlock()
}
// 每隔1秒, 检查一次连接是否健康
func (a *agent) heartbeatChecker() {
// HeartbeatInterval 心跳时间,后传给 timer.C 去判断
heartbeatTime := time.Duration(conf.HeartbeatInterval)
timer := time.NewTimer(heartbeatTime * time.Second)
for {
select {
case <-timer.C:
if time.Now().Sub(a.lastHeartbeatTime) > heartbeatTime*time.Second {
timer.Stop()
close(a.closeChan)
goto EXIT
}
timer.Reset(heartbeatTime * time.Second)
case <-a.closeChan:
timer.Stop()
goto EXIT
}
}
EXIT:
a.Destroy()
}
// KeepAlive 更新心跳
func (a *agent) KeepAlive() {
a.m.Lock()
a.lastHeartbeatTime = time.Now()
a.m.Unlock()
}
Go
1
https://gitee.com/jiangjiali/leaf.git
git@gitee.com:jiangjiali/leaf.git
jiangjiali
leaf
leaf
v1.1.44

搜索帮助