代码拉取完成,页面将自动刷新
package netService
import (
"fmt"
"gitee.com/ling-bin/go-utils/pools"
"gitee.com/ling-bin/go-utils/timingwheel"
"gitee.com/ling-bin/network/netInterface"
"hash/fnv"
"log"
"sync"
"time"
"net"
"sync/atomic"
)
var (
timingWhee *timingwheel.TimingWheel //多层时间轮
)
// 初始化
func init() {
timingWhee = timingwheel.NewTimingWheel(time.Second, 60)
timingWhee.Start()
}
// UdpService 接口实现,定义一个Server服务类
type UdpService struct {
connId uint64 //连接ID累计
connMgr netInterface.IConnManager //当前Server的链接管理器
listenMap sync.Map //监听对象
onLogHandle func(level netInterface.ErrLevel, msg ...interface{}) //设置异常处理
onConnStart func(conn netInterface.IConnection) //该Server的连接创建时Hook函数
onConnStop func(conn netInterface.IConnection) //该Server的连接断开时的Hook函数
onReceive func(conn netInterface.IConnection, data []byte) //数据上传完成
onReply func(conn netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) //数据下发完成
exitChan chan bool //告知该链接已经退出/停止的channel(管道)
handleStrategy func(conn netInterface.IConnection, data []byte) *netInterface.StrategyData //处理策略,TCP和UDP表现不一样,tcp 只会在连接上第一包数据时调用,udp 会在每一包数上次都调用
bufferPool *pools.BufferPool //缓存管理器
connIdAddrMap sync.Map // 客户端连接地址映射
config *Config //配置
receiveHandler pools.ITaskWorkerPool //当前Server的消息管理模块(工作池)
replyHandle pools.ITaskWorkerPool //消息发送处理器(工作池)
receiveUdpTaskPool *receiveUdpTaskPool //udp数据接收任务池
startTime time.Time //连接启动时间
isStart bool //是否启动[true 启动,false 未启动]
}
// NewUdpService 创建UDP服务
func NewUdpService(config *Config) netInterface.IService {
uSer := &UdpService{
config: config,
connMgr: NewConnManager(), //创建ConnManager
exitChan: make(chan bool, 1), //通知退出消息
receiveHandler: pools.NewTaskWorkerPool("数据接收处理器", config.ReceiveWorkerSize, config.ReceiveTaskQueueSize),
replyHandle: pools.NewTaskWorkerPool("数据回复处理器", config.ReplyWorkerSize, config.ReplyTaskQueueSize),
receiveUdpTaskPool: newReceiveUdpTaskPool(uint(config.BufferSize)),
}
return uSer
}
// GetConnMgr 得到链接管理
func (s *UdpService) GetConnMgr() netInterface.IConnManager {
return s.connMgr
}
// GetConn 获取连接
func (s *UdpService) GetConn(connId uint64) (netInterface.IConnection, bool) {
return s.GetConnMgr().Get(connId)
}
// GetStartTime 获取连接启动时间
func (s *UdpService) GetStartTime() time.Time {
return s.startTime
}
// SetLogHandle 内部日志输出
func (s *UdpService) SetLogHandle(hookFunc func(level netInterface.ErrLevel, msg ...interface{})) {
s.onLogHandle = hookFunc
}
// SetOnConnStart 设置该Server的连接创建时Hook函数
func (s *UdpService) SetOnConnStart(hookFunc func(netInterface.IConnection)) {
s.onConnStart = hookFunc
}
// SetOnConnStop 设置该Server的连接断开时的Hook函数
func (s *UdpService) SetOnConnStop(hookFunc func(netInterface.IConnection)) {
s.onConnStop = hookFunc
}
// SetOnReceive 数据上传完成处理函数[分包后]
func (s *UdpService) SetOnReceive(hookFunc func(netInterface.IConnection, []byte)) {
s.onReceive = hookFunc
}
// SetOnReply 数据下发后回调
func (s *UdpService) SetOnReply(hookFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error)) {
s.onReply = hookFunc
}
// CallOnReply 下发后回调
func (s *UdpService) CallOnReply(conn netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) {
if s.onReply != nil {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[UDP]下发后回调业务逻辑异常:", r)
}
}()
s.onReply(conn, data, isOk, cmdCode, param, err)
}
}
// CallOnReceive 数据上传完成回调
func (s *UdpService) CallOnReceive(conn netInterface.IConnection, data []byte) {
if s.onReceive != nil {
s.onReceive(conn, data)
}
}
// SetHandleStrategy 设置处理策略
func (s *UdpService) SetHandleStrategy(hookFunc func(netInterface.IConnection, []byte) *netInterface.StrategyData) {
s.handleStrategy = hookFunc
}
// CallLogHandle 错误消息处理
func (s *UdpService) CallLogHandle(level netInterface.ErrLevel, msgAry ...interface{}) {
if s.onLogHandle != nil {
defer func() {
if r := recover(); r != nil {
log.Println("[UDP]CallLogHandle 错误消息处理调用业务逻辑异常:", r)
}
}()
s.onLogHandle(level, msgAry)
}
}
// CallOnConnStart 调用连接OnConnStart Hook函数
func (s *UdpService) CallOnConnStart(conn netInterface.IConnection) {
if s.onConnStart != nil {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[UDP]调用开始连接业务逻辑异常:", r)
}
}()
s.onConnStart(conn)
}
}
// CallOnConnStop 调用连接OnConnStop Hook函数
func (s *UdpService) CallOnConnStop(conn netInterface.IConnection) {
property, err := conn.GetProperty(connKey)
if err == nil {
load, ok := s.connIdAddrMap.Load(property)
if ok {
odlConn := load.(netInterface.IConnection)
if odlConn.GetConnId() == conn.GetConnId() {
s.connIdAddrMap.Delete(property)
}
}
}
if s.onConnStop != nil {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[UDP]调用断开连接业务逻辑异常:", r)
}
}()
s.onConnStop(conn)
}
}
// GetIsStart 获取是否启动[true 启动,false 未启动]
func (s *UdpService) GetIsStart() bool {
return s.isStart
}
// Start 开启网络服务
func (s *UdpService) Start() {
if s.isStart {
return
}
s.isStart = true
//消息处理工作池
s.receiveHandler.StartWorkerPool(func(errString string) {
s.CallLogHandle(netInterface.Error, fmt.Sprint("消息处理工作池:", errString))
})
//消息发送工作池
s.replyHandle.StartWorkerPool(func(errString string) {
s.CallLogHandle(netInterface.Error, fmt.Sprint("消息发送工作池:", errString))
})
s.startTime = time.Now()
//开启监听
var wg sync.WaitGroup
wg.Add(len(s.config.AddrAry))
for _, addr := range s.config.AddrAry {
go s.listenAddr(addr, &wg)
}
wg.Wait()
}
// listenAddr 开启监听host
func (s *UdpService) listenAddr(addrConfig *AddrConfig, wg *sync.WaitGroup) {
s.CallLogHandle(netInterface.Info, "[开启] 服务监听 [", s.config.Network, "]地址[", addrConfig.Addr, "]")
//2 监听服务器地址
udpAddr, err := net.ResolveUDPAddr(s.config.Network, addrConfig.Addr)
if err != nil {
wg.Done()
s.CallLogHandle(netInterface.Fatal, "地址转换[", s.config.Network, "][", addrConfig.Addr, "]错误:", err)
return
}
uConn, err := net.ListenUDP(s.config.Network, udpAddr)
if err != nil {
wg.Done()
s.CallLogHandle(netInterface.Fatal, "监听[", s.config.Network, "][", addrConfig.Addr, "]错误:", err)
panic(fmt.Sprint("监听[", s.config.Network, "][", addrConfig.Addr, "]错误:", err))
return
}
s.listenMap.Store(uConn.LocalAddr().String(), uConn)
//已经监听成功
s.CallLogHandle(netInterface.Info, "服务开启成功 [", s.config.Network, "]地址[", addrConfig.Addr, "]正在监听中...")
wg.Done()
//3 启动server网络连接业务
s.startReader(uConn)
}
// startReader处理conn读数据的Goroutine
func (s *UdpService) startReader(uConn *net.UDPConn) {
defer func() {
uConn.Close()
s.listenMap.Delete(uConn.LocalAddr().String())
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[udp]连接读取数据异常:", r)
}
}()
tempDelay := time.Millisecond * 0
maxDelay := 1 * time.Second
for {
/*
*Udp在分配的缓存不足时表现
*windows 系统直接报异常,要不全部不接收,要不全部接收
*liunx 系统会出现接收不全,只接收到了分配的缓存,其它会丢弃
*/
receive := s.receiveUdpTaskPool.Get()
count, addr, err := uConn.ReadFromUDP(receive.Data)
if err != nil {
s.receiveUdpTaskPool.Put(receive)
if ne, ok := err.(net.Error); ok && (ne.Temporary() || ne.Timeout()) {
if tempDelay == 0 {
tempDelay = 5 * time.Microsecond
} else {
tempDelay *= 2
}
if tempDelay > maxDelay {
tempDelay = maxDelay
}
s.CallLogHandle(netInterface.Error, "[Udp]客户端上传数据临时异常: ", err)
time.Sleep(tempDelay)
continue
}
if count > 0 {
s.CallLogHandle(netInterface.Fatal, "设置缓冲区大小[", s.config.BufferSize, "][Udp]致命异常:", err)
continue
}
s.CallLogHandle(netInterface.Fatal, "[Udp]致命异常:", err)
continue
}
receive.Conn = uConn
receive.RemoteAddr = addr
receive.Count = count
receive.OnCompleted = s.handleConn
receive.ConnId = hashCode(addr.String()) //远程地址hashCode
if !s.config.OverflowDiscard {
s.receiveHandler.SendToTaskQueueWait(receive)
} else {
err := s.receiveHandler.SendToTaskQueue(receive)
if err != nil {
s.CallLogHandle(netInterface.Fatal, "[UDP处理队列缓存池满]", err)
}
}
tempDelay = 0
}
}
// hashCode 获取字符串hashCode
func hashCode(s string) uint64 {
h := fnv.New64a()
h.Write([]byte(s))
return h.Sum64()
}
// handleConn 处理连接数据
func (s *UdpService) handleConn(receive *receiveUdpTask) {
defer s.receiveUdpTaskPool.Put(receive) //处理完成回收
key := fmt.Sprint(receive.Conn.LocalAddr(), "#", receive.RemoteAddr)
var connection *UdpConnection
conn, ok := s.connIdAddrMap.Load(key)
if !ok {
newConnId := atomic.AddUint64(&s.connId, 1)
newConnection := NewUdpConnection(s, receive.Conn, newConnId, receive.RemoteAddr)
newConnection.SetProperty(connKey, key)
//启动新连接
newConnection.Start()
s.connIdAddrMap.Store(key, newConnection)
connection = newConnection
} else {
connection = conn.(*UdpConnection)
}
if s.handleStrategy != nil {
strategyInfo := s.handleStrategy(connection, receive.Data[:receive.Count])
if strategyInfo == nil || len(strategyInfo.Key) == 0 {
s.Stop()
return
}
property, err := connection.GetProperty(HKey)
if err == nil {
if strategyInfo.Key != property {
newConnId := atomic.AddUint64(&s.connId, 1)
newConnection := NewUdpConnection(s, receive.Conn, newConnId, receive.RemoteAddr)
newConnection.SetProperty(connKey, key)
newConnection.SetProperty(HKey, strategyInfo.Key)
if len(strategyInfo.ExtData) != 0 {
for key, val := range strategyInfo.ExtData {
newConnection.SetProperty(key, val)
}
}
//启动新连接
newConnection.Start()
s.connIdAddrMap.Store(key, newConnection)
//停止旧连接
connection.Stop()
connection = newConnection
}
} else {
connection.SetProperty(HKey, strategyInfo.Key)
if len(strategyInfo.ExtData) != 0 {
for key, val := range strategyInfo.ExtData {
connection.SetProperty(key, val)
}
}
}
}
//调用连接的处理数据方法
connection.onCompleted(receive)
}
// Stop 停止服务
func (s *UdpService) Stop() {
if !s.isStart {
return
}
s.isStart = false
//停止监听
s.listenMap.Range(func(key, value interface{}) bool {
listen := value.(*net.UDPConn)
listen.Close()
s.CallLogHandle(netInterface.Info, "[udp-Stop]监听服务停止:", key)
s.listenMap.Delete(listen.LocalAddr().String())
return true
})
//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
s.connMgr.ClearConn()
//处理工作池
s.receiveHandler.StopWorkerPool(false)
//关闭回发工作池
s.replyHandle.StopWorkerPool(false)
}
// ToMap 获取内部日志
func (s *UdpService) ToMap() map[string]int64 {
return map[string]int64{
"ReceiveQueueCount": s.receiveHandler.GetTotalTaskQueue(),
"ReceiveHandleCount": s.receiveHandler.GetTotalHandleCount(),
"ReplyQueueCount": s.replyHandle.GetTotalTaskQueue(),
"ReplyHandleCount": s.replyHandle.GetTotalHandleCount(),
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。