代码拉取完成,页面将自动刷新
package link
import (
"errors"
"sync"
"sync/atomic"
)
var SessionClosedError = errors.New("Session Closed")
var SessionBlockedError = errors.New("Session Blocked")
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type SessionID = uint64
var gNextSessionID uint64
// todo 将id生成规则替换成snowflake
func nextSessionID() SessionID {
return atomic.AddUint64(&gNextSessionID, 1)
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type ISessionHandler interface {
HandleMessage(*Session, []byte)
HandleError(*Session, error)
//HandleClose(*Session, int, string) error
HandleConnect(*Session) error
HandleDisconnect(*Session)
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type SessionHandler struct {
}
func (SessionHandler) HandleMessage(*Session, []byte) {
}
func (SessionHandler) HandleError(*Session, error) {
}
//func (SessionHandler) HandleClose(*Session, int, string) error {
// return nil
//}
func (SessionHandler) HandleConnect(*Session) error {
return nil
}
func (SessionHandler) HandleDisconnect(*Session) {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type Session struct {
id uint64
codec Codec
manager *Manager
sendChan chan interface{}
recvMutex sync.Mutex
sendMutex sync.RWMutex
closeFlag int32
closeChan chan int
closeMutex sync.Mutex
firstCloseCallback *closeCallback
lastCloseCallback *closeCallback
State interface{}
}
func NewSession(codec Codec, sendChanSize int) *Session {
return newSession(nil, codec, sendChanSize)
}
func newSession(manager *Manager, codec Codec, sendChanSize int) *Session {
session := &Session{
codec: codec,
manager: manager,
closeChan: make(chan int),
id: nextSessionID(),
}
if sendChanSize > 0 {
session.sendChan = make(chan interface{}, sendChanSize)
go session.sendLoop()
}
return session
}
func (session *Session) ID() uint64 {
return session.id
}
func (session *Session) IsClosed() bool {
return atomic.LoadInt32(&session.closeFlag) == 1
}
func (session *Session) Close() error {
if atomic.CompareAndSwapInt32(&session.closeFlag, 0, 1) {
close(session.closeChan)
if session.sendChan != nil {
session.sendMutex.Lock()
close(session.sendChan)
if clear, ok := session.codec.(ClearSendChan); ok {
clear.ClearSendChan(session.sendChan)
}
session.sendMutex.Unlock()
}
err := session.codec.Close()
go func() {
session.invokeCloseCallbacks()
if session.manager != nil {
session.manager.delSession(session)
}
}()
return err
}
return SessionClosedError
}
func (session *Session) Codec() Codec {
return session.codec
}
func (session *Session) Receive() (interface{}, error) {
session.recvMutex.Lock()
defer session.recvMutex.Unlock()
msg, err := session.codec.Receive()
if err != nil {
session.Close()
}
return msg, err
}
func (session *Session) sendLoop() {
defer session.Close()
for {
select {
case msg, ok := <-session.sendChan:
if !ok || session.codec.Send(msg) != nil {
return
}
case <-session.closeChan:
return
}
}
}
func (session *Session) Send(msg interface{}) error {
if session.sendChan == nil {
if session.IsClosed() {
return SessionClosedError
}
session.sendMutex.Lock()
defer session.sendMutex.Unlock()
err := session.codec.Send(msg)
if err != nil {
session.Close()
}
return err
}
session.sendMutex.RLock()
if session.IsClosed() {
session.sendMutex.RUnlock()
return SessionClosedError
}
select {
case session.sendChan <- msg:
session.sendMutex.RUnlock()
return nil
default:
session.sendMutex.RUnlock()
session.Close()
return SessionBlockedError
}
}
type closeCallback struct {
Handler interface{}
Key interface{}
Func func()
Next *closeCallback
}
func (session *Session) AddCloseCallback(handler, key interface{}, callback func()) {
if session.IsClosed() {
return
}
session.closeMutex.Lock()
defer session.closeMutex.Unlock()
newItem := &closeCallback{handler, key, callback, nil}
if session.firstCloseCallback == nil {
session.firstCloseCallback = newItem
} else {
session.lastCloseCallback.Next = newItem
}
session.lastCloseCallback = newItem
}
func (session *Session) RemoveCloseCallback(handler, key interface{}) {
if session.IsClosed() {
return
}
session.closeMutex.Lock()
defer session.closeMutex.Unlock()
var prev *closeCallback
for callback := session.firstCloseCallback; callback != nil; prev, callback = callback, callback.Next {
if callback.Handler == handler && callback.Key == key {
if session.firstCloseCallback == callback {
session.firstCloseCallback = callback.Next
} else {
prev.Next = callback.Next
}
if session.lastCloseCallback == callback {
session.lastCloseCallback = prev
}
return
}
}
}
func (session *Session) invokeCloseCallbacks() {
session.closeMutex.Lock()
defer session.closeMutex.Unlock()
for callback := session.firstCloseCallback; callback != nil; callback = callback.Next {
callback.Func()
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。