1 Star 0 Fork 0

jmesyan / kudos

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
agent.go 2.98 KB
一键复制 编辑 原始数据 按行查看 历史
jmesyan 提交于 2020-12-07 23:29 . v3 init
package pomelo
import (
"net"
"reflect"
"gitee.com/jmesyan/kudos/v3/log"
"gitee.com/jmesyan/kudos/v3/network"
"gitee.com/jmesyan/kudos/v3/protocol"
"gitee.com/jmesyan/kudos/v3/protocol/pomelo/message"
"gitee.com/jmesyan/kudos/v3/protocol/pomelo/pkg"
"gitee.com/jmesyan/kudos/v3/rpc"
"gitee.com/jmesyan/kudos/v3/service/codecService"
"github.com/kudoochui/rpcx/client"
)
type agent struct {
conn network.Conn
connector *Connector
session *rpc.Session
userData interface{}
agentHandler *agentHandler
chanRet chan *client.Call
writeChan chan *[]byte
}
func NewAgent(conn network.Conn, connector *Connector) *agent{
a := &agent{
conn: conn,
connector: connector,
session: rpc.NewSession(connector.nodeId),
userData: nil,
chanRet: make(chan *client.Call, 100),
writeChan: make(chan *[]byte, 100),
}
a.agentHandler = NewAgentHandler(a)
return a
}
func (a *agent) Run() {
go func() {
defer a.conn.Close()
for {
select {
case ri := <-a.chanRet:
if ri.Error != nil {
log.Error("failed to call: %v", ri.Error)
} else {
args := ri.Args.(*rpc.Args)
if a.connector.handlerFilter != nil {
a.connector.handlerFilter.After(ri.ServicePath + "." + ri.ServiceMethod, ri)
}
a.WriteResponse(args.MsgId, ri.Reply)
}
case b := <-a.writeChan:
if b == nil {
return
}
err := a.conn.WriteMessage(*b)
protocol.FreePoolBuffer(b)
if err != nil {
log.Error("ws WriteMessage: %s", err.Error())
return
}
}
}
}()
for {
buffer := protocol.GetPoolMsg()
err := a.conn.ReadMsg(buffer)
if err != nil {
log.Debug("read message: %v", err)
break
}
a.agentHandler.Handle(buffer)
}
close(a.writeChan)
}
func (a *agent) OnClose() {
if a.agentHandler.timerHandler != nil {
a.connector.timers.ClearTimeout(a.agentHandler.timerHandler)
}
a.connector.connection.OnDisconnect(a.session)
a.connector.sessions.DelSession(a)
}
func (a *agent) WriteResponse(msgId int, msg interface{}) {
_codec := codecService.GetCodecService()
if _codec != nil {
data, err := _codec.Marshal(msg)
if err != nil {
log.Error("marshal message %v error: %v", reflect.TypeOf(msg), err)
return
}
//routeId := msgService.GetMsgService().GetRouteId(route)
buffer := message.Encode(msgId, message.TYPE_RESPONSE, 0, data)
err = a.conn.WriteMessage(*pkg.Encode(pkg.TYPE_DATA, buffer))
protocol.FreePoolBuffer(&buffer)
if err != nil {
log.Error("write message %v error: %v", reflect.TypeOf(msg), err)
}
}
}
// Write to channel. Make sure buffer from protocol.GetPoolBuffer()
func (a *agent) Write(data *[]byte) {
a.writeChan <- data
}
func (a *agent) LocalAddr() net.Addr {
return a.conn.LocalAddr()
}
func (a *agent) RemoteAddr() net.Addr {
return a.conn.RemoteAddr()
}
func (a *agent) Close() {
a.conn.Close()
}
func (a *agent) UserData() interface{} {
return a.userData
}
func (a *agent) SetUserData(data interface{}) {
a.userData = data
}
func (a *agent) GetSession() *rpc.Session {
return a.session
}
1
https://gitee.com/jmesyan/kudos.git
git@gitee.com:jmesyan/kudos.git
jmesyan
kudos
kudos
v3.0.1

搜索帮助