7 Star 53 Fork 26

ryanduan / wsPool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
client.go 8.93 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wsPool
import (
"errors"
"gitee.com/rczweb/wsPool/util/grpool"
"github.com/gorilla/websocket"
"net/http"
"time"
)
const (
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10 //1 * time.Second//(pongWait * 9) / 10
//var pingPeriod =27* time.Second //1 * time.Second//(pongWait * 9) / 10
// Time allowed to write a message to the peer.
writeWait = 30 * time.Second
// Maximum message size allowed from peer.
maxMessageSize = 1024 * 1024 * 20
)
var upgrader = websocket.Upgrader{
//ReadBufferSize: 1024 * 1024,
//WriteBufferSize: 1024 * 1024,
// 默认允许WebSocket请求跨域,权限控制可以由业务层自己负责,灵活度更高
CheckOrigin: func(r *http.Request) bool {
return true
},
}
/*连接参数结构体*/
type Config struct {
Id string //标识连接的名称
Type string //连接类型或path
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
Goroutine int //每个连接开启的go程数里 默认为10
}
type RuntimeInfo struct {
Id string //标识连接的名称
Type string //连接类型或path
Ip string
Channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
OpenTime time.Time //连接打开时间
LastReceiveTime time.Time //最后一次接收到数据的时间
LastSendTime time.Time //最后一次发送数据的时间
}
/*//接收消息结构
messageType=1 为string
messageType=2 为[]byte*/
type Message struct {
MsgType int
Message []byte
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *hub
// The websocket connection.
conn *websocket.Conn
types string //连接类型或path
openTime time.Time //连接打开时间
CloseTime time.Time //连接断开的时间
lastReceiveTime time.Time //最后一次接收到数据的时间
lastSendTime time.Time //最后一次发送数据的时间
Id string //标识连接的名称
IsClose chan bool //连接的状态。true为关闭
channel []string //连接注册频道类型方便广播等操作。做为一个数组存储。因为一个连接可以属多个频道
// Buffered channel of outbound messages.
grpool *grpool.Pool
sendCh chan *Message //发送消息的缓冲管首
recvCh chan *Message //接收消息的缓冲管首
recvPing chan int //收到ping的存储管道,方便回复pong处理
sendPing chan int //发送ping的存储管道,方便收到pong处理下次发ping
//ticker *time.Ticker //定时发送ping的定时器
onError func(error)
onOpen func() //连接成功的回调
onPing func() //收到ping
onPong func() //收到pong
onMessage func([]byte)
onMessageString func(string)
onClose func()
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
dump()
c.close()
}()
for {
select {
case <-c.IsClose:
return
default:
msgType, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err,
websocket.CloseAbnormalClosure,
websocket.CloseGoingAway,
websocket.CloseProtocolError,
websocket.CloseUnsupportedData,
websocket.CloseNoStatusReceived,
websocket.CloseAbnormalClosure,
websocket.CloseInvalidFramePayloadData,
websocket.ClosePolicyViolation,
websocket.CloseMessageTooBig,
websocket.CloseMandatoryExtension,
websocket.CloseInternalServerErr,
websocket.CloseServiceRestart,
websocket.CloseTryAgainLater,
websocket.CloseTLSHandshake) {
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage Is Unexpected Close Error:" + err.Error()))
//c.closeChan<-true;
return
}
c.onError(errors.New("连接ID:" + c.Id + "ReadMessage other error:" + err.Error()))
//c.closeChan<-true;
return
}
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.lastReceiveTime = time.Now()
c.readMessage(&Message{MsgType: msgType, Message: message})
}
}
}
// 读取消息写管道缓冲区
func (c *Client) readMessage(msg *Message) {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case <-c.IsClose:
c.onError(errors.New("readMessage连接" + c.Id + ",连接己在关闭,不进行消息接收"))
return
case c.recvCh <- msg:
return
case <-timeout.C:
c.onError(errors.New("recvCh 消息管道blocked,写入消息超时,管道长度:" + string(len(c.recvCh))))
return
}
}
// 单个连接接收消息
func (c *Client) recvMessage() {
defer func() {
dump()
}()
loop:
for {
select {
case <-c.IsClose:
return
case data, ok := <-c.recvCh:
if !ok {
break loop
}
/* //ToClientId与Channel不能同时存在!!!注意!!!!
if message.ToClientId!="" {
Send(message)
}
//ToClientId与Channel不能同时存在!!!注意!!!!
if message.Channel!="" {
Broadcast(message)
}*/
//收到消息触发回调
switch data.MsgType {
case websocket.BinaryMessage:
c.onMessage(data.Message)
break
case websocket.TextMessage:
c.onMessageString(string(data.Message))
break
}
/*c.grpool.Add(func() {
c.onMessage(data)
})*/
}
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
defer func() {
select {
case <-c.IsClose:
return
default:
close(c.IsClose)
}
dump()
}()
loop:
for {
select {
case <-c.IsClose:
return
case d, ok := <-c.sendCh:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
//说明管道己经关闭
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
//glog.Error("连接ID:"+c.Id,"wsServer发送消息失败,一般是连接channel已经被关闭:(此处服务端会断开连接,使客户端能够感知进行重连)")
return
}
w, err := c.conn.NextWriter(d.MsgType)
if err != nil {
return
}
c.lastSendTime = time.Now()
_, err = w.Write(d.Message)
if err != nil {
c.onError(errors.New("连接ID:" + c.Id + "写消息进写入IO错误!连接中断" + err.Error()))
return
}
/*// Add queued chat messages to the current websocket message.
n := len(c.sendCh)
if n > 0 {
for i := 0; i < n; i++ {
_, err = w.Write(<-c.sendCh)
if err != nil {
c.onError(errors.New("连接ID:" + c.Id + "写上次连接未发送的消息消息进写入IO错误!连接中断" + err.Error()))
return
}
}
}
*/
//关闭写入io对象
if err := w.Close(); err != nil {
c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
return
}
case p, ok := <-c.sendPing: //定时发送ping
if !ok {
break loop
}
if p == 1 {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
c.onError(errors.New("连接ID:" + c.Id + "关闭写入IO对象出错,连接中断" + err.Error()))
return
}
}
case p, ok := <-c.recvPing:
if !ok {
break loop
}
if p == 1 {
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PongMessage, nil); err != nil {
c.onError(errors.New("回复客户端PongMessage出现异常:" + err.Error()))
return
}
}
}
}
}
func (c *Client) send(msg *Message) {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case <-c.IsClose:
c.onError(errors.New("send连接" + c.Id + ",连接己在关闭,消息发送失败"))
return
case c.sendCh <- msg:
return
case <-timeout.C:
c.onError(errors.New("sendCh消息管道blocked,写入消息超时,管道长度:" + string(len(c.sendCh))))
return
}
//c.sendCh<-msg
}
func (c *Client) close() {
c.conn.Close()
//触发连接关闭的事件回调
c.onClose() //先执行完关闭回调,再请空所有的回调
c.OnError(nil)
c.OnOpen(nil)
c.OnMessage(nil)
c.OnClose(nil)
c.hub.RemoveClient(c.Id)
select {
case <-c.IsClose:
return
default:
close(c.IsClose)
}
}
Go
1
https://gitee.com/rczweb/wsPool.git
git@gitee.com:rczweb/wsPool.git
rczweb
wsPool
wsPool
v1.4.5

搜索帮助