代码拉取完成,页面将自动刷新
package cluster
/*
regest rpc
*/
import (
"fmt"
"github.com/viphxin/xingo/logger"
"github.com/viphxin/xingo/utils"
"math/rand"
"reflect"
"time"
)
type RpcMsgHandle struct {
PoolSize int32
TaskQueue []chan *RpcRequest
Apis map[string]reflect.Value
}
func NewRpcMsgHandle() *RpcMsgHandle {
return &RpcMsgHandle{
PoolSize: utils.GlobalObject.PoolSize,
TaskQueue: make([]chan *RpcRequest, utils.GlobalObject.PoolSize),
Apis: make(map[string]reflect.Value),
}
}
/*
处理rpc消息
*/
func (this *RpcMsgHandle) DoMsg(request *RpcRequest) {
if request.Rpcdata.MsgType == RESPONSE && request.Rpcdata.Key != "" {
//放回异步结果
AResultGlobalObj.FillAsyncResult(request.Rpcdata.Key, request.Rpcdata)
return
} else {
//rpc 请求
if f, ok := this.Apis[request.Rpcdata.Target]; ok {
//存在
st := time.Now()
if request.Rpcdata.MsgType == REQUEST_FORRESULT {
ret := f.Call([]reflect.Value{reflect.ValueOf(request)})
if len(ret) == 0 {
return
}
packdata, err := utils.GlobalObject.RpcCProtoc.GetDataPack().Pack(0, &RpcData{
MsgType: RESPONSE,
Result: ret[0].Interface().(map[string]interface{}),
Key: request.Rpcdata.Key,
})
if err == nil {
request.Fconn.Send(packdata)
} else {
logger.Error(err)
}
} else if request.Rpcdata.MsgType == REQUEST_NORESULT {
f.Call([]reflect.Value{reflect.ValueOf(request)})
}
logger.Debug(fmt.Sprintf("rpc %s cost total time: %f ms", request.Rpcdata.Target, time.Now().Sub(st).Seconds()*1000))
} else {
logger.Error(fmt.Sprintf("not found rpc: %s", request.Rpcdata.Target))
}
}
}
func (this *RpcMsgHandle) DeliverToMsgQueue(pkg interface{}) {
request := pkg.(*RpcRequest)
//add to worker pool
index := rand.Int31n(utils.GlobalObject.PoolSize)
taskQueue := this.TaskQueue[index]
logger.Debug(fmt.Sprintf("add to rpc pool : %d", index))
taskQueue <- request
}
func (this *RpcMsgHandle) DoMsgFromGoRoutine(pkg interface{}) {
request := pkg.(*RpcRequest)
go this.DoMsg(request)
}
func (this *RpcMsgHandle) AddRouter(router interface{}) {
value := reflect.ValueOf(router)
tp := value.Type()
for i := 0; i < value.NumMethod(); i += 1 {
name := tp.Method(i).Name
if _, ok := this.Apis[name]; ok {
//存在
panic("repeated rpc " + name)
}
this.Apis[name] = value.Method(i)
logger.Info("add rpc " + name)
}
}
func (this *RpcMsgHandle) StartWorkerLoop(poolSize int) {
if utils.GlobalObject.IsThreadSafeMode(){
this.TaskQueue[0] = make(chan *RpcRequest, utils.GlobalObject.MaxWorkerLen)
go func(){
for{
select {
case rpcRequest := <- this.TaskQueue[0]:
this.DoMsg(rpcRequest)
case delayCall := <- utils.GlobalObject.GetSafeTimer().GetTriggerChannel():
delayCall.Call()
}
}
}()
}else{
for i := 0; i < poolSize; i += 1 {
c := make(chan *RpcRequest, utils.GlobalObject.MaxWorkerLen)
this.TaskQueue[i] = c
go func(index int, taskQueue chan *RpcRequest) {
logger.Info(fmt.Sprintf("init rpc thread pool %d.", index))
for {
request := <-taskQueue
this.DoMsg(request)
}
}(i, c)
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。