代码拉取完成,页面将自动刷新
package netService
import (
"errors"
"gitee.com/ling-bin/go-utils/pools"
"gitee.com/ling-bin/netwebSocket/netInterface"
"github.com/gorilla/websocket"
"net"
"sync"
"sync/atomic"
"time"
)
//连接
type Connection struct {
server *Service //当前属于那个server
Conn *websocket.Conn //当前连接的ws
ConnID uint64 //连接id
isClosed int32 //当前连接的关闭状态 [ 0:关闭,1:开 ]
property map[string]interface{} //绑定属性
propertyLock sync.RWMutex //保护连接属性
heartTime time.Time //心跳时间(每包更新)
startTime time.Time //连接启动时间
receiveHandler pools.ITaskWorkerPool //当前Server的消息管理模块(工作池)
replyHandle pools.ITaskWorkerPool //消息发送处理器(工作池)
config *Config //配置
recPackCount uint64 //上行当前处理到的包数
recTotalByteSize uint64 //上行总大小(字节)
repPackCount uint64 //下行总包个数
repTotalByteSize uint64 //下行总大小(字节)
repPackErrCount uint64 //下发异常包个数
path string //请求路径
}
//初始化连接方法
func NewConnection(server *Service, conn *websocket.Conn, connID uint64,path string,receiveHandler pools.ITaskWorkerPool,replyHandle pools.ITaskWorkerPool,config *Config) *Connection {
c := &Connection{
server: server,
Conn: conn,
ConnID: connID,
isClosed: 0,
property: make(map[string]interface{}),
receiveHandler: receiveHandler,
replyHandle: replyHandle,
config: config,
heartTime: time.Now(),
path: path,
}
//将当前连接放入connmgr
c.server.GetConnMgr().Add(c)
return c
}
//获取连接
func (c *Connection) GetNetConn() interface{} {
return c.Conn
}
// GetNetwork 获取网络类型
func (c *Connection) GetScheme() string {
return c.config.Scheme
}
//获取客户端ID
func (c *Connection) GetConnId() uint64{
return c.ConnID
}
//获取远程客户端地址信息
func (c *Connection)GetRemoteAddr() net.Addr{
return c.Conn.RemoteAddr()
}
//获取本地地址
func (c *Connection) GetLocalAddr() net.Addr {
return c.Conn.LocalAddr()
}
//心跳时间
func (c *Connection) GetHeartTime() time.Time {
return c.heartTime
}
//连接启动时间
func (c *Connection) GetStartTime() time.Time {
return c.startTime
}
//上行当前处理的包总数(处理前,1开始),总大小(字节)
func (c *Connection) GetRecInfo() (count,byteSize uint64){
return c.recPackCount,c.recTotalByteSize
}
//下行当前处理的包总数(处理后),总大小(字节)
func (c *Connection) GetRepInfo() (count,byteSize,errCount uint64) {
return c.repPackCount, c.repTotalByteSize, c.repPackErrCount
}
//直接将Message数据发送数据给远程的TCP客户端消息类型 1.TextMessage(文本) 2、BinaryMessage(二进制)
func (c *Connection) SendData(msgType int,data []byte,cmdCode string) error {
return c.SendDataCall(msgType,data,cmdCode,nil,nil)
}
//直接将Message数据发送数据给远程的TCP客户端(带参数和回调)消息类型 1.TextMessage(文本) 2、BinaryMessage(二进制)
func (c *Connection) SendDataCall(msgType int,data []byte,cmdCode string,param interface{},callFunc func(netInterface.IConnection, bool,string, interface{}, error)) error {
if c.isClosed == 0 {
return errors.New("连接关闭,不能发送消息")
}
reply := &ReplyTask{
conn: c.Conn,
connection: c,
data: data,
callFunc: callFunc,
completedCallFunc: c.sendCallFunc,
param: param,
msgType: msgType,
cmdCode: cmdCode,
}
var err error
if c.config.IsOutTime {
reply.duration = c.config.ReplyOutTime
}else {
reply.duration = 0
}
err = c.replyHandle.SendToTaskQueue(reply)
if err != nil {
c.server.CallLogHandle(netInterface.Warn, "发送队列已满", err)
}
return err
}
func (c *Connection) sendCallFunc(connection netInterface.IConnection,messageType int,data []byte,isOk bool,cmdCode string,param interface{},err error) {
if isOk {
c.repTotalByteSize += uint64(len(data))
c.repPackCount++
} else {
c.repPackErrCount++
}
c.server.CallOnReply(connection,messageType ,data, isOk,cmdCode, param, err)
}
//数据上传了完整一包的回调
func (c *Connection) OnReceive(messageType int,data []byte) {
//成功记录发送次数
c.recPackCount++
if c.recPackCount == 1 {
//第一包调用
c.server.CallOnOneReceive(c, messageType, data)
}
//更新心跳时间
c.heartTime = time.Now()
c.server.CallOnReceive(c, messageType, data)
}
//读业务
func (c *Connection) StartReader() {
defer c.Stop()
//读业务
for {
//读取数据到内存中 messageType:TextMessage/BinaryMessage
msgType, data, err := c.Conn.ReadMessage()
if err != nil {
break
}
receive := &ReceiveTask{
conn: c,
data: data,
msgType: msgType,
OnReceive: c.OnReceive,
}
//累加上行总字节数
c.recTotalByteSize += uint64(len(data))
c.heartTime = time.Now()
if c.config.IsOutTime {
receive.duration = c.config.ReceiveOutTime
}else {
receive.duration = 0
}
err = c.receiveHandler.SendToTaskQueue(receive)
if err != nil {
c.CallLogHandle(netInterface.Fatal,"[TCP处理队列缓存池满]" ,err)
}
}
}
//启动连接,让当前连接,开始工作
func (c *Connection) Start() {
if atomic.LoadInt32(&c.isClosed) == 1 {
return
}
atomic.StoreInt32(&c.isClosed, 1)
c.startTime = time.Now()
//根据官方文档 读与写只能开一个线程
//启动读数据业务
go c.StartReader()
//按照开发者传递的函数来,调用回调函数
c.server.CallOnConnStart(c)
}
//停止连接,结束当前连接工作
func (c *Connection) Stop() {
if atomic.LoadInt32(&c.isClosed) == 0 {
return
}
atomic.StoreInt32(&c.isClosed, 0)
//按照开发者传递的函数来,调用回调函数,注意在close之前调用
c.server.CallOnConnStop(c)
//关闭连接
c.Conn.Close()
//将conn在connmgr中删除
c.server.GetConnMgr().Remove(c)
}
//设置连接属性
func (c *Connection) SetProperty(key string, value interface{}) {
c.propertyLock.Lock()
c.property[key] = value
c.propertyLock.Unlock()
}
//获取连接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.propertyLock.RLock()
defer c.propertyLock.RUnlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, errors.New("connection getproperty get error key:" + key)
}
}
//移除设置属性
func (c *Connection) RemoveProperty(key string) {
c.propertyLock.Lock()
delete(c.property, key)
c.propertyLock.Unlock()
}
//获取所有属性key
func (c *Connection) GetPropertyKeys()[]string {
c.propertyLock.RLock()
defer c.propertyLock.RUnlock()
propertys := make([]string, 0, 5)
for key, _ := range c.property {
propertys = append(propertys, key)
}
return propertys
}
//获取路径
func (c *Connection) GetPath() string {
return c.path
}
//是否关闭
func (c *Connection) GetIsClosed() bool {
return c.isClosed == 1
}
//调用异常处理
func (c *Connection) CallLogHandle(level netInterface.ErrLevel,msgAry ...interface{}){
c.server.CallLogHandle(level,msgAry)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。