代码拉取完成,页面将自动刷新
package netService
import (
"gitee.com/ling-bin/network/netInterface"
"net"
"sync"
"sync/atomic"
"time"
)
//UdpConnection 连接结构体
type UdpConnection struct {
server *UdpService //当前conn属于哪个server,在conn初始化的时候添加即可
conn *net.UDPConn //当前连接的socket 套接字
connId uint64 //当前连接的ID 也可以称作为SessionID,ID全局唯一
remoteAddr net.Addr //客户端地址
isClosed bool //当前连接的关闭状态 [ true:关闭,false:开 ]
recPackCount uint64 //上行当前处理的包总数(处理前,1开始)
recTotalByteSize uint64 //上行总大小(字节)
repPackCount uint64 //下行当前处理的包总数(处理后)
repTotalByteSize uint64 //下行成功总大小(字节)
repPackErrCount uint64 //下发异常包个数
incr int64 //供业务下发使用流水号
heartTime time.Time //连接最后一次接收数据时间(每包更新)
sendTime time.Time //连接最后一次发送数据时间(每包更新)
startTime time.Time //连接建立时间
property map[string]interface{} //链接属性
connLock sync.RWMutex //保护链接属性修改的锁
}
//NewUdpConnection Udp连接
func NewUdpConnection(server *UdpService, conn *net.UDPConn, connId uint64, remoteAddr net.Addr) *UdpConnection {
uConn := &UdpConnection{
server: server,
conn: conn,
connId: connId,
isClosed: true,
remoteAddr: remoteAddr,
heartTime: time.Now(),
sendTime: time.Now(),
startTime: time.Now(),
property: make(map[string]interface{}),
}
//添加连接
uConn.server.GetConnMgr().Add(uConn)
uConn.server.addDetection(uConn)
return uConn
}
//GetNetConn 获取网络连接
func (u *UdpConnection) GetNetConn() interface{} {
return u.conn
}
// GetNetwork 获取网络类型
func (u *UdpConnection) GetNetwork() string {
return u.server.config.Network
}
//GetConnId 获取当前连接ID
func (u *UdpConnection) GetConnId() uint64 {
return u.connId
}
//GetRemoteAddr 获取远程客户端地址信息
func (u *UdpConnection) GetRemoteAddr() net.Addr {
return u.remoteAddr
}
//GetLocalAddr 获取本地地址信息
func (u *UdpConnection) GetLocalAddr() net.Addr {
return u.conn.LocalAddr()
}
//CallLogHandle 调用异常处理
func (u *UdpConnection) CallLogHandle(level netInterface.ErrLevel, msgAry ...interface{}) {
u.server.CallLogHandle(level, msgAry)
}
//GetIsClosed 获取的状态[脏读](ture:关闭状态,false:未关闭)
func (u *UdpConnection) GetIsClosed() bool {
return u.isClosed
}
//Incr 连接提供给业务作为流水号使用,循环累加,(val 为正数为递增,val为负数为递减,val为0则获取值)
func (u *UdpConnection) Incr(val int64) int64 {
return atomic.AddInt64(&u.incr, val)
}
//Start 启动连接,让当前连接开始工作[只会调用一次]
func (u *UdpConnection) Start() {
if u.isClosed {
u.isClosed = false
u.server.CallOnConnStart(u)
}
}
//Stop 停止连接,结束当前连接状态
func (u *UdpConnection) Stop() {
if u.isClosed {
return
}
u.connLock.Lock()
if u.isClosed {
u.connLock.Unlock()
return
}
u.isClosed = true
u.connLock.Unlock()
//发出关闭通知
u.server.CallOnConnStop(u)
//将链接从连接管理器中删除
u.server.GetConnMgr().Remove(u)
}
//SendData 发送数据给远程的TCP客户端
//Data 下发数据
//CmdCode 指令标识[如: rep 普通回复, cmd 用户操作下发 。。]
func (u *UdpConnection) SendData(data []byte, cmdCode string) error {
return u.SendDataCall(data, cmdCode, nil, nil)
}
//SendDataCall 发送数据给远程的UDP客户端(带参数和回调)
//Data 下发数据
//Param 下发需要回调携带参数
//CmdCode 指令标识[如: rep 普通回复, cmd 用户操作下发 。。]
//CallFunc 下发后回调函数
func (u *UdpConnection) SendDataCall(data []byte, cmdCode string, param interface{}, callFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error)) error {
if u.isClosed {
return netInterface.ClosedErr
}
//更新发送时间
u.sendTime = time.Now()
//对象池获取
response := newReplyTask()
response.ConnId = u.connId
response.Data = data
response.CallFunc = callFunc
response.CmdCode = cmdCode
response.Param = param
response.RunReplyTask = u.runReplyTask
if !u.server.config.OverflowDiscard {
u.server.replyHandle.SendToTaskQueueWait(response)
return nil
}
err := u.server.replyHandle.SendToTaskQueue(response)
if err != nil {
u.server.CallLogHandle(netInterface.Warn, "发送队列已满", err)
}
return err
}
//runReplyTask 运行回复任务
func (u *UdpConnection) runReplyTask(replyTask *replyTask) {
defer func() {
if r := recover(); r != nil {
u.CallLogHandle(netInterface.Error, "[udp]运行回复任务异常:", r)
}
}()
count := 0
var tempDelay time.Duration
sendData := replyTask.Data
RETRY: //遇到临时错误重试发送
//设置发送超时时间
err := u.conn.SetWriteDeadline(time.Now().Add(u.server.config.SendOutTime))
if err != nil {
u.replyNotice(replyTask, false, err)
return
}
//发送数据[有异常信息或者发送成功数据和要发的数据不一致]
if _, err := u.conn.WriteTo(sendData, u.remoteAddr);err != nil {
count++
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if count <= u.server.config.SendRetryCount {
if count == 0 {
tempDelay = 5 * time.Microsecond
} else {
tempDelay *= 2
}
if max := 10 * time.Millisecond; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
goto RETRY //临时错误重试
}
}
u.replyNotice(replyTask, false, err)
return
}
u.replyNotice(replyTask, true, nil)
}
//replyNotice 数据发送后通知
func (u *UdpConnection) replyNotice(replyTask *replyTask, isOk bool, err error) {
u.sendTime = time.Now()
if replyTask.CallFunc != nil {
replyTask.CallFunc(u, replyTask.Data, isOk, replyTask.CmdCode, replyTask.Param, err)
}
if isOk {
u.repTotalByteSize += uint64(len(replyTask.Data))
u.repPackCount++
} else {
u.repPackErrCount++
}
u.server.CallOnReply(u, replyTask.Data, isOk, replyTask.CmdCode, replyTask.Param, err)
}
func (u *UdpConnection) sendCallFunc(connection netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) {
if isOk {
//成功记录发送次数,发送字节数
u.repPackCount++
u.repTotalByteSize += uint64(len(data))
} else {
u.repPackErrCount++
}
u.server.CallOnReply(connection, data, isOk, cmdCode, param, err)
}
//SetProperty 设置链接属性
func (u *UdpConnection) SetProperty(key string, value interface{}) {
u.connLock.Lock()
defer u.connLock.Unlock()
u.property[key] = value
}
//GetProperty 获取链接属性
func (u *UdpConnection) GetProperty(key string) (interface{}, error) {
u.connLock.RLock()
defer u.connLock.RUnlock()
if value, ok := u.property[key]; ok {
return value, nil
} else {
return nil, netInterface.NotKey
}
}
//RemoveProperty 移除链接属性
func (u *UdpConnection) RemoveProperty(key string) {
u.connLock.Lock()
defer u.connLock.Unlock()
delete(u.property, key)
}
//GetPropertyKeys 获取所有属性key
func (u *UdpConnection) GetPropertyKeys() []string {
u.connLock.RLock()
defer u.connLock.RUnlock()
propertyAry := make([]string, 0, len(u.property))
for key := range u.property {
propertyAry = append(propertyAry, key)
}
return propertyAry
}
//GetRecInfo 上行当前处理的包总数(处理前,1开始),总大小(字节)
func (u *UdpConnection) GetRecInfo() (count, byteSize uint64) {
return u.recPackCount, u.recTotalByteSize
}
//GetRepInfo 下行当前处理的包总数(处理后),总大小(字节)
func (u *UdpConnection) GetRepInfo() (count, byteSize, errCount uint64) {
return u.repPackCount, u.repTotalByteSize, u.repPackErrCount
}
//GetHeartTime 连接最后一次接收数据时间
func (u *UdpConnection) GetHeartTime() time.Time {
return u.heartTime
}
//GetSendTime 连接最后一次发送数据时间
func (u *UdpConnection) GetSendTime() time.Time {
return u.sendTime
}
//GetStartTime 连接建立时间
func (u *UdpConnection) GetStartTime() time.Time {
return u.startTime
}
// onCompleted Udp 不需要分包
func (u *UdpConnection) onCompleted(receive *receiveUdpTask) {
defer func() {
if r := recover(); r != nil {
u.CallLogHandle(netInterface.Error, "[udp]数据处理异常:", r)
}
}()
var hData []byte
if u.server.config.HDataCache {
hData = receive.Data[:receive.Count]
} else {
hData = make([]byte, receive.Count)
copy(hData, receive.Data[:receive.Count])
}
u.OnReceive(hData)
}
//OnReceive 数据上传完整的一包处理
func (u *UdpConnection) OnReceive(data []byte) {
//添加包个数
u.recPackCount++
//添加处理字节数
u.recTotalByteSize += uint64(len(data))
//更新心跳时间
u.heartTime = time.Now()
u.server.CallOnReceive(u, data)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。