Ai
1 Star 0 Fork 0

筑梦人/signal

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
session.go 3.65 KB
一键复制 编辑 原始数据 按行查看 历史
weixia.tian 提交于 2022-02-16 14:56 +08:00 . 增加onAuth认证逻辑
package webtrans
import (
"bytes"
"context"
"gitee.com/DreamPublic/signal/pkg/proto"
"go.uber.org/zap"
"io"
"net/http"
"nhooyr.io/websocket"
"sync"
)
// Session represents one websocket connection together with the callbacks
// registered on it.
type Session struct {
// id is the session identifier returned by ID.
id string
// cbLock serializes callback registration in On.
cbLock sync.RWMutex
// onCbMap maps a message header (string) to the []proto.SessionCallback
// registered for that header.
onCbMap sync.Map
// con is the underlying websocket connection used for reading, writing and closing.
con *websocket.Conn
// signal is the owning Signal whose connect/auth/close callbacks are fired by this session.
signal *Signal
// log is the logger used for all messages emitted by this session.
log *zap.Logger
}
// ID reports the identifier this session was created with.
func (s *Session) ID() string {
	sessionID := s.id
	return sessionID
}
// On registers cb to be invoked for messages whose header equals header.
// Callbacks accumulate: each call appends cb to the list already stored for
// that header. Registration is serialized by cbLock.
func (s *Session) On(header string, cb proto.SessionCallback) {
	s.cbLock.Lock()
	defer s.cbLock.Unlock()

	// Start from whatever is already registered (nil when nothing is).
	var registered []proto.SessionCallback
	if existing, found := s.onCbMap.Load(header); found {
		registered = existing.([]proto.SessionCallback)
	}
	s.onCbMap.Store(header, append(registered, cb))
}
// Emit encodes header and body into a proto message and sends it to the peer
// as a single binary websocket message.
//
// It returns the first error encountered while obtaining the message writer,
// writing the payload, or closing the writer. Closing the writer is what
// finalizes the message, so a failure there is reported to the caller instead
// of being dropped.
func (s *Session) Emit(header string, body string) error {
	msgStruct := proto.NewMsgStructFromParam(header, body)

	writer, err := s.con.Writer(context.Background(), websocket.MessageBinary)
	if err != nil {
		s.log.Error("get writer error", zap.Error(err),
			zap.String("header", header),
			zap.String("body", body))
		return err
	}

	if _, err = writer.Write(msgStruct.ToBytes()); err != nil {
		// Best-effort close so the writer is released; the write error is
		// the one worth reporting.
		_ = writer.Close()
		s.log.Error("发送返回数据失败", zap.Error(err),
			zap.String("header", header),
			zap.String("body", body))
		return err
	}

	// The message is not complete until the writer has been closed
	// successfully.
	if err = writer.Close(); err != nil {
		s.log.Error("close writer error", zap.Error(err),
			zap.String("header", header),
			zap.String("body", body))
		return err
	}
	return nil
}
// Close closes the websocket connection with a normal-closure status, fires
// the owning Signal's close callbacks for this session and logs the
// disconnect.
//
// A failure to close the connection does not stop the close callbacks from
// running; it is only logged at debug level.
// NOTE(review): the close error is presumably common when the peer has
// already gone away, hence debug rather than error level — confirm.
func (s *Session) Close() {
	if err := s.con.Close(websocket.StatusNormalClosure, ""); err != nil {
		s.log.Debug("close websocket connection error", zap.Error(err))
	}
	s.signal.fireCloseCb(s)
	s.log.Info("连接断开")
}
// NewSession builds a Session for the given websocket connection.
//
// sessionId becomes the value returned by ID and is attached to every log
// entry of the session; signal is the owner whose callbacks the session fires.
func NewSession(sessionId string, con *websocket.Conn, signal *Signal) *Session {
	sessionLog := zap.L().With(zap.String("sessionId", sessionId))

	// cbLock and onCbMap are ready to use in their zero state.
	return &Session{
		id:     sessionId,
		con:    con,
		signal: signal,
		log:    sessionLog,
	}
}
// fireOnCb dispatches msg to the callbacks registered for its header and then
// to the callbacks registered under the wildcard key "*". Each callback runs
// in its own goroutine.
func (s *Session) fireOnCb(msg *proto.MsgStruct) {
	// Keys are handed to the map exactly as obtained: the message header
	// first, the wildcard second.
	keys := []interface{}{msg.GetHeader(), "*"}

	for _, key := range keys {
		registered, found := s.onCbMap.Load(key)
		if !found {
			continue
		}
		for _, cb := range registered.([]proto.SessionCallback) {
			go cb(proto.SessionCbStruct{
				Session:   s,
				MsgStruct: msg,
				SessionId: s.id,
				Msg:       msg.GetBody(),
			})
		}
	}
}
// Handle runs the session's read loop for the lifetime of the connection.
//
// It fires the connect callbacks, then the auth callbacks (passing the
// request URI and Origin header); if auth is rejected it returns right away.
// Otherwise it reads websocket messages until a read fails or a malformed
// frame is seen. The session is always closed on return.
//
// Each message is expected to be laid out as:
//
//	4 bytes header length | 4 bytes body length | header | body
//
// The declared lengths come from the peer, so they are checked against the
// bytes actually received before any buffer is allocated for them.
func (s *Session) Handle(r *http.Request) {
	log := s.log
	s.signal.fireConnectCb(s)
	defer s.Close()
	if !s.signal.fireAuthCb(proto.AuthCbStruct{
		Path:   r.RequestURI,
		Origin: r.Header.Get("Origin"),
	}) {
		return
	}

	// Read incoming data.
	lenHeaderBytes := make([]byte, 4)
	lenBodyBytes := make([]byte, 4)
	for {
		_, buf, readErr := s.con.Read(context.Background())
		if readErr != nil {
			log.Error("读取msg失败", zap.Error(readErr))
			return
		}
		reader := bytes.NewReader(buf)

		// Read the header length.
		if _, err := io.ReadFull(reader, lenHeaderBytes); err != nil {
			log.Error("read websocket data error", zap.Error(err))
			return
		}
		// Read the body length.
		if _, err := io.ReadFull(reader, lenBodyBytes); err != nil {
			log.Error("read websocket data error", zap.Error(err))
			return
		}

		lenHeader := int(proto.BytesToInt(lenHeaderBytes))
		lenBody := int(proto.BytesToInt(lenBodyBytes))

		// Reject negative lengths and lengths claiming more data than the
		// message holds. Written without computing lenHeader+lenBody up
		// front so the check itself cannot overflow.
		remaining := reader.Len()
		if lenHeader < 0 || lenBody < 0 || lenHeader > remaining || lenBody > remaining-lenHeader {
			log.Error("read websocket data error", zap.Error(io.ErrUnexpectedEOF))
			return
		}

		leftBuf := make([]byte, lenHeader+lenBody)
		if _, err := io.ReadFull(reader, leftBuf); err != nil {
			log.Error("read websocket data error", zap.Error(err))
			return
		}

		// Reassemble exactly one frame (length prefix + payload); any
		// trailing bytes in the message are ignored.
		var totalBuf bytes.Buffer
		totalBuf.Write(lenHeaderBytes)
		totalBuf.Write(lenBodyBytes)
		totalBuf.Write(leftBuf)

		// Parse the packet and dispatch it to the registered callbacks.
		if structFromBytes := proto.NewMsgStructFromBytes(totalBuf.Bytes()); structFromBytes != nil {
			s.fireOnCb(structFromBytes)
		}
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/DreamPublic/signal.git
git@gitee.com:DreamPublic/signal.git
DreamPublic
signal
signal
v0.0.3

搜索帮助