代码拉取完成,页面将自动刷新
package server
import (
"context"
"errors"
"fmt"
"go-rpc-2/rpc/encoding_protocol"
"go-rpc-2/rpc/proxy"
"go-rpc-2/rpc/serialize_protocol"
"net"
"reflect"
"strconv"
"time"
)
// ReflectionStub service的代理,通过invoke执行相应方法
type ReflectionStub struct {
service proxy.Service
serializers map[uint8]serialize_protocol.Serializer
}
// Invoke 只需返回调用结果即err、data字段
func (r *ReflectionStub) Invoke(ctx context.Context, req *encoding_protocol.Request) *encoding_protocol.Response {
res := &encoding_protocol.Response{}
typ := reflect.TypeOf(r.service)
method, ok := typ.MethodByName(req.MethodName)
if !ok {
res.Err = fmt.Sprintf("server: service %s method %s not found", req.ServiceName, req.MethodName)
return res
}
in := make([]reflect.Value, method.Type.NumIn())
in[0] = reflect.ValueOf(r.service)
in[1] = reflect.ValueOf(ctx)
// req.Data就是请求参数(需要传给method的参数)但req.Data是[]byte,需要先序列化解码
var serializer serialize_protocol.Serializer
serializer, ok = r.serializers[req.Serializer]
if !ok {
res.Err = fmt.Sprintf("server: serializer %d not found", req.Serializer)
return res
}
methodIn := reflect.New(method.Type.In(2).Elem())
err := serializer.Decode(req.Data, methodIn.Interface())
if err != nil {
res.Err = fmt.Sprintf("server: decode req.data failed: %s", err.Error())
return res
}
in[2] = methodIn
// 调用方法
result := method.Func.Call(in)
if result[1].Interface() != nil {
res.Err = fmt.Sprintf("server: business error: %s", result[1].Interface().(error))
return res
}
// 对结果进行编码
var data []byte
data, err = serializer.Encode(result[0].Interface())
if err != nil {
res.Err = fmt.Sprintf("server: encode res.data failed: %s", err.Error())
return res
}
res.Data = data
return res
}
var _ proxy.Proxy = (*ReflectionStub)(nil)
type Server struct {
*encoding_protocol.ConnMsg
services map[string]*ReflectionStub
}
func (s *Server) Invoke(ctx context.Context, req *encoding_protocol.Request) *encoding_protocol.Response {
res := &encoding_protocol.Response{
MessageId: req.MessageId,
Version: req.Version,
Compressor: req.Compressor,
Serializer: req.Serializer,
}
// 找到对应服务
sr, ok := s.services[req.ServiceName]
if !ok {
res.Err = fmt.Sprintf("server: service %s not found", req.ServiceName)
return res
}
var cancel context.CancelFunc = func() {}
defer cancel()
if req.Meta != nil {
if req.Meta["one-way"] == "true" {
go func() {
sr.Invoke(ctx, req)
}()
res.Err = "server: 已开启one-way调用"
return res
}
var deadline string
deadline, ok = req.Meta["deadline"]
if ok {
t, err := strconv.ParseInt(deadline, 10, 64)
if err != nil {
res.Err = "server: parse deadline failed: " + err.Error()
return res
}
ctx, cancel = context.WithDeadline(ctx, time.UnixMilli(t))
}
}
// 调用服务
result := sr.Invoke(ctx, req)
if result.Err != "" {
res.Err = result.Err
}
res.Data = result.Data
cancel()
// 返回结果
return res
}
func (s *Server) Start(network string, address string) error {
listener, err := net.Listen(network, address)
if err != nil {
return err
}
for {
var conn net.Conn
conn, err = listener.Accept()
if err != nil {
return err
}
/**
三种处理方式:
- 异步:开goroutine处理conn数据
- 半异步:同步处理请求,异步返回响应
- 同步:同步处理请求,返回响应
*/
go func() {
res := &encoding_protocol.Response{}
defer func() {
resBs := res.Encode()
er := s.Write(conn, resBs)
if er != nil {
fmt.Println(er)
return
}
}()
var reqBs []byte
reqBs, err = s.Read(conn)
if err != nil {
res.Err = err.Error()
return
}
req := &encoding_protocol.Request{}
err = req.Decode(reqBs)
if err != nil {
res.Err = err.Error()
return
}
ctx := context.Background()
res = s.Invoke(ctx, req)
}()
}
}
var _ proxy.Proxy = (*Server)(nil)
// Builder 作用:build server时控制serializers注册全部完成后再拷贝到每个service上
type Builder struct {
services map[string]*ReflectionStub
serializers map[uint8]serialize_protocol.Serializer
}
type Opt func(builder *Builder)
func WithSerializers(serializers ...serialize_protocol.Serializer) Opt {
return func(s *Builder) {
if s.serializers == nil {
s.serializers = make(map[uint8]serialize_protocol.Serializer, len(serializers))
}
for _, serializer := range serializers {
s.serializers[serializer.Code()] = serializer
}
}
}
func WithServices(services ...proxy.Service) Opt {
return func(s *Builder) {
if s.services == nil {
s.services = make(map[string]*ReflectionStub, len(services))
}
for _, service := range services {
s.services[service.Name()] = &ReflectionStub{
service: service,
}
}
}
}
func NewServerBuilder(opts ...Opt) *Builder {
b := &Builder{}
for _, opt := range opts {
opt(b)
}
return b
}
func (s *Builder) Build() (*Server, error) {
if s.services == nil {
return nil, errors.New("serverBuilder: 未注册服务")
}
if s.serializers == nil {
serializer := &serialize_protocol.JsonSerializer{}
s.serializers = map[uint8]serialize_protocol.Serializer{
serializer.Code(): serializer,
}
}
for _, service := range s.services {
service.serializers = s.serializers
}
return &Server{
services: s.services,
}, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。