1 Star 1 Fork 0

linngc / center.gf

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
websocket_client_manager.go 7.03 KB
一键复制 编辑 原始数据 按行查看 历史
// Package websocket
// @Link https://gitee.com/linngc/center.gf
// @Copyright Copyright (c) 2022 center CLI
// @Author linngc
// @License
package websocket
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcron"
"github.com/gogf/gf/v2/os/gtime"
"sync"
)
// ClientManager 客户端管理
type ClientManager struct {
Clients map[*WssClient]bool // 全部的连接
ClientsLock sync.RWMutex // 读写锁
UniqueClient map[string]*WssClient // 登录的用户 // unique
UniqueLock sync.RWMutex // 读写锁
Register chan *WssClient // 连接连接处理
Login chan *login // 用户登录处理
Unregister chan *WssClient // 断开连接处理程序
Broadcast chan *WResponse // 广播 向全部成员发送数据
ClientBroadcast chan *ClientWResponse // 广播 向某个客户端发送数据
TagBroadcast chan *TagWResponse // 广播 向某个标签成员发送数据
UBroadcast chan *UWResponse // 广播 向某个用户的所有链接发送数据
}
func NewClientManager() (clientManager *ClientManager) {
clientManager = &ClientManager{
Clients: make(map[*WssClient]bool),
UniqueClient: make(map[string]*WssClient),
Register: make(chan *WssClient, chanLen),
Unregister: make(chan *WssClient, chanLen),
Broadcast: make(chan *WResponse, chanLen),
TagBroadcast: make(chan *TagWResponse, chanLen),
UBroadcast: make(chan *UWResponse, chanLen),
}
return
}
// InClient 客户端是否存在
func (manager *ClientManager) InClient(client *WssClient) (ok bool) {
manager.ClientsLock.RLock()
defer manager.ClientsLock.RUnlock()
_, ok = manager.Clients[client]
return
}
// GetClients 获取所有客户端
func (manager *ClientManager) GetClients() (clients map[*WssClient]bool) {
clients = make(map[*WssClient]bool)
manager.ClientsRange(func(client *WssClient, value bool) (result bool) {
clients[client] = value
return true
})
return
}
// ClientsRange 遍历
func (manager *ClientManager) ClientsRange(f func(client *WssClient, value bool) (result bool)) {
manager.ClientsLock.RLock()
defer manager.ClientsLock.RUnlock()
for key, value := range manager.Clients {
result := f(key, value)
if result == false {
return
}
}
return
}
// GetClientsLen 获取客户端总数
func (manager *ClientManager) GetClientsLen() (clientsLen int) {
clientsLen = len(manager.Clients)
return
}
// AddClients 添加客户端
func (manager *ClientManager) AddClients(client *WssClient) {
manager.ClientsLock.Lock()
defer manager.ClientsLock.Unlock()
manager.Clients[client] = true
}
// DelClients 删除客户端
func (manager *ClientManager) DelClients(client *WssClient) {
manager.ClientsLock.Lock()
defer manager.ClientsLock.Unlock()
if _, ok := manager.Clients[client]; ok {
delete(manager.Clients, client)
}
}
// GetUniqueClient 获取用户的连接
func (manager *ClientManager) GetUniqueClient(unique uint64) (client *WssClient) {
manager.UniqueLock.RLock()
defer manager.UniqueLock.RUnlock()
uniqueKey := GetUniqueKey(unique)
if value, ok := manager.UniqueClient[uniqueKey]; ok {
client = value
}
return
}
// AddUniques 添加用户
func (manager *ClientManager) AddUniques(key string, client *WssClient) {
manager.UniqueLock.Lock()
defer manager.UniqueLock.Unlock()
manager.UniqueClient[key] = client
}
// DelUniques 删除用户
func (manager *ClientManager) DelUniques(client *WssClient) (result bool) {
manager.UniqueLock.Lock()
defer manager.UniqueLock.Unlock()
key := GetUniqueKey(client.Unique)
if value, ok := manager.UniqueClient[key]; ok {
// 判断是否为相同的用户
if value.Addr != client.Addr {
return
}
delete(manager.UniqueClient, key)
result = true
}
return
}
// GetUniquesLen 已登录用户数
func (manager *ClientManager) GetUniquesLen() (uniqueLen int) {
uniqueLen = len(manager.UniqueClient)
return
}
// EventRegister 用户建立连接事件
func (manager *ClientManager) EventRegister(ctx context.Context, client *WssClient) {
manager.AddClients(client)
//发送当前客户端标识
client.SendMsg(ctx, &WResponse{Event: Connected, FromId: Connected, ToId: client.Unique, Data: g.Map{"ID": client.ID, "Unique": client.Unique}})
}
// EventLogin 用户登录事件
func (manager *ClientManager) EventLogin(ctx context.Context, login *login) {
client := login.Client
if manager.InClient(client) {
uniqueKey := login.GetKey()
manager.AddUniques(uniqueKey, login.Client)
}
}
// EventUnregister 用户断开连接事件
func (manager *ClientManager) EventUnregister(ctx context.Context, client *WssClient) {
manager.DelClients(client)
// 删除用户连接
deleteResult := manager.DelUniques(client)
if deleteResult == false {
// 不是当前连接的客户端
return
}
// 关闭 chan
// close(client.Send)
}
// ClearTimeoutConnections 定时清理超时连接
func (manager *ClientManager) clearTimeoutConnections(ctx context.Context) {
currentTime := uint64(gtime.Now().Unix())
clients := Manager.GetClients()
for client := range clients {
if client.IsHeartbeatTimeout(currentTime) {
g.Log().Info(ctx, "心跳时间超时[链接地址=%s,连接唯一标识=%s,连接用户=%s,登陆时间=%s,用户上次心跳时间=%s ]关闭连接",
client.Addr, client.ID, client.Unique, client.LoginTime, client.HeartbeatTime)
_ = client.Socket.Close()
}
}
}
// WebsocketPing 心跳处理
func (manager *ClientManager) ping(ctx context.Context) {
//定时任务,发送心跳包
_, _ = gcron.Add(ctx, "0 */1 * * * *", func(ctx context.Context) {
res := &WResponse{Event: Ping, FromId: Ping, ToId: Ping, Data: uint64(gtime.Now().Unix())}
SendToAll(ctx, res)
})
// 定时任务,清理超时连接
_, _ = gcron.Add(ctx, "*/30 * * * * *", func(ctx context.Context) {
manager.clearTimeoutConnections(ctx)
})
}
// start 管道处理程序
func (manager *ClientManager) startChan(ctx context.Context) {
for {
select {
case conn := <-manager.Register:
// 建立连接事件
manager.EventRegister(ctx, conn)
case login := <-manager.Login:
// 用户登录
manager.EventLogin(ctx, login)
case conn := <-manager.Unregister:
// 断开连接事件
manager.EventUnregister(ctx, conn)
case message := <-manager.Broadcast:
// 全部客户端广播事件
clients := manager.GetClients()
for conn := range clients {
conn.SendMsg(ctx, message)
}
case message := <-manager.TagBroadcast:
// 标签广播事件
clients := manager.GetClients()
for conn := range clients {
if conn.tags.Contains(message.Tag) {
conn.SendMsg(ctx, message.WResponse)
}
}
case message := <-manager.UBroadcast:
// 用户广播事件
clients := manager.GetClients()
for conn := range clients {
if conn.Unique == message.Unique {
conn.SendMsg(ctx, message.WResponse)
}
}
case message := <-manager.ClientBroadcast:
// 单个客户端广播事件
clients := manager.GetClients()
for conn := range clients {
if conn.ID == message.ID {
conn.SendMsg(ctx, message.WResponse)
}
}
}
}
}
Go
1
https://gitee.com/linngc/center.gf.git
git@gitee.com:linngc/center.gf.git
linngc
center.gf
center.gf
52e4a05782b6

搜索帮助