2 Star 0 Fork 0

MixerJ/pitaya

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
util.go 8.42 KB
一键复制 编辑 原始数据 按行查看 历史
zhusw 提交于 2021-04-01 20:44 +08:00 . push
// Copyright (c) TFG Co. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package service
import (
"context"
"errors"
"fmt"
"gitee.com/mixerj/pitaya/conn/message"
"go.uber.org/zap"
pbProto "pb/protocol"
"reflect"
"gitee.com/mixerj/pitaya/component"
"gitee.com/mixerj/pitaya/constants"
e "gitee.com/mixerj/pitaya/errors"
"gitee.com/mixerj/pitaya/logger"
"gitee.com/mixerj/pitaya/pipeline"
"gitee.com/mixerj/pitaya/route"
"gitee.com/mixerj/pitaya/serialize"
"gitee.com/mixerj/pitaya/session"
"gitee.com/mixerj/pitaya/util"
"github.com/golang/protobuf/proto"
)
const (
agentVersion = 1
)
var errInvalidMsg = errors.New("invalid message type provided")
func getHandler(rt *route.Route) (*component.Handler, error) {
rtShort := rt.Short()
handler, ok := handlers[rtShort]
if !ok {
_err := fmt.Errorf("pitaya/handler: %s not found", rt.String())
return nil, _err
}
return handler, nil
}
func GetLocalHandler(rt *route.Route) bool {
_, err := getHandler(rt)
if nil != err {
return false
}
return true
}
func unmarshalHandlerArg(handler *component.Handler, serializer serialize.Serializer, payload []byte) (interface{}, error) {
if handler.IsRawArg {
return payload, nil
}
var arg interface{}
if handler.Type != nil {
arg = reflect.New(handler.Type.Elem()).Interface()
err := serializer.Unmarshal(payload, arg)
if err != nil {
return nil, err
}
}
return arg, nil
}
func unmarshalRemoteArg(remote *component.Remote, payload []byte) (interface{}, error) {
var arg interface{}
if remote.Type != nil {
arg = reflect.New(remote.Type.Elem()).Interface()
pb, ok := arg.(proto.Message)
if !ok {
return nil, constants.ErrWrongValueType
}
err := proto.Unmarshal(payload, pb)
if err != nil {
return nil, err
}
}
return arg, nil
}
func getMsgType(msgTypeIface interface{}) (pbProto.PacketType, error) {
var msgType pbProto.PacketType
if val, ok := msgTypeIface.(pbProto.PacketType); ok {
msgType = val
} else {
return msgType, errInvalidMsg
}
// else if val, ok := msgTypeIface.(pbProto.PacketType); ok {
// msgType = util.ConvertProtoToMessageType(val)
// }
return msgType, nil
}
func executeBeforePipeline(ctx context.Context, data interface{}) (interface{}, error) {
var err error
res := data
if len(pipeline.BeforeHandler.Handlers) > 0 {
for _, h := range pipeline.BeforeHandler.Handlers {
res, err = h(ctx, res)
if err != nil {
logger.Log.Debugf("pitaya/handler: broken pipeline: %s", err.Error())
return res, err
}
}
}
return res, nil
}
func executeAfterPipeline(ctx context.Context, res interface{}, err error) (interface{}, error) {
ret := res
if len(pipeline.AfterHandler.Handlers) > 0 {
for _, h := range pipeline.AfterHandler.Handlers {
ret, err = h(ctx, ret, err)
}
}
return ret, err
}
func serializeReturn(ser serialize.Serializer, ret interface{}) ([]byte, error) {
res, err := util.SerializeOrRaw(ser, message.Response, ret)
if err != nil {
logger.Log.Errorf("Failed to serialize return: %s", err.Error())
res, err = util.GetErrorPayload(ser, err)
if err != nil {
logger.Log.Error("cannot serialize message and respond to the client ", err.Error())
return nil, err
}
}
return res, nil
}
func processHandlerMessage(
ctx context.Context,
rt *route.Route,
serializer serialize.Serializer,
session *session.Session,
data []byte,
msgTypeIface interface{},
remote bool,
) ([]byte, error) {
if ctx == nil {
ctx = context.Background()
}
ctx = context.WithValue(ctx, constants.SessionCtxKey, session)
ctx = util.CtxWithDefaultLogger(ctx, rt.String(), session.UID())
h, err := getHandler(rt)
if err != nil {
return nil, e.NewError(err, e.ErrNotFoundCode)
}
msgType, err := getMsgType(msgTypeIface)
if err != nil {
return nil, e.NewError(err, e.ErrInternalCode)
}
logger := ctx.Value(constants.LoggerCtxKey).(*zap.SugaredLogger)
// exit, err := h.ValidateMessageType(msgType)
// if err != nil && exit {
// return nil, e.NewError(err, e.ErrBadRequestCode)
// } else if err != nil {
// logger.Warnf("invalid message type, error: %s", err.Error())
// }
// First unmarshal the handler arg that will be passed to
// both handler and pipeline functions
arg, err := unmarshalHandlerArg(h, serializer, data)
if err != nil {
return nil, e.NewError(err, e.ErrBadRequestCode)
}
if arg, err = executeBeforePipeline(ctx, arg); err != nil {
return nil, err
}
logger.Debugf("SID=%d, Data=%s", session.ID(), data)
args := []reflect.Value{h.Receiver, reflect.ValueOf(ctx)}
if arg != nil {
args = append(args, reflect.ValueOf(arg))
}
resp, err := util.Pcall(h.Method, args)
if remote && msgType == pbProto.PacketType_RESPONSE {
// This is a special case and should only happen with nats rpc client
// because we used nats request we have to answer to it or else a timeout
// will happen in the caller server and will be returned to the client
// the reason why we don't just Publish is to keep track of failed rpc requests
// with timeouts, maybe we can improve this flow
resp = []byte("ack")
}
resp, err = executeAfterPipeline(ctx, resp, err)
if err != nil {
return nil, err
}
ret, err := serializeReturn(serializer, resp)
if err != nil {
return nil, err
}
return ret, nil
}
func processHandlerProtoMessage(
lm unhandledProtoMessage,
serializer serialize.Serializer,
remote bool,
) ([]byte, error) {
defer util.FuncTimeCost()()
if lm.ctx == nil {
lm.ctx = context.Background()
}
lm.ctx = context.WithValue(lm.ctx, constants.SessionCtxKey, lm.agent.Session)
lm.ctx = util.CtxWithDefaultLogger(lm.ctx, lm.route.String(), lm.agent.Session.UID())
h, err := getHandler(lm.route)
if err != nil {
return nil, e.NewError(err, e.ErrNotFoundCode)
}
msgType, err := getMsgType(lm.packet.Type)
if err != nil {
return nil, e.NewError(err, e.ErrInternalCode)
}
loggerCtx := lm.ctx.Value(constants.LoggerCtxKey).(*zap.SugaredLogger)
// exit, err := h.ValidateMessageType(msgType)
// if err != nil && exit {
// return nil, e.NewError(err, e.ErrBadRequestCode)
// } else if err != nil {
// logger.Warnf("invalid message type, error: %s", err.Error())
// }
// First unmarshal the handler arg that will be passed to
// both handler and pipeline functions
arg, err := unmarshalHandlerArg(h, serializer, lm.msg.Request)
if err != nil {
return nil, e.NewError(err, e.ErrBadRequestCode)
}
if arg, err = executeBeforePipeline(lm.ctx, arg); err != nil {
return nil, err
}
loggerCtx.With("args", map[string]interface{}{
"sid": lm.agent.Session.ID(),
"data": arg,
}).Info("request args")
args := []reflect.Value{h.Receiver, reflect.ValueOf(lm.ctx)}
if arg != nil {
args = append(args, reflect.ValueOf(arg))
}
resp, err := util.Pcall(h.Method, args)
if remote && msgType == pbProto.PacketType_RESPONSE {
// This is a special case and should only happen with nats rpc client
// because we used nats request we have to answer to it or else a timeout
// will happen in the caller server and will be returned to the client
// the reason why we don't just Publish is to keep track of failed rpc requests
// with timeouts, maybe we can improve this flow
resp = []byte("ack")
}
resp, err = executeAfterPipeline(lm.ctx, resp, err)
if err != nil {
return nil, err
}
respArgs := map[string]interface{}{
"resp": resp,
"user_id": lm.agent.Session.UID(),
}
loggerCtx.With("args", respArgs).Info("response args")
ret, err := serializeReturn(serializer, resp)
if err != nil {
return nil, err
}
return ret, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mixerj/pitaya.git
git@gitee.com:mixerj/pitaya.git
mixerj
pitaya
pitaya
4e7898a663a6

搜索帮助