代码拉取完成,页面将自动刷新
package channel
import (
"fmt"
"net"
"sync"
"gitee.com/andyxt/gona/logger"
"gitee.com/andyxt/gona/utils"
)
// ChannelKey is the attribute name under which a SocketChannel stores its
// generated identifier; initAttr writes it and ID reads it back.
const ChannelKey string = "ChannelKey"
// SocketChannel ties a net.Conn to a channel pipeline together with a reader
// and a writer. Its methods forward reader/writer callbacks (Active, Inactive,
// MsgReceived, IOReadError, IOWriteError) to the pipeline or to socket
// teardown, and expose Write/Close/RemoteAddr/ID to users of the channel.
type SocketChannel struct {
// Attr is the embedded attribute store; initAttr populates its lock and
// attr fields and the channel ID is kept in it under ChannelKey.
Attr
// mConn is the connection handed to the constructor; closeSocket closes it.
mConn net.Conn
// mPipeline receives the FireChannelActive / FireChannelInactive /
// FireMessageReceived events raised by this channel.
mPipeline ChannelPipeline
// isInactive is set to true by invokeInactive; once set, Write and Close
// become no-ops.
isInactive bool
// mReader and mWriter are created in newSocketChannel and started by Start.
mReader *SocketChannelReader
mWriter *SocketChannelWriter
}
// NewSocketChannel is the exported constructor for SocketChannel. It forwards
// all arguments unchanged to newSocketChannel and returns its result.
//
// params seeds the channel's attributes, conn is the connection passed on to
// the channel's reader and writer, and channelInitializer is invoked once with
// the channel's pipeline.
func NewSocketChannel(params map[string]interface{}, conn net.Conn, channelInitializer ChannelInitializer) (this *SocketChannel) {
	this = newSocketChannel(params, conn, channelInitializer)
	return this
}
// newSocketChannel allocates a SocketChannel and wires it up in order:
// attributes, pipeline, pipeline initialization, then reader and writer.
//
// The channel passes itself three times to the reader and writer
// constructors. NOTE(review): presumably once per callback interface the
// channel implements — confirm against NewSocketChannelReader/Writer.
//
// The reader and writer are only constructed here; Start launches them.
func newSocketChannel(params map[string]interface{}, conn net.Conn, channelInitializer ChannelInitializer) (this *SocketChannel) {
	this = &SocketChannel{mConn: conn}
	this.initAttr(params)

	// The pipeline must exist before the initializer runs, since the
	// initializer receives it.
	this.mPipeline = NewDefaultChannelPipeline(this)
	channelInitializer.InitChannel(this.mPipeline)

	this.mReader = NewSocketChannelReader(conn, this, this, this)
	this.mWriter = NewSocketChannelWriter(conn, this, this, this)
	return this
}
// initAttr prepares the embedded attribute store: it installs a fresh mutex
// and an empty attribute map, stores every entry of params through Set, and
// finally stores a newly generated UUID under ChannelKey.
//
// Because the UUID is set after params, a ChannelKey entry supplied by the
// caller is presumably replaced — confirm against Attr.Set.
func (c *SocketChannel) initAttr(params map[string]interface{}) {
	c.lock = &sync.Mutex{}
	c.attr = map[string]interface{}{}
	for name, value := range params {
		c.Set(name, value)
	}
	c.Set(ChannelKey, utils.UUID())
}
// ID returns the string attribute stored under ChannelKey, i.e. the
// identifier assigned to this channel by initAttr.
func (c *SocketChannel) ID() string {
	id := c.GetString(ChannelKey)
	return id
}
// Start launches the channel: the reader is started first, then the writer.
func (c *SocketChannel) Start() {
	c.startRead()
	c.startWrite()
}
// startRead starts the channel's reader.
func (c *SocketChannel) startRead() {
	reader := c.mReader
	reader.Start()
}
// startWrite starts the channel's writer.
func (c *SocketChannel) startWrite() {
	writer := c.mWriter
	writer.Start()
}
// RemoteAddr returns the string form of the connection's remote address.
// It is part of the Channel interface (per the original note).
func (c *SocketChannel) RemoteAddr() string {
	addr := c.mConn.RemoteAddr()
	return addr.String()
}
// Write passes data to the channel's writer. Once the channel has been marked
// inactive the call is silently ignored.
// It is part of the Channel interface (per the original note).
func (c *SocketChannel) Write(data []byte) {
	if c.isInactive {
		return
	}
	c.mWriter.Write(data)
}
// Close asks the channel's writer to close. Once the channel has been marked
// inactive the call is silently ignored. This method does not call
// closeSocket itself.
// It is part of the Channel interface (per the original note).
func (c *SocketChannel) Close() {
	if c.isInactive {
		return
	}
	c.mWriter.Close()
}
// closeSocket closes the underlying network connection on a best-effort
// basis: the error returned by Close is discarded and any panic raised while
// closing is recovered and dropped.
func (c *SocketChannel) closeSocket() {
	defer func() {
		// Deliberately swallow any panic; nothing is reported.
		_ = recover()
	}()
	_ = c.mConn.Close()
}
// IOReadError handles a read failure reported for this channel. It closes the
// underlying connection and then closes the writer, logging each step at
// debug level together with the channel ID.
// It is part of the ChannelError interface (per the original note).
//
// err is the read error that triggered the teardown; it is included in the
// first log line so the cause of the disconnect is not lost.
func (c *SocketChannel) IOReadError(err error) {
	chlCtxKey := c.ID()
	// fmt.Sprint renders a nil error as "<nil>" instead of panicking.
	logger.Debug("SocketChannel IOReadError 1!", "chlCtxID=", chlCtxKey, " closeSocket", " err=", fmt.Sprint(err))
	c.closeSocket() // close the underlying network connection
	logger.Debug("SocketChannel IOReadError 2!", "chlCtxID=", chlCtxKey, " closeWriteChan")
	c.mWriter.Close() // original note: releases the write goroutine blocked on downChan
	logger.Debug("SocketChannel IOReadError 3!", "chlCtxID=", chlCtxKey, " ReadRoutine done")
}
// IOWriteError handles a write failure reported for this channel. It closes
// the underlying connection and then closes the writer, logging each step at
// debug level together with the channel ID.
//
// err is the write error that triggered the teardown; it is included in the
// first log line so the cause of the disconnect is not lost.
func (c *SocketChannel) IOWriteError(err error) {
	chlCtxKey := c.ID()
	// fmt.Sprint renders a nil error as "<nil>" instead of panicking.
	logger.Debug("SocketChannel IOWriteError 1!", "chlCtxID=", chlCtxKey, "closeSocket", " err=", fmt.Sprint(err))
	c.closeSocket() // close the underlying network connection
	logger.Debug("SocketChannel IOWriteError 2!", "chlCtxID=", chlCtxKey, "closeWriteChan")
	c.mWriter.Close() // original note: releases the write goroutine blocked on downChan
	logger.Debug("SocketChannel IOWriteError 3!", "chlCtxID=", chlCtxKey, "WriteRoutine done")
}
// Active signals that the channel has become active; the event is forwarded
// to the pipeline through invokeActive, which also guards against panics.
// It is part of the ChannelCallBack interface (per the original note).
func (c *SocketChannel) Active() {
	c.invokeActive()
}
// invokeActive fires the channel-active event on the pipeline. A panic raised
// by the pipeline is recovered and logged as an error together with a stack
// dump, so it does not propagate to the caller.
func (c *SocketChannel) invokeActive() {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// NOTE(review): the argument 3 is presumably a frame-skip/depth
		// setting for the stack dump — confirm against utils.Stack.
		stack := string(utils.Stack(3))
		logger.Error(fmt.Sprint(r, stack))
	}()
	c.mPipeline.FireChannelActive()
}
// Inactive signals that the channel has become inactive; the work is done by
// invokeInactive, which marks the channel and notifies the pipeline.
// It is part of the ChannelCallBack interface (per the original note).
func (c *SocketChannel) Inactive() {
	c.invokeInactive()
}
// invokeInactive marks the channel inactive and then fires the
// channel-inactive event on the pipeline. The flag is set before the event is
// fired, so Write and Close are already no-ops while handlers run. A panic
// raised by the pipeline is recovered and logged as an error together with a
// stack dump.
func (c *SocketChannel) invokeInactive() {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// NOTE(review): the argument 3 is presumably a frame-skip/depth
		// setting for the stack dump — confirm against utils.Stack.
		stack := string(utils.Stack(3))
		logger.Error(fmt.Sprint(r, stack))
	}()
	c.isInactive = true
	c.mPipeline.FireChannelInactive()
}
// MsgReceived delivers an incoming message to the pipeline through
// invokeMsgReceived, which also guards against panics.
// It is part of the ChannelCallBack interface (per the original note).
func (c *SocketChannel) MsgReceived(data []byte) {
	c.invokeMsgReceived(data)
}
// invokeMsgReceived fires the message-received event on the pipeline with
// data. A panic raised by the pipeline is recovered and logged as an error
// together with a stack dump, so it does not propagate to the caller.
func (c *SocketChannel) invokeMsgReceived(data []byte) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// NOTE(review): the argument 3 is presumably a frame-skip/depth
		// setting for the stack dump — confirm against utils.Stack.
		stack := string(utils.Stack(3))
		logger.Error(fmt.Sprint(r, stack))
	}()
	c.mPipeline.FireMessageReceived(data)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。