代码拉取完成,页面将自动刷新
package ws
import (
v1log "gitee.com/scottq/go-framework/src/v1/log"
"net/http"
"sync"
)
// IWorkerMgr is the callback interface the Hub uses to hand connection
// lifecycle events and inbound messages to application code.
type IWorkerMgr interface {
	// SetHub gives the worker a reference to the hub.
	SetHub(h *Hub)
	// Online is invoked when a client comes online.
	Online(client *Client)
	// Close is invoked when a client is being closed.
	Close(client *Client)
	// Pong decides whether the message is a ping and, if so, returns the
	// pong reply body to send back.
	Pong(messageType int, message []byte) (bool, []byte)
	// Receive handles a message received from a client.
	Receive(cmd *WebSocketMessage)
}
// Hub is the central dispatcher for websocket clients: it receives register,
// unregister and message events over channels and forwards them to the
// optional worker.
type Hub struct {
	// messageCh carries inbound messages waiting to be handled.
	messageCh chan *WebSocketMessage
	// register carries register requests from the clients.
	register chan *Client
	// unregister carries unregister requests from clients.
	unregister chan *Client
	// worker receives lifecycle and message callbacks; it may be nil, in
	// which case the callbacks are skipped.
	worker IWorkerMgr
	// IClientsMgr is embedded; this file uses its AddClient, DeleteClient and
	// Clients methods to track connected clients.
	IClientsMgr
	// closed is the stop-signal channel; closeOnce ensures the hub channels
	// are closed only once.
	closed    chan struct{}
	closeOnce sync.Once
}
// NewHub builds a Hub with its channels allocated and a fresh client manager.
//
// The message channel is buffered (128 entries); register, unregister and the
// stop-signal channel are unbuffered. No worker is attached; use SetWorker.
func NewHub() *Hub {
	hub := new(Hub)
	hub.messageCh = make(chan *WebSocketMessage, 128)
	hub.register = make(chan *Client)
	hub.unregister = make(chan *Client)
	hub.closed = make(chan struct{})
	hub.IClientsMgr = NewClientsMgr()
	return hub
}
// Run starts the hub's event loops and returns immediately.
//
// Ten goroutines are launched, each executing h.run, so events from the hub
// channels may be handled concurrently.
func (h *Hub) Run() {
	const loops = 10
	for started := 0; started < loops; started++ {
		go h.run()
	}
}
// Stop asks the hub to shut down and waits for it to finish.
//
// It sends one stop signal on h.closed, then blocks until that channel has
// been closed (run closes it when its loop exits). "ws hub shutdown" is
// logged as Stop returns.
//
// NOTE(review): the send blocks until some goroutine receives from h.closed,
// so Stop presumably must only be called after Run; and because run closes
// h.closed, a second Stop would send on a closed channel and panic — confirm
// callers invoke it at most once.
func (h *Hub) Stop() {
	defer logger.Info("ws hub shutdown")

	h.closed <- struct{}{}

	// Drain until the channel is closed; ranging ends exactly at that point.
	for range h.closed {
	}
}
// SetWorker installs the worker that receives the hub's Online, Close,
// Receive and Pong callbacks. While no worker is set (nil), those callbacks
// are skipped.
func (h *Hub) SetWorker(worker IWorkerMgr) {
	h.worker = worker
}
// ServeHTTP is the HTTP entry point used to establish a websocket connection.
// Its signature lets a *Hub be used as an http.Handler; all work is delegated
// to serveWs.
func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	serveWs(h, w, r)
}
// run is the event loop executed by each goroutine started from Run.
//
// It waits on the stop signal and on the register, unregister and message
// channels, and dispatches each event to the matching handler. The loop ends
// when a stop signal arrives (after handleQuit has run) or when one of the
// three work channels is found closed.
//
// On exit, closeOnce closes all four hub channels (only the first goroutine
// to exit actually performs the close), and then "ws hub stop" is logged.
//
// NOTE(review): the channels are closed here, on the receiving side. Any
// goroutine that sends on register, unregister or messageCh afterwards would
// panic — confirm senders cannot outlive the hub.
func (h *Hub) run() {
	// Deferred calls run in reverse order: the channels are closed first,
	// then the stop line is logged.
	defer logger.Info("ws hub stop")

	closeChannels := func() {
		close(h.register)
		close(h.closed)
		close(h.unregister)
		close(h.messageCh)
	}
	defer h.closeOnce.Do(closeChannels)

	for {
		select {
		case <-h.closed:
			// Stop was requested, or another goroutine already closed the channel.
			h.handleQuit()
			return

		case joined, open := <-h.register:
			if !open {
				return
			}
			// A client came online.
			h.handleOnline(joined)

		case left, open := <-h.unregister:
			if !open {
				return
			}
			// A client went offline.
			h.handleClose(left)

		case msg, open := <-h.messageCh:
			if !open {
				return
			}
			// Hand the inbound message to the worker.
			h.handleMessage(msg)
		}
	}
}
// handleQuit closes every client currently tracked by the hub.
//
// It takes the client list from h.Clients, logs how many there are, and runs
// handleClose on each one.
func (h *Hub) handleQuit() {
	remaining := h.Clients()
	logger.Info("ws clients quite %d", len(remaining))

	for _, client := range remaining {
		h.handleClose(client)
	}
}
// handleClose takes a client offline: it is removed from the client manager,
// the worker (if one is set) is notified through Close, and finally the
// client itself is closed.
func (h *Hub) handleClose(client *Client) {
	h.DeleteClient(client)

	if w := h.worker; w != nil {
		w.Close(client)
	}

	client.Close()
}
// handleOnline brings a client online: it is added to the client manager and
// the worker (if one is set) is notified through Online.
func (h *Hub) handleOnline(client *Client) {
	h.AddClient(client)

	if w := h.worker; w != nil {
		w.Online(client)
	}
}
// handleMessage forwards an inbound message to the worker's Receive method.
// The message is dropped when no worker is set.
func (h *Hub) handleMessage(cmd *WebSocketMessage) {
	if h.worker == nil {
		return
	}
	h.worker.Receive(cmd)
}
// PongHandle asks the worker whether the message is a ping and what pong
// body to reply with, returning the worker's answer unchanged.
//
// With no worker set it reports false together with an empty, non-nil byte
// slice.
func (h *Hub) PongHandle(messageType int, message []byte) (bool, []byte) {
	if h.worker == nil {
		return false, []byte{}
	}
	return h.worker.Pong(messageType, message)
}
// logger is the package-level logger used by the hub. It starts out as the
// value returned by v1log.NewNullLog.
var logger v1log.ILog = v1log.NewNullLog()

// SetLogger replaces the package-level logger.
//
// NOTE(review): the assignment is a plain write with no synchronization;
// presumably it should be called before the hub is started — confirm.
func SetLogger(log v1log.ILog) {
	logger = log
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。