1 Star 5 Fork 3

夏季的风 / TCP-UDP网络组件

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
Connection.go 11.68 KB
一键复制 编辑 原始数据 按行查看 历史
package netClient
import (
"encoding/hex"
"errors"
"gitee.com/ling-bin/go-utils/pools"
"gitee.com/ling-bin/network/Receivers"
"gitee.com/ling-bin/network/netInterface"
"net"
"sync"
"sync/atomic"
"time"
)
var(
connId uint64 = 0
bufferPool = pools.NewBufferPoolMany(128, 64*1204, 2)
)
//Connection 连接结构体
type Connection struct {
client *Client //当前conn属于哪个client,在conn初始化的时候添加即可
conn net.Conn //连接客户端
connId uint64 //当前连接的ID 也可以称作为SessionID,ID全局唯一
isClosed bool //当前连接的关闭状态 [ true:关闭,false:开 ]
msgSendChan chan *callSendHandle //消息发送,有缓冲管道
dynamicReceiver *Receivers.DynamicReceiver //分包器
heartTime time.Time //连接最后一次接收数据时间(每包更新)
sendTime time.Time //连接最后一次发送数据时间(每包更新)
startTime time.Time //连接建立时间
recPackCount uint64 //上行当前处理到的包数
recTotalByteSize uint64 //上行总大小(字节)
repPackCount uint64 //下行总包个数
repTotalByteSize uint64 //下行总大小(字节)
repPackErrCount uint64 //下发异常包个数
incr int64 //供业务下发使用流水号
property map[string]interface{} //链接属性
connLock sync.RWMutex //保护链接属性修改的锁
config *Config //配置
}
//callSendHandle 数据发送回调处理
type callSendHandle struct {
Buf []byte //发送数据
CallFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error) //回调方法
Pram interface{} //参数
CmdCode string
}
//NewConnection 创建连接的方法
func NewConnection(client *Client, conn net.Conn, config *Config) *Connection {
c := &Connection{
client: client,
conn: conn,
isClosed: true,
config: config,
property: make(map[string]interface{}, 0),
msgSendChan: make(chan *callSendHandle, config.SendDataCount),
heartTime: time.Now(),
sendTime: time.Now(),
startTime: time.Now(),
connId: atomic.AddUint64(&connId, 1),
}
return c
}
//GetNetConn 从当前连接获取原始的socket TCPConn
func (c *Connection) GetNetConn() interface{} {
return c.conn
}
// GetNetwork 获取网络类型
func (c *Connection) GetNetwork() string {
return c.config.Network
}
//GetConnId 获取客户端ID
func (c *Connection) GetConnId() uint64 {
return c.connId
}
//GetRemoteAddr 获取远程客户端地址信息
func (c *Connection) GetRemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
//GetLocalAddr 获取本地地址信息
func (c *Connection) GetLocalAddr() net.Addr {
return c.conn.LocalAddr()
}
//CallLogHandle 调用异常处理
func (c *Connection) CallLogHandle(level netInterface.ErrLevel, msgAry ...interface{}) {
c.client.CallLogHandle(level, msgAry...)
}
//GetIsClosed 获取的状态[脏读](ture:关闭状态,false:未关闭)
func (c *Connection) GetIsClosed() bool {
return c.isClosed
}
//Incr 连接提供给业务作为流水号使用,循环累加,(val 为正数为递增,val为负数为递减,val为0则获取值)
func (c *Connection) Incr(val int64) int64 {
return atomic.AddInt64(&c.incr, val)
}
//Start 启动连接,让当前连接开始工作[只会调用一次]
func (c *Connection) Start() {
if c.isClosed {
c.isClosed = false
//开启处理该链接读取到客户端数据之后的请求业务
go c.startReader()
go c.startWriter()
}
}
//Stop 停止连接,结束当前连接状态M
func (c *Connection) Stop() {
if c.isClosed {
return
}
c.connLock.Lock()
if c.isClosed {
c.connLock.Unlock()
return
}
c.isClosed = true
c.connLock.Unlock()
// 关闭socket链接
c.conn.Close()
}
//SendData 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendData(data []byte, cmdCode string) error {
return c.SendDataCall(data, cmdCode, nil, nil)
}
//SendDataCall 直接将Message数据发送数据给远程的TCP客户端(带参数和回调)
func (c *Connection) SendDataCall(data []byte, cmdCode string, pram interface{}, callFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error)) error {
if c.isClosed {
return netInterface.ClosedErr
}
callSendData := &callSendHandle{
Buf: data,
CallFunc: callFunc,
Pram: pram,
CmdCode: cmdCode,
}
if !c.config.OverflowDiscard {
c.msgSendChan <- callSendData
return nil
}
select {
case c.msgSendChan <- callSendData:
break
default:
return errors.New("发送队列已满,请稍后重试")
}
return nil
}
//SetProperty 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {
c.connLock.Lock()
defer c.connLock.Unlock()
c.property[key] = value
}
//GetProperty 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.connLock.RLock()
defer c.connLock.RUnlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, netInterface.NotKey
}
}
//RemoveProperty 移除链接属性
func (c *Connection) RemoveProperty(key string) {
c.connLock.Lock()
defer c.connLock.Unlock()
delete(c.property, key)
}
//GetPropertyKeys 获取所有属性key
func (c *Connection) GetPropertyKeys() []string {
c.connLock.RLock()
defer c.connLock.RUnlock()
propertyAry := make([]string, 0, len(c.property))
for key, _ := range c.property {
propertyAry = append(propertyAry, key)
}
return propertyAry
}
//GetRecInfo 上行当前处理的包总数(处理前,1开始),总大小(字节)
func (c *Connection) GetRecInfo() (count, byteSize uint64) {
return c.recPackCount, c.recTotalByteSize
}
//GetRepInfo 下行当前处理的包总数(处理后),总大小(字节)
func (c *Connection) GetRepInfo() (count, byteSize, errCount uint64) {
return c.repPackCount, c.repTotalByteSize, c.repPackErrCount
}
//GetHeartTime 连接最后一次接收数据时间
func (c *Connection) GetHeartTime() time.Time {
return c.heartTime
}
//GetSendTime 连接最后一次发送数据时间
func (c *Connection) GetSendTime() time.Time {
return c.sendTime
}
//GetStartTime 连接建立时间
func (c *Connection) GetStartTime() time.Time {
return c.startTime
}
//OnReceive 数据分包完成
func (c *Connection) OnReceive(data []byte) {
//更新心跳时间
c.recPackCount++
c.recTotalByteSize += uint64(len(data))
c.heartTime = time.Now()
var hData []byte
if c.client.config.HDataCache {
hData = data
} else {
hData = make([]byte, len(data))
copy(hData, data)
}
c.client.CallOnReceive(c, hData)
}
const(
hKey = "#CallKey#" // 回调返回Key
)
//OnCompleted 数据上传处理
func (c *Connection) OnCompleted(data []byte, count int) {
defer func() {
if r := recover(); r != nil {
c.CallLogHandle(netInterface.Error, "数据[", count, "个字节]分包处理异常:", r)
}
}()
//udp不需要分包
if c.client.network == "udp" || c.client.handleStrategy == nil {
if c.client.handleStrategy != nil {
strategyInfo := c.client.handleStrategy(c, data[:count])
if strategyInfo == nil || len(strategyInfo.Key) == 0 {
c.Stop()
return
}
if c.recPackCount == 0 {
c.SetProperty(hKey, strategyInfo.Key)
if len(strategyInfo.ExtData) != 0 {
for key, val := range strategyInfo.ExtData {
c.SetProperty(key, val)
}
}
}
}
c.OnReceive(data[:count])
return
}
//tcp分包逻辑
if c.dynamicReceiver == nil {
strategyInfo := c.client.handleStrategy(c,data[:count])
if strategyInfo == nil || len(strategyInfo.Receivers) == 0 {
c.Stop()
c.CallLogHandle(netInterface.Warn, "第一包数据不能被分包识别:", hex.EncodeToString(data[:count]))
return
}
c.SetProperty(hKey,strategyInfo.Key)
if len(strategyInfo.ExtData) != 0 {
for key,val:= range strategyInfo.ExtData{
c.SetProperty(key,val)
}
}
c.dynamicReceiver = Receivers.NewDynamicReceiver(strategyInfo.Receivers, c.config.BufferSize, c.OnReceive, func(errStr string) {
c.CallLogHandle(netInterface.Warn, "内部分包算法异常:", errStr)
})
}
c.dynamicReceiver.Receiver(c, data[:count])
}
//startReader 处理conn读数据的Goroutine
func (c *Connection) startReader() {
defer func() {
c.Stop() //连接断开
c.client.CallOnConnStop(c)
//关闭该链接全部管道
close(c.msgSendChan)
//在这里回收分包缓存,主要目的是保证和数据分包处理协程在同一个协程中;
if c.dynamicReceiver != nil {
c.dynamicReceiver.Recovery(c)
}
if r := recover(); r != nil {
c.CallLogHandle(netInterface.Error, "[tcp]连接读取数据异常:", r)
}
}()
c.client.CallOnConnStart(c)
data := bufferPool.Get(c.config.BufferSize)
defer bufferPool.Put(data) //资源回收
tempDelay := time.Millisecond * 0
for {
//获取缓存对象
count, err := c.conn.Read(data)
if err != nil {
//临时异常或者超时异常休眠一下
if ne, ok := err.(net.Error); ok && (ne.Temporary() || ne.Timeout()){
if tempDelay == 0 {
tempDelay = 5 * time.Microsecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
c.CallLogHandle(netInterface.Error, "[tcp]客户端上传数据临时异常: ", err)
time.Sleep(tempDelay)
continue
}
break
}
c.OnCompleted(data, count)
if tempDelay != 0 {
tempDelay = time.Millisecond * 0
}
}
}
//startWriter 写消息Goroutine, 用户将数据发送给客户端
func (c *Connection) startWriter() {
defer func() {
if r := recover(); r != nil {
c.CallLogHandle(netInterface.Error, "[tcp]运行回复任务异常:", r)
}
}()
for {
content, ok := <-c.msgSendChan
if !ok {
break
}
count := 0
var tempDelay time.Duration
sendData := content.Buf
RETRY: //遇到临时错误重试发送
//有数据要写给客户端
err := c.conn.SetWriteDeadline(time.Now().Add(c.config.SendOutTime))
if err != nil {
//设置超时失败回调
c.call(content, false, err)
continue
}
//发送数据[有异常信息或者发送成功数据和要发的数据不一致]
bLen, err := c.conn.Write(sendData)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
count++
if count < c.config.SendRetryCount {
if count == 0 {
tempDelay = 5 * time.Microsecond
} else {
tempDelay *= 2
}
if max := 10 * time.Millisecond; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
goto RETRY //临时错误重试
}
}
//发送失败回调
c.call(content, false, err)
continue
}
//没有异常,但数据没有完全发送
if bLen != len(sendData) {
count++
if count < c.config.SendRetryCount {
sendData = sendData[bLen:]
if count == 0 {
tempDelay = 5 * time.Microsecond
} else {
tempDelay *= 2
}
if max := 10 * time.Millisecond; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
goto RETRY
}
c.call(content, false, err)
continue
}
//发送成功回调(异步处理)
c.call(content, true, nil)
}
}
//call 回调
func (c *Connection) call(content *callSendHandle, isOk bool, err error) {
if isOk {
c.repTotalByteSize += uint64(len(content.Buf))
c.repPackCount++
} else {
c.repPackErrCount++
}
c.sendTime = time.Now()
if content.CallFunc != nil {
content.CallFunc(c, content.Buf, isOk, content.CmdCode, content.Pram, err)
}
}
Go
1
https://gitee.com/ling-bin/network.git
git@gitee.com:ling-bin/network.git
ling-bin
network
TCP-UDP网络组件
v1.8.30

搜索帮助