代码拉取完成,页面将自动刷新
package webtrans
import (
"bytes"
"context"
"gitee.com/DreamPublic/signal/pkg/proto"
"go.uber.org/zap"
"io"
"net/http"
"nhooyr.io/websocket"
"sync"
)
// Session wraps one websocket connection together with the message callbacks
// registered for it.
type Session struct {
// id is the session identifier returned by ID.
id string
// cbLock is held by On while it updates onCbMap.
cbLock sync.RWMutex
// onCbMap maps a message header (string) to its []proto.SessionCallback.
onCbMap sync.Map
// con is the websocket connection used by Emit, Handle and Close.
con *websocket.Conn
// signal is the Signal whose connect/auth/close callbacks this session fires.
signal *Signal
// log is the logger used for all messages emitted by this session.
log *zap.Logger
}
// ID returns the session identifier.
func (s *Session) ID() string {
return s.id
}
// On registers cb for messages whose header equals header. Callbacks
// registered under the same header accumulate in registration order.
// Registrations are serialized through cbLock.
func (s *Session) On(header string, cb proto.SessionCallback) {
	s.cbLock.Lock()
	defer s.cbLock.Unlock()

	// A nil slice is a valid append target, so a header seen for the first
	// time needs no special casing.
	var registered []proto.SessionCallback
	if existing, found := s.onCbMap.Load(header); found {
		registered = existing.([]proto.SessionCallback)
	}
	s.onCbMap.Store(header, append(registered, cb))
}
// Emit encodes header and body into a message and sends it to the peer as a
// single binary websocket message.
//
// It returns the first error encountered while opening the message writer,
// writing the payload, or closing the writer. Every failure is also logged
// together with the header and body that could not be sent.
func (s *Session) Emit(header string, body string) error {
	msgStruct := proto.NewMsgStructFromParam(header, body)

	writer, getWriterErr := s.con.Writer(context.Background(), websocket.MessageBinary)
	if getWriterErr != nil {
		s.log.Error("get writer error", zap.Error(getWriterErr),
			zap.String("header", header),
			zap.String("body", body))
		return getWriterErr
	}

	if _, writeErr := writer.Write(msgStruct.ToBytes()); writeErr != nil {
		// Still release the writer; the write error is the one worth reporting.
		_ = writer.Close()
		s.log.Error("发送返回数据失败", zap.Error(writeErr),
			zap.String("header", header),
			zap.String("body", body))
		return writeErr
	}

	// The message is only complete once the writer is closed, so a failure
	// here means the peer did not receive it and must be reported.
	if closeErr := writer.Close(); closeErr != nil {
		s.log.Error("close writer error", zap.Error(closeErr),
			zap.String("header", header),
			zap.String("body", body))
		return closeErr
	}
	return nil
}
// Close closes the websocket connection with a normal-closure status, fires
// the signal's close callback for this session and logs the disconnect.
func (s *Session) Close() {
// The error from closing the connection is intentionally discarded.
_ = s.con.Close(websocket.StatusNormalClosure, "")
s.signal.fireCloseCb(s)
s.log.Info("连接断开")
}
// NewSession builds a Session for the given connection, owned by signal.
// The session logger is derived from the global zap logger and carries the
// session id as the "sessionId" field.
func NewSession(sessionId string, con *websocket.Conn, signal *Signal) *Session {
	// The callback lock and callback map are usable as zero values, so only
	// the remaining fields need to be set.
	session := new(Session)
	session.id = sessionId
	session.con = con
	session.signal = signal
	session.log = zap.L().With(zap.String("sessionId", sessionId))
	return session
}
// fireOnCb dispatches msg to every callback registered for its header and
// then to every callback registered under the wildcard header "*". Each
// callback is started in its own goroutine.
func (s *Session) fireOnCb(msg *proto.MsgStruct) {
	// Exact-header callbacks are launched before wildcard ones.
	for _, key := range [...]string{msg.GetHeader(), "*"} {
		registered, found := s.onCbMap.Load(key)
		if !found {
			continue
		}
		for _, cb := range registered.([]proto.SessionCallback) {
			go cb(proto.SessionCbStruct{
				Session:   s,
				MsgStruct: msg,
				SessionId: s.id,
				Msg:       msg.GetBody(),
			})
		}
	}
}
// Handle runs the session until the connection fails or a malformed message
// arrives. It fires the signal's connect callback, asks the auth callback
// whether the request (path and Origin header) is allowed, and then reads
// websocket messages in a loop, dispatching each decoded message through
// fireOnCb. Close is always called when Handle returns.
//
// Each message is expected to start with two 4-byte length fields (header
// length, then body length) followed by that many payload bytes.
func (s *Session) Handle(r *http.Request) {
	log := s.log
	s.signal.fireConnectCb(s)
	defer s.Close()

	if !s.signal.fireAuthCb(proto.AuthCbStruct{
		Path:   r.RequestURI,
		Origin: r.Header.Get("Origin"),
	}) {
		return
	}

	// Read messages. The two length buffers are reused across iterations;
	// their contents are copied into a fresh frame before being handed on.
	lenHeaderBytes := make([]byte, 4)
	lenBodyBytes := make([]byte, 4)
	for {
		_, buf, readErr := s.con.Read(context.Background())
		if readErr != nil {
			log.Error("读取msg失败", zap.Error(readErr))
			return
		}
		reader := bytes.NewReader(buf)

		// Read the header length.
		if _, err := io.ReadFull(reader, lenHeaderBytes); err != nil {
			log.Error("read websocket data error", zap.Error(err))
			return
		}
		// Read the body length.
		if _, err := io.ReadFull(reader, lenBodyBytes); err != nil {
			log.Error("read websocket data error", zap.Error(err))
			return
		}

		// The lengths come from the peer. Validate them against what is
		// actually left in the message before allocating, so a negative or
		// oversized value can neither panic in make nor force a huge
		// allocation. The comparison is written to avoid overflow.
		headerLen := int64(proto.BytesToInt(lenHeaderBytes))
		bodyLen := int64(proto.BytesToInt(lenBodyBytes))
		remaining := int64(reader.Len())
		if headerLen < 0 || bodyLen < 0 || headerLen > remaining || bodyLen > remaining-headerLen {
			log.Error("read websocket data error", zap.Error(io.ErrUnexpectedEOF))
			return
		}

		payload := make([]byte, headerLen+bodyLen)
		if _, err := io.ReadFull(reader, payload); err != nil {
			log.Error("read websocket data error", zap.Error(err))
			return
		}

		// Reassemble the frame (length fields + payload) for the parser.
		var frame bytes.Buffer
		frame.Grow(len(lenHeaderBytes) + len(lenBodyBytes) + len(payload))
		frame.Write(lenHeaderBytes)
		frame.Write(lenBodyBytes)
		frame.Write(payload)

		// Parse the packet; frames the parser rejects are skipped.
		if parsed := proto.NewMsgStructFromBytes(frame.Bytes()); parsed != nil {
			s.fireOnCb(parsed)
		}
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。