Fetch the repository succeeded.
This action will force synchronization from Coder/gona, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
package channel
import (
"fmt"
"net"
"sync"
"gitee.com/andyxt/gona/internal/logger"
"gitee.com/andyxt/gona/utils"
)
const (
ChannelKey string = "ChannelKey"
)
type SocketChannel struct {
Attr
mConn net.Conn
mPipeline ChannelPipeline
isInactive bool
mReader *SocketChannelReader
mWriter *SocketChannelWriter
}
func NewSocketChannel(params map[string]interface{}, conn net.Conn, channelInitializer ChannelInitializer) (this *SocketChannel) {
return newSocketChannel(params, conn, channelInitializer)
}
func newSocketChannel(params map[string]interface{}, conn net.Conn, channelInitializer ChannelInitializer) (this *SocketChannel) {
this = new(SocketChannel)
this.initAttr(params)
this.mConn = conn
this.mPipeline = NewDefaultChannelPipeline(this)
channelInitializer.InitChannel(this.mPipeline)
this.mReader = NewSocketChannelReader(this.mConn, this, this, this)
this.mWriter = NewSocketChannelWriter(this.mConn, this, this, this)
return
}
func (chanenl *SocketChannel) initAttr(params map[string]interface{}) {
chanenl.lock = new(sync.Mutex)
chanenl.attr = make(map[string]interface{})
for k, v := range params {
chanenl.Set(k, v)
}
chanenl.Set(ChannelKey, utils.UUID())
}
func (chanenl *SocketChannel) ID() string {
return chanenl.GetString(ChannelKey)
}
func (chanenl *SocketChannel) Start() {
chanenl.startRead()
chanenl.startWrite()
}
func (chanenl *SocketChannel) startRead() {
chanenl.mReader.Start()
}
func (chanenl *SocketChannel) startWrite() {
chanenl.mWriter.Start()
}
// for Channel
func (chanenl *SocketChannel) RemoteAddr() string {
return chanenl.mConn.RemoteAddr().String()
}
// for Channel
func (chanenl *SocketChannel) Write(data []byte) {
if !chanenl.isInactive {
chanenl.mWriter.Write(data)
}
}
// for Channel
func (chanenl *SocketChannel) Close() {
if !chanenl.isInactive {
chanenl.mWriter.Close()
}
}
// 关闭底层网络
func (chanenl *SocketChannel) closeSocket() {
defer func() {
recover()
}()
chanenl.mConn.Close()
}
// for ChannelError
func (chanenl *SocketChannel) IOReadError(err error) {
chlCtxKey := chanenl.ID()
logger.Debug("SocketChannel IOReadError 1!", "chlCtxID=", chlCtxKey, " closeSocket")
chanenl.closeSocket() //关闭底层网络
logger.Debug("SocketChannel IOReadError 2!", "chlCtxID=", chlCtxKey, " closeWriteChan")
chanenl.mWriter.Close() //解决写线程堵downChan
logger.Debug("SocketChannel IOReadError 3!", "chlCtxID=", chlCtxKey, " ReadRoutine done")
}
func (chanenl *SocketChannel) IOWriteError(err error) {
chlCtxKey := chanenl.ID()
logger.Debug("SocketChannel IOWriteError 1!", "chlCtxID=", chlCtxKey, "closeSocket")
chanenl.closeSocket() //关闭底层网络
logger.Debug("SocketChannel IOWriteError 2!", "chlCtxID=", chlCtxKey, "closeWriteChan")
chanenl.mWriter.Close() //解决写线程堵downChan
logger.Debug("SocketChannel IOWriteError 3!", "chlCtxID=", chlCtxKey, "WriteRoutine done")
}
// for ChannelCallBack
func (chanenl *SocketChannel) Active() {
chanenl.invokeActive()
}
func (chanenl *SocketChannel) invokeActive() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
logger.Error(fmt.Sprint(recoverErr, string(utils.Stack(3))))
}
}()
chanenl.mPipeline.FireChannelActive()
}
// for ChannelCallBack
func (chanenl *SocketChannel) Inactive() {
chanenl.invokeInactive()
}
func (chanenl *SocketChannel) invokeInactive() {
defer func() {
if recoverErr := recover(); recoverErr != nil {
logger.Error(fmt.Sprint(recoverErr, string(utils.Stack(3))))
}
}()
chanenl.isInactive = true
chanenl.mPipeline.FireChannelInactive()
}
// for ChannelCallBack
func (chanenl *SocketChannel) MsgReceived(data []byte) {
chanenl.invokeMsgReceived(data)
}
func (chanenl *SocketChannel) invokeMsgReceived(data []byte) {
defer func() {
if recoverErr := recover(); recoverErr != nil {
logger.Error(fmt.Sprint(recoverErr, string(utils.Stack(3))))
}
}()
chanenl.mPipeline.FireMessageReceived(data)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。