1 Star 6 Fork 4

夏季的风/TCP-UDP网络组件

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
Receivers
apps
netClient
netInterface
netService
AcceptTask.go
Config.go
ConnManager.go
Connection.go
ReceiveUdpTask.go
ReplyTask.go
Service.go
UdpConnection.go
UdpService.go
.gitignore
LICENSE
README.md
go.mod
go.sum
克隆/下载
Service.go 9.82 KB
一键复制 编辑 原始数据 按行查看 历史
package netService
import (
"crypto/tls"
"fmt"
"gitee.com/ling-bin/go-utils/pools"
"gitee.com/ling-bin/network/netInterface"
"log"
"net"
"sync"
"sync/atomic"
"time"
)
// Service 接口实现,定义一个Server服务类
type Service struct {
connId uint64 //连接ID累计
connMgr netInterface.IConnManager //当前Server的链接管理器
listenMap sync.Map //监听对象
onLogHandle func(level netInterface.ErrLevel, msg ...interface{}) //设置异常处理
onConnStart func(conn netInterface.IConnection) //该Server的连接创建时Hook函数
onConnStop func(conn netInterface.IConnection) //该Server的连接断开时的Hook函数
onReceive func(conn netInterface.IConnection, data []byte) //数据上传完成
onReply func(conn netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) //数据下发完成
handleStrategy func(conn netInterface.IConnection, data []byte) *netInterface.StrategyData //处理策略,TCP和UDP表现不一样,tcp 只会在连接上第一包数据时调用,udp 会在每一包数上次都调用
config *Config //配置
replyHandle pools.ITaskWorkerPool //消息发送处理器(工作池)
acceptHandle pools.ITaskWorkerPool //连接处理池
startTime time.Time //连接启动时间
bufferPool *pools.BufferPool //接收数据缓存池
isStart bool //是否启动[true 启动,false 未启动]
}
// NewService 实例化TCP服务类
func NewService(config *Config) netInterface.IService {
ser := &Service{
config: config,
connMgr: NewConnManager(), //创建ConnManager
isStart: false,
replyHandle: pools.NewTaskWorkerPool("数据回复处理器", config.ReplyWorkerSize, config.ReplyTaskQueueSize),
acceptHandle: pools.NewTaskWorkerPool("连接接收处理器", config.AcceptWorkerSize, config.AcceptTaskQueueSize),
bufferPool: pools.NewBufferPool(uint(config.BufferSize)),
}
return ser
}
//GetConnMgr 得到链接管理
func (s *Service) GetConnMgr() netInterface.IConnManager {
return s.connMgr
}
//GetConn 获取连接
func (s *Service) GetConn(connId uint64) (netInterface.IConnection, bool) {
return s.GetConnMgr().Get(connId)
}
//SetLogHandle 设置日志处理
func (s *Service) SetLogHandle(hookFunc func(level netInterface.ErrLevel, msg ...interface{})) {
s.onLogHandle = hookFunc
}
//SetOnConnStart 设置该Server的连接创建时Hook函数
func (s *Service) SetOnConnStart(hookFunc func(netInterface.IConnection)) {
s.onConnStart = hookFunc
}
//SetOnConnStop 设置该Server的连接断开时的Hook函数
func (s *Service) SetOnConnStop(hookFunc func(netInterface.IConnection)) {
s.onConnStop = hookFunc
}
//SetOnReceive 数据上传完成处理函数[分包后]
func (s *Service) SetOnReceive(hookFunc func(netInterface.IConnection, []byte)) {
s.onReceive = hookFunc
}
//SetOnReply 数据下发后回调
func (s *Service) SetOnReply(hookFunc func(netInterface.IConnection, []byte, bool, string, interface{}, error)) {
s.onReply = hookFunc
}
//CallOnReply 下发后回调
func (s *Service) CallOnReply(conn netInterface.IConnection, data []byte, isOk bool, cmdCode string, param interface{}, err error) {
if s.onReply != nil {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[TCP]下发后回调业务逻辑异常:", r)
}
}()
s.onReply(conn, data, isOk, cmdCode, param, err)
}
}
// CallOnReceive 数据上传完成回调
func (s *Service) CallOnReceive(conn netInterface.IConnection, data []byte) {
if s.onReceive != nil {
s.onReceive(conn, data)
}
}
//SetHandleStrategy 设置处理策略
func (s *Service) SetHandleStrategy(hookFunc func(netInterface.IConnection, []byte) *netInterface.StrategyData) {
s.handleStrategy = hookFunc
}
//CallLogHandle 错误消息处理
func (s *Service) CallLogHandle(level netInterface.ErrLevel, msgAry ...interface{}) {
if s.onLogHandle != nil {
defer func() {
if r := recover(); r != nil {
log.Println("[tcp]CallLogHandle 错误消息处理调用业务逻辑异常:",r)
}
}()
s.onLogHandle(level, msgAry)
}
}
//CallOnConnStart 调用连接OnConnStart Hook函数
func (s *Service) CallOnConnStart(conn netInterface.IConnection) {
if s.onConnStart != nil {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[tcp]调用开始连接业务逻辑异常:", r)
}
}()
s.onConnStart(conn)
}
}
//CallOnConnStop 调用连接OnConnStop Hook函数
func (s *Service) CallOnConnStop(conn netInterface.IConnection) {
if s.onConnStop != nil {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[tcp]调用断开连接业务逻辑异常:", r)
}
}()
s.onConnStop(conn)
}
}
//GetStartTime 获取连接启动时间
func (s *Service) GetStartTime() time.Time {
return s.startTime
}
//GetIsStart 获取是否启动[true 启动,false 未启动]
func (s *Service) GetIsStart() bool {
return s.isStart
}
//Start 开启网络服务
func (s *Service) Start() {
if s.isStart {
return
}
s.isStart = true
//消息发送工作池
s.replyHandle.StartWorkerPool(func(errString string) {
s.CallLogHandle(netInterface.Error,fmt.Sprint("消息发送工作池异常:",errString))
})
//连接接入工作池
s.acceptHandle.StartWorkerPool(func(errString string) {
s.CallLogHandle(netInterface.Error,fmt.Sprint("连接接入工作池:",errString))
})
s.startTime = time.Now()
//开启监听
var wg sync.WaitGroup
wg.Add(len(s.config.AddrAry))
for _, addrConfig := range s.config.AddrAry {
go s.listenAddr(addrConfig, &wg)
}
wg.Wait()
}
//listenAddr 开启监听host
func (s *Service) listenAddr(addrConfig *AddrConfig, wg *sync.WaitGroup) {
var (
listen net.Listener
err error
)
if !addrConfig.IsTls {
//监听服务器地址
listen, err = net.Listen(s.config.Network, addrConfig.Addr)
} else {
//监听服务器地址
listen, err = tls.Listen(s.config.Network, addrConfig.Addr, addrConfig.TlsConfig)
}
if err != nil {
wg.Done()
s.CallLogHandle(netInterface.Fatal, "监听[", s.config.Network, "]TLS[",addrConfig.IsTls,"][", addrConfig.Addr, "]错误:", err)
panic(fmt.Sprint("监听[", s.config.Network, "][", addrConfig.Addr, "]错误:", err))
return
}
//记录监听
s.listenMap.Store(listen.Addr().String(), listen)
//关闭监听
defer func() {
listen.Close()
s.listenMap.Delete(listen.Addr().String())
}()
//输出日志
s.CallLogHandle(netInterface.Info, "服务开启成功 [", s.config.Network, "]TLS[",addrConfig.IsTls,"]地址[", addrConfig.Addr, "]正在监听中...")
wg.Done()
tempDelay := time.Millisecond * 0
//3 启动server网络连接业务
for {
//3.1 阻塞等待客户端建立连接请求
conn, err := listen.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Microsecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
s.CallLogHandle(netInterface.Error, "[accept temp]客户端接入临时异常: ", err)
time.Sleep(tempDelay)
continue
}
s.CallLogHandle(netInterface.Error, "客户端接入异常: ", err)
break
}
accept := newAcceptTask()
accept.Conn = conn
accept.ConnId = atomic.AddUint64(&s.connId, 1)
accept.OnAccept = s.runAcceptTask
if !s.config.OverflowDiscard {
s.acceptHandle.SendToTaskQueueWait(accept)
} else {
err = s.acceptHandle.SendToTaskQueue(accept)
if err != nil {
s.CallLogHandle(netInterface.Warn, "连接接入队列已满:", err)
}
}
}
}
//runAcceptTask 运行接入任务
func (s *Service) runAcceptTask(accept *acceptTask) {
defer func() {
if r := recover(); r != nil {
s.CallLogHandle(netInterface.Error, "[tcp]连接接入处理异常:", r)
}
}()
dealConn := NewConnection(s, accept.Conn, accept.ConnId)
dealConn.Start()
}
//Stop 停止服务
func (s *Service) Stop() {
if !s.isStart {
return
}
s.isStart = false
//停止监听
s.listenMap.Range(func(key, value interface{}) bool {
listen := value.(net.Listener)
listen.Close()
s.CallLogHandle(netInterface.Info, "[tcp-Stop]监听服务停止:", key)
s.listenMap.Delete(listen.Addr().String())
return true
})
//连接接入工作池
s.acceptHandle.StopWorkerPool(false)
//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
s.connMgr.ClearConn()
//消息发送工作池
s.replyHandle.StopWorkerPool(false)
}
//ToMap 获取内部日志
func (s *Service) ToMap() map[string]int64 {
return map[string]int64{
"AcceptQueueCount": s.acceptHandle.GetTotalTaskQueue(),
"AcceptHandleCount": s.acceptHandle.GetTotalHandleCount(),
"ReplyQueueCount": s.replyHandle.GetTotalTaskQueue(),
"ReplyHandleCount": s.replyHandle.GetTotalHandleCount(),
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ling-bin/network.git
git@gitee.com:ling-bin/network.git
ling-bin
network
TCP-UDP网络组件
v1.8.29

搜索帮助