7 Star 53 Fork 26

ryanduan / wsPool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
hub.go 5.79 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package wsPool
import (
"errors"
"gitee.com/rczweb/wsPool/util/gmap"
"gitee.com/rczweb/wsPool/util/queue"
"log"
"time"
)
var (
//最大连接池缓冲处理连接对像管道长度
Max_client_channel_len = 10240
//最大全局广播缓冲处理管道长度
Max_broadcastQueue_len = 4096
//最大频道广播缓冲处理管道长度
Max_chanBroadcastQueue_len = 4096
//最大接收消息缓冲处理管道长度
Max_recvCh_len = 10240
//最大发送消息缓冲处理管道长度
Max_sendCh_len = 10240
)
type broadcastMessage struct {
Channel string
Msg *Message
}
//重新连接需要处理的消息(缓存上次未来得能处理发送消息channel中的消息,60秒后原ws未连接消息失效)
type oldMsg struct {
list *queue.PriorityQueue
Expiration time.Time //过期时间
}
// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type hub struct {
// Registered clients.
clients *gmap.StrAnyMap //map[string]*Client// //新的处理方式有没有线程效率会更高,所以SafeMap的锁处理都去掉了
oldClients *gmap.StrAnyMap //缓存断开的连接消息队列
// Inbound messages from the clients.
//可以用于广播所有连接对象 //广播指定频道的管道 频道Channel=""时广播所有
broadcastQueue chan *broadcastMessage
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan string
clearOldClient chan *Client
}
func newHub() *hub {
return &hub{
register: make(chan *Client, Max_client_channel_len),
unregister: make(chan string, Max_client_channel_len),
clearOldClient: make(chan *Client, Max_client_channel_len),
clients: gmap.NewStrAnyMap(true), //make(map[string]*Client),//
oldClients: gmap.NewStrAnyMap(true), //make(map[string]*Client),//
broadcastQueue: make(chan *broadcastMessage, Max_broadcastQueue_len),
}
}
func (h *hub) run() {
loop:
for {
select {
case id, ok := <-h.unregister:
if !ok {
break loop
}
c := h.clients.Get(id)
if c != nil {
client := c.(*Client)
h.clients.Remove(id)
//把连接对像缓存在旧对像列表中,并设置连接断开的时间,过期未连接就会清理对像
client.CloseTime = time.Now()
h.oldClients.Set(id, client)
}
log.Println("取消注册ws连接对象:", id, "连接总数:", h.clients.Size())
case client, ok := <-h.clearOldClient:
/*清理缓存的旧的连接对像*/
if !ok {
break loop
}
close(client.recvCh)
close(client.sendCh)
close(client.recvPing)
close(client.sendPing)
client.grpool.Close()
h.oldClients.Remove(client.Id)
log.Println("己断开的ws连接缓存对象:", client.Id, "旧连接总数:", h.oldClients.Size())
case client, ok := <-h.register:
if !ok {
break loop
}
log.Println("注册ws连接对象:", client.Id, "连接总数:", h.clients.Size())
h.clients.Set(client.Id, client)
case broadcastMsg, ok := <-h.broadcastQueue:
if !ok {
break loop
}
if broadcastMsg.Channel == "" {
h.clients.Iterator(func(id string, v interface{}) bool {
if v != nil {
client := v.(*Client)
client.send(broadcastMsg.Msg)
}
return true
})
} else {
//广播指定频道的消息处理
h.clients.Iterator(func(id string, v interface{}) bool {
if v != nil {
client := v.(*Client)
if searchStrArray(client.channel, broadcastMsg.Channel) {
client.send(broadcastMsg.Msg)
}
}
return true
})
}
}
}
}
func (h *hub) ticker() {
//定时清理清理缓存的旧的连接对像
ticket := time.NewTimer(time.Second * 30)
select {
case <-ticket.C:
if h.oldClients.Size() > 0 {
h.oldClients.Iterator(func(k string, v interface{}) bool {
if v != nil {
client := v.(*Client)
if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
//3分钟后清理组存中的旧连接对像
h.clearOldClient <- client
}
}
return true
})
}
}
//定时清理清理缓存的旧的连接对像
/*gtimer.AddSingleton(30*time.Second, func() {
if h.oldClients.Size() > 0 {
h.oldClients.Iterator(func(k string, v interface{}) bool {
if v != nil {
client := v.(*Client)
if time.Now().Add(-180 * time.Second).After(client.CloseTime) {
//3分钟后清理组存中的旧连接对像
h.clearOldClient <- client
}
}
return true
})
}
})*/
}
func (h *hub) AddClient(client *Client) error {
timeout := time.NewTimer(time.Second * 3)
defer timeout.Stop()
select {
case h.register <- client:
return nil
case <-timeout.C:
return errors.New("AddClient register消息管道blocked,写入消息超时")
}
}
func (h *hub) AddOldClient(client *Client) error {
timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop()
select {
case h.clearOldClient <- client:
return nil
case <-timeout.C:
return errors.New("AddClient register消息管道blocked,写入消息超时")
}
}
func (h *hub) RemoveClient(clientId string) error {
timeout := time.NewTimer(time.Second * 1)
defer timeout.Stop()
select {
case h.unregister <- clientId:
return nil
case <-timeout.C:
return errors.New(" RemoveClient unregister消息管道blocked,写入消息超时")
}
}
func (h *hub) Broadcast(channel string, msg *Message) error {
timeout := time.NewTimer(time.Millisecond * 800)
defer timeout.Stop()
select {
case wsSever.hub.broadcastQueue <- &broadcastMessage{Channel: channel, Msg: msg}:
return nil
case <-timeout.C:
return errors.New("hub.broadcastQueue 消息管道blocked,写入消息超时,管道长度:" + string(len(wsSever.hub.broadcastQueue)))
}
}
Go
1
https://gitee.com/rczweb/wsPool.git
git@gitee.com:rczweb/wsPool.git
rczweb
wsPool
wsPool
v1.4.5

搜索帮助