2 Star 1 Fork 1

mosache / YFrame

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
server.go 4.88 KB
一键复制 编辑 原始数据 按行查看 历史
ヤ沒脩袮兲︶ 提交于 2023-08-29 18:31 . temp
package rpc
import (
"context"
"encoding/binary"
"errors"
"gitee.com/mosache/YFrame/micro/rpc/compress"
"gitee.com/mosache/YFrame/micro/rpc/compress/nothing"
"gitee.com/mosache/YFrame/micro/rpc/internal/errs"
"gitee.com/mosache/YFrame/micro/rpc/message"
"gitee.com/mosache/YFrame/micro/rpc/serialize"
"gitee.com/mosache/YFrame/micro/rpc/serialize/json"
"net"
"reflect"
"strconv"
"time"
)
type Server struct {
services map[string]reflectStub
serializers map[uint8]serialize.Serializer
compressors map[uint8]compress.Compressor
}
type reflectStub struct {
s Service
value reflect.Value
serializers map[uint8]serialize.Serializer
compressors map[uint8]compress.Compressor
}
func (s reflectStub) invoke(ctx context.Context, req *message.Request) ([]byte, error) {
var (
serializer serialize.Serializer
compressor compress.Compressor
err error
ok bool
)
method := s.value.MethodByName(req.MethodName)
inReq := reflect.New(method.Type().In(1).Elem()).Interface()
if serializer, ok = s.serializers[req.Serializer]; !ok {
return nil, errs.ErrServerSerializerNotFound
}
if compressor, ok = s.compressors[req.Compresser]; !ok {
return nil, errs.ErrServerCompressorNotFound
}
req.Data, err = compressor.UnCompress(req.Data)
if err != nil {
return nil, err
}
err = serializer.Decode(req.Data, inReq)
if err != nil {
return nil, err
}
results := method.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(inReq)})
if err := results[1].Interface(); err != nil {
return nil, results[1].Interface().(error)
}
var (
res []byte
er error
)
res, er = serializer.Encode(results[0].Interface())
if er != nil {
return nil, er
}
res, er = compressor.Compress(res)
if er != nil {
return nil, er
}
return res, nil
}
func NewServer() *Server {
s := &Server{
services: make(map[string]reflectStub),
serializers: map[uint8]serialize.Serializer{},
compressors: map[uint8]compress.Compressor{},
}
s.RegisterSerializer(json.Serializer{})
s.RegisterCompressor(nothing.Compressor{})
return s
}
func (s *Server) Register(srv Service) {
s.services[srv.Name()] = reflectStub{
s: srv,
value: reflect.ValueOf(srv),
serializers: s.serializers,
compressors: s.compressors,
}
}
func (s *Server) RegisterSerializer(serializer serialize.Serializer) {
s.serializers[serializer.Code()] = serializer
}
func (s *Server) RegisterCompressor(compressor compress.Compressor) {
s.compressors[compressor.Code()] = compressor
}
func (s *Server) Start(addr string) error {
listener, err := net.Listen("tcp", addr)
if err != nil {
return err
}
defer func() { _ = listener.Close() }()
for {
conn, err := listener.Accept()
if err != nil {
continue
}
go func() {
if err := s.handConn(conn); err != nil {
conn.Close()
}
}()
}
}
func (s *Server) handConn(conn net.Conn) error {
dataBs, err := ReadMsg(conn)
req := message.DecodeReq(dataBs)
ctx := context.Background()
cancel := func() {}
if deadlineStr, ok := req.Meta["deadline"]; ok {
if deadline, er := strconv.ParseInt(deadlineStr, 10, 64); er == nil {
ctx, cancel = context.WithDeadline(ctx, time.UnixMilli(deadline))
}
}
if oneWay, ok := req.Meta["onWay"]; ok {
if oneWay == "onWay" {
ctx = ContextWithOneway(ctx)
}
}
var resp *message.Response
if resp, err = s.Invoke(ctx, req); err != nil {
/// 业务错误
resp.Error = []byte(err.Error())
}
cancel()
resp.CalculateHeaderLength()
resp.CalculateBodyLength()
/// 写回响应
if _, err = conn.Write(message.EncodeResp(resp)); err != nil {
return err
}
//if err = s.send(conn, message.EncodeResp(resp)); err != nil {
// return err
//}
return nil
}
//func (s *Server) handMsg(bs []byte) (any, error) {
//
// req := message.DecodeReq(bs)
//
// ret, err := s.Invoke(context.Background(), req)
//
// /// biz err
// if err != nil {
// return nil, err
// }
//
// return ret, nil
//}
func (s *Server) Invoke(ctx context.Context, req *message.Request) (*message.Response, error) {
srv, ok := s.services[req.ServiceName]
resp := &message.Response{
RequestID: req.RequestID,
Version: req.Version,
Compresser: req.Compresser,
Serializer: req.Serializer,
}
defer func() {
resp.CalculateHeaderLength()
resp.CalculateBodyLength()
}()
if !ok {
return nil, errors.New("[rpc] service not found")
}
if IsOneWayContext(ctx) {
go func() {
_, _ = srv.invoke(ctx, req)
}()
return resp, errs.ErrServerOneWayCall
}
respData, err := srv.invoke(ctx, req)
if err != nil {
return nil, err
}
resp.Data = respData
return resp, nil
}
func (s *Server) send(conn net.Conn, data []byte) error {
dataLen := len(data)
dataBs := make([]byte, dataLen+sendDataHeaderBytes)
binary.BigEndian.PutUint64(dataBs[:sendDataHeaderBytes], uint64(dataLen))
copy(dataBs[sendDataHeaderBytes:], data)
_, err := conn.Write(dataBs)
if err != nil {
return err
}
return nil
}
Go
1
https://gitee.com/mosache/YFrame.git
git@gitee.com:mosache/YFrame.git
mosache
YFrame
YFrame
v0.1.73

搜索帮助