1 Star 5 Fork 3

夏季的风 / TCP-UDP网络组件

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
DynamicReceiver.go 6.99 KB
一键复制 编辑 原始数据 按行查看 历史
lingbinbin 提交于 2021-12-17 09:28 . *优化动态分包器算法
package Receivers
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"gitee.com/ling-bin/network/netInterface"
"log"
"sync"
)
var (
packageMaxLen = 64 * 1024 //允许最大包容量 64 K;
packageBufferPool = newPackageBufferPoolMany(1024, packageMaxLen, 2)
)
// DynamicReceiver 策略分包器
type DynamicReceiver struct {
currentReceiver netInterface.IReceiver //当前分包器
receivers []netInterface.IReceiver //分包器集合
receiverLen int //分包器个数
isSingle bool //是否只有一个分包算法
hDataCache bool //是否启用处理数据缓存优化[注意:启用后SetOnReceiveCompleted 处理方法使用协程会导致数据错乱]
bytesCache *bytes.Buffer //分包缓存:每个连接对应一个分包缓存,大多数情况下是不会有粘包情况
onReceive func(data []byte) //分包完整包回调
errorCallback func(errStr string) //内部错误回调
size int //初始分包大小
}
//NewDynamicReceiver 实例化策略分包器
func NewDynamicReceiver(receivers []netInterface.IReceiver, size int, onReceive func(data []byte),errorCallback func(errStr string)) *DynamicReceiver {
dynamicReceiver := &DynamicReceiver{
receivers: receivers,
receiverLen: len(receivers),
isSingle: len(receivers) == 1,
onReceive: onReceive,
errorCallback: errorCallback,
bytesCache: nil,
size: size,
}
return dynamicReceiver
}
//错误回调
func (d *DynamicReceiver) errCall(errStr string) {
defer func() {
if r := recover(); r != nil {
log.Println("异常处理函数异常则屏蔽:",r)
}
}()
if d.errorCallback != nil {
d.errorCallback(errStr)
}
}
// 写入缓存
func (d *DynamicReceiver) writeCache(data []byte) {
if d.bytesCache == nil {
if d.size > packageMaxLen {
d.size = packageMaxLen
}
d.bytesCache = packageBufferPool.get(d.size)
}
d.bytesCache.Write(data)
}
// Receiver 分包处理
func (d *DynamicReceiver) Receiver(conn netInterface.IConnection, buffer []byte) (err error) {
//连接关闭则不处理未分包好的数据,分包好的数据是否处理交由业务自行判断是否处理
if conn.GetIsClosed() {
if d.bytesCache != nil {
d.bytesCache.Reset()
}
return errors.New(fmt.Sprint("[", conn.GetConnId(), "]连接关闭未分包数据丢弃,不继续处理!"))
}
defer func() {
//程序执行完成没有未用的数据时回收
if d.bytesCache != nil && d.bytesCache.Len() == 0 {
if d.size < d.bytesCache.Cap() {
d.size = d.bytesCache.Cap()
}
packageBufferPool.put(d.bytesCache)
d.bytesCache = nil
}
}()
var (
newBuf []byte
totalLen int
)
//1.查看有没有待分包数据
if d.bytesCache != nil && d.bytesCache.Len() != 0 {
currentLen := d.bytesCache.Len() + len(buffer)
//长度超长直接踢连接
if currentLen > packageMaxLen {
err = errors.New(fmt.Sprint("[数据清理]缓存数据长度超过 [", packageMaxLen, "]当前长度[", currentLen, "]"))
d.bytesCache.Reset()
conn.Stop() //关闭连接
return err
}
//缓存中获取缓存内存
d.bytesCache.Write(buffer)
newBuf = d.bytesCache.Bytes()
d.bytesCache.Reset()
} else {
newBuf = buffer
}
totalLen = len(newBuf)
for startIndex := 0; startIndex < totalLen; {
//找分包算法
iReceiver, headLenOk := d.selectReceiver(conn, newBuf[startIndex:])
if iReceiver != nil {
d.currentReceiver = iReceiver
} else {
if !headLenOk { //包头长度不够,把剩余数据重新写回缓存
d.writeCache(newBuf[startIndex:])
break
}
//后移一位查找分包算法
startIndex++
continue
}
//分包处理[内部也有可能跳过,需要具体分包算法处理]
data, handleIndex := d.receiver(conn, newBuf[startIndex:])
if len(data) == 0 {
if startIndex+handleIndex < totalLen {
//没有分包成功,把剩余数据重新写回缓存
d.writeCache(newBuf[startIndex+handleIndex:])
}
break
}
// 获取到完整包
hData := make([]byte, len(data))
copy(hData, data)
d.onReceive(hData)
startIndex += handleIndex
}
return nil
}
//开始分包数据
func (d *DynamicReceiver) receiver(conn netInterface.IConnection, data []byte) (hData []byte,handleIndex int) {
defer func() {
if r := recover(); r != nil {
d.errCall(fmt.Sprint("具体分包算法内部异常跳过所有数据[", len(data), "]:", r, " ", hex.EncodeToString(data)))
hData = nil
handleIndex = len(data)
//重置状态
d.currentReceiver.Reset()
}
}()
return d.currentReceiver.Receiver(conn, data)
}
//选择分包器[分包结构体,是否够包头长度]
func (d *DynamicReceiver) selectReceiver(conn netInterface.IConnection, buffer []byte) (receiver netInterface.IReceiver,headLenOk bool) {
defer func() {
if r := recover() ;r != nil {
d.errCall(fmt.Sprint("选择分包算法内部异常:",r))
headLenOk = true
receiver = nil
}
}()
bfLen := len(buffer)
//单个分包算法直接跳过认包逻辑
if d.isSingle {
receiver = d.receivers[0]
if bfLen < receiver.GetHeadLen() {
return nil, false
}
canHandle := receiver.CanHandle(conn, buffer)
if canHandle {
return d.receivers[0], true
}
return nil, true
}
headLenOk = true
//找到符合的分包器
for i := 0; i < d.receiverLen; i++ {
receiver = d.receivers[0]
if bfLen < receiver.GetHeadLen() {
headLenOk = false
continue
}
canHandle := d.receivers[i].CanHandle(conn, buffer)
if canHandle {
return d.receivers[i], false
}
}
return nil, headLenOk
}
// packageBufferPoolMany 字节缓存池[多尺寸]
type packageBufferPoolMany struct {
classes []sync.Pool
classesSize []int
classesLen int
minSize int
maxSize int
}
// newPackageBufferPoolMany 实例化buffer[多尺寸]
func newPackageBufferPoolMany(minSize, maxSize, factor int) *packageBufferPoolMany {
n := 0
for chunkSize := minSize; chunkSize <= maxSize; chunkSize *= factor {
n++
}
pool := &packageBufferPoolMany{
classes: make([]sync.Pool, n),
classesSize: make([]int, n),
classesLen: n,
minSize: minSize,
maxSize: maxSize,
}
n = 0
for chunkSize := minSize; chunkSize <= maxSize; chunkSize *= factor {
pool.classesSize[n] = chunkSize
pool.classes[n].New = func(size int) func() interface{} {
return func() interface{} {
buf := bytes.NewBuffer(make([]byte, 0, size))
return buf
}
}(chunkSize)
n++
}
return pool
}
// get 分配
func (b *packageBufferPoolMany) get(size int) *bytes.Buffer {
if size <= b.maxSize {
for i := 0; i < b.classesLen; i++ {
if b.classesSize[i] >= size {
mem := b.classes[i].Get().(*bytes.Buffer)
return mem
}
}
}
return bytes.NewBuffer(make([]byte, 0, size))
}
// put 回收
func (b *packageBufferPoolMany) put(mem *bytes.Buffer) {
if size := mem.Cap(); size <= b.maxSize {
for i := 0; i < b.classesLen; i++ {
if b.classesSize[i] >= size {
mem.Reset()
b.classes[i].Put(mem)
return
}
}
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ling-bin/network.git
git@gitee.com:ling-bin/network.git
ling-bin
network
TCP-UDP网络组件
v1.8.12

搜索帮助

344bd9b3 5694891 D2dac590 5694891