1 Star 5 Fork 3

夏季的风 / TCP-UDP网络组件

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
DynamicReceiver.go 3.58 KB
一键复制 编辑 原始数据 按行查看 历史
lingbinbin 提交于 2021-09-16 14:44 . *升级任务处理池
package Receivers
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"gitee.com/ling-bin/network/netInterface"
)
var (
maxLen = 65535 //允许最大包容量,超过会异常(64k)
)
// DynamicReceiver 策略分包器
type DynamicReceiver struct {
currentReceiver netInterface.IReceiver //当前分包器
Receivers []netInterface.IReceiver //分包器集合
ReceiverLen int //分包器个数
IsPackStart bool //是否开始分包
IsSingle bool //是否只有一个分包算法
HDataCache bool //是否启用处理数据缓存优化[注意:启用后SetOnReceiveCompleted 处理方法使用协程会导致数据错乱]
BytesCache *bytes.Buffer //分包缓存:每个连接对应一个分包缓存
OnReceive func(data []byte) //分包完整包回调
}
// Receiver 分包处理
func (d *DynamicReceiver) Receiver(conn netInterface.IConnection, buffer []byte) (err error) {
//1.查看有没有待分包数据
var newBuf []byte
cacheLen := d.BytesCache.Len()
if cacheLen != 0 {
currentLen := cacheLen + len(buffer)
//长度超长直接踢连接
if currentLen > maxLen {
conn.Stop()
msg := fmt.Sprint(" buffer out maxLen [", d.BytesCache.Len(), "]data:", hex.EncodeToString(newBuf[:]))
d.BytesCache.Reset()
return errors.New(msg)
}
d.BytesCache.Write(buffer)
newBuf = d.BytesCache.Bytes()
d.BytesCache.Reset()
} else {
newBuf = buffer
}
totalLen := len(newBuf)
startIndex := 0
skipCount := 0
for {
if startIndex == totalLen {
//刚好完成或未找到分包算法就清理缓存跳出函数
if skipCount > 0 {
return errors.New(fmt.Sprint("SelectReceiver skip[", skipCount, "(byte)] data:", hex.EncodeToString(newBuf[:])))
}
break
}
if d.IsPackStart { //确认当前包分包算法
iReceiver, headLenOk := d.selectReceiver(conn, newBuf[startIndex:])
if !headLenOk {
d.BytesCache.Write(newBuf[startIndex:])
break //包头长度不够
}
if iReceiver == nil {
//后移一位查找分包算法
startIndex++
skipCount++
continue
}
d.currentReceiver = iReceiver
d.IsPackStart = false
}
//分包处理[内部也有可能跳过,需要具体分包算法处理]
data, handleIndex := d.currentReceiver.Receiver(conn, newBuf[startIndex:])
if len(data) == 0 {
if startIndex + handleIndex < totalLen {
//没有找到则重置缓存,重新设置缓存
d.BytesCache.Write(newBuf[startIndex+handleIndex:])
}
break
}
count := len(data)
if d.HDataCache {
d.OnReceive(data)
} else {
hData := make([]byte, 0, count)
hData = append(hData, data[:count]...)
d.OnReceive(hData)
}
d.IsPackStart = true
startIndex += handleIndex
}
return nil
}
//选择分包器[分包结构体,是否够包头长度]
func (d *DynamicReceiver) selectReceiver(conn netInterface.IConnection, buffer []byte) (netInterface.IReceiver,bool) {
var receiver netInterface.IReceiver
bfLen := len(buffer)
//单个分包算法直接跳过认包逻辑
if d.IsSingle {
receiver = d.Receivers[0]
if bfLen < receiver.GetHeadLen() {
return nil, false
}
canHandle := receiver.CanHandle(conn, buffer)
if canHandle {
return d.Receivers[0], true
}
return nil, true
}
//找到符合的分包器
headLenOk := true
for i := 0; i < d.ReceiverLen; i++ {
receiver = d.Receivers[0]
if bfLen < receiver.GetHeadLen() {
headLenOk = false
continue
}
canHandle := d.Receivers[i].CanHandle(conn, buffer)
if canHandle {
return d.Receivers[i],false
}
}
return nil,headLenOk
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ling-bin/network.git
git@gitee.com:ling-bin/network.git
ling-bin
network
TCP-UDP网络组件
v1.7.4

搜索帮助

344bd9b3 5694891 D2dac590 5694891