2 Star 1 Fork 1

mosache/YFrame

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
client.go 5.25 KB
一键复制 编辑 原始数据 按行查看 历史
ヤ沒脩袮兲︶ 提交于 2023-08-29 18:31 . temp
package rpc
import (
"context"
"errors"
"gitee.com/mosache/YFrame/micro/rpc/compress"
"gitee.com/mosache/YFrame/micro/rpc/compress/nothing"
"gitee.com/mosache/YFrame/micro/rpc/internal/errs"
"gitee.com/mosache/YFrame/micro/rpc/message"
"gitee.com/mosache/YFrame/micro/rpc/serialize"
"gitee.com/mosache/YFrame/micro/rpc/serialize/json"
"github.com/silenceper/pool"
"net"
"reflect"
"strconv"
"time"
)
// ErrServiceIsNil is returned by setFuncField when the service passed in is nil.
var ErrServiceIsNil = errors.New("[rpc] service 不能为nil")

// ErrStructPtrOnly is returned by setFuncField when the service is not a
// pointer to a struct.
var ErrStructPtrOnly = errors.New("[rpc] service 只能传入结构体指针")
// initService wires srv up for remote calls: it hands srv to setFuncField
// with the client itself acting as the proxy, together with the client's
// configured serializer and compressor.
func (c *Client) initService(srv Service) error {
	var proxy Proxy = c
	return setFuncField(srv, proxy, c.serializer, c.compressor)
}
// setFuncField replaces every settable function field of srv with a proxy
// function that performs the call remotely through p.
//
// srv must be a non-nil pointer to a struct. A nil interface or a nil pointer
// yields ErrServiceIsNil; any other non-struct-pointer yields
// ErrStructPtrOnly. Fields that cannot be set (unexported) or that are not
// functions are left untouched.
//
// Each proxied function is expected to take (context.Context, request) and
// return (pointer-to-response, error): the generated proxy reads args[0] as
// the context, encodes args[1] with serializer, compresses it with
// compressor, and decodes the reply into a freshly allocated value of the
// first result's element type.
func setFuncField(srv Service, p Proxy, serializer serialize.Serializer, compressor compress.Compressor) error {
	if srv == nil {
		return ErrServiceIsNil
	}
	typ := reflect.TypeOf(srv)
	if typ.Kind() != reflect.Ptr || typ.Elem().Kind() != reflect.Struct {
		return ErrStructPtrOnly
	}
	ptr := reflect.ValueOf(srv)
	// A typed nil pointer is a non-nil interface value; dereferencing it
	// would make the field accesses below panic.
	if ptr.IsNil() {
		return ErrServiceIsNil
	}
	typ = typ.Elem()
	val := ptr.Elem()

	// Zero value of the error interface, used as the second result on success.
	nilErr := reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())

	for i := 0; i < typ.NumField(); i++ {
		fieldType := typ.Field(i)
		fieldVal := val.Field(i)
		// reflect.MakeFunc panics on non-func types, so only function
		// fields can be proxied.
		if !fieldVal.CanSet() || fieldType.Type.Kind() != reflect.Func {
			continue
		}

		fn := func(args []reflect.Value) (results []reflect.Value) {
			ctx := args[0].Interface().(context.Context)
			// Always hand back a non-nil response pointer, even on failure.
			retVal := reflect.New(fieldType.Type.Out(0).Elem())
			fail := func(err error) []reflect.Value {
				return []reflect.Value{retVal, reflect.ValueOf(err)}
			}

			reqData, err := serializer.Encode(args[1].Interface())
			if err != nil {
				return fail(err)
			}
			reqData, err = compressor.Compress(reqData)
			if err != nil {
				return fail(err)
			}

			// Call metadata: the deadline is sent as Unix milliseconds.
			meta := make(map[string]string, 2)
			if deadline, ok := ctx.Deadline(); ok {
				meta["deadline"] = strconv.FormatInt(deadline.UnixMilli(), 10)
			}
			if IsOneWayContext(ctx) {
				meta["onWay"] = "onWay"
			}

			req := &message.Request{
				ServiceName: srv.Name(),
				MethodName:  fieldType.Name,
				Data:        reqData,
				Serializer:  serializer.Code(),
				Compresser:  compressor.Code(),
				Meta:        meta,
			}
			req.CalculateHeaderLength()
			req.CalculateBodyLength()

			resp, err := p.Invoke(ctx, req)
			if err != nil {
				return fail(err)
			}

			var serverErr error
			if len(resp.Error) > 0 {
				serverErr = errors.New(string(resp.Error))
			}

			// A response that only carries an error has no payload to
			// decode; trying anyway would replace the server's error with
			// an unrelated uncompress/decode failure.
			if len(resp.Data) > 0 || serverErr == nil {
				if resp.Data, err = compressor.UnCompress(resp.Data); err != nil {
					return fail(err)
				}
				if err = serializer.Decode(resp.Data, retVal.Interface()); err != nil {
					return fail(err)
				}
			}

			if serverErr != nil {
				return []reflect.Value{retVal, reflect.ValueOf(serverErr)}
			}
			return []reflect.Value{retVal, nilErr}
		}

		fieldVal.Set(reflect.MakeFunc(fieldType.Type, fn))
	}
	return nil
}
// clientOption customizes a Client; NewClient applies each option after it
// has filled in the defaults.
type clientOption func(c *Client)
// Client is the RPC client: it encodes requests, sends them over pooled
// connections and decodes the responses.
type Client struct {
// addr is the server address given to NewClient.
addr string
// network is the network type; NewClient sets it to "tcp".
network string
// pool supplies the connections used by send.
pool pool.Pool
// serializer encodes request payloads and decodes response payloads.
serializer serialize.Serializer
// compressor compresses request payloads and uncompresses response payloads.
compressor compress.Compressor
}
// ClientWithSerializer returns an option that sets the client's serializer
// to s.
func ClientWithSerializer(s serialize.Serializer) clientOption {
	var opt clientOption = func(client *Client) {
		client.serializer = s
	}
	return opt
}
// ClientWithCompressor returns an option that sets the client's compressor
// to the given one.
func ClientWithCompressor(compressor compress.Compressor) clientOption {
	var opt clientOption = func(client *Client) {
		client.compressor = compressor
	}
	return opt
}
// NewClient creates a Client for the server at addr.
//
// Connections come from a channel pool (at most 30 open, 10 idle, one minute
// idle timeout) that dials addr over TCP with a 3 second timeout. The client
// starts with the JSON serializer and the no-op compressor; opts are applied
// afterwards, in order, and may override them.
//
// NewClient panics if the pool cannot be created.
func NewClient(addr string, opts ...clientOption) *Client {
	dial := func() (interface{}, error) {
		return net.DialTimeout("tcp", addr, time.Second*3)
	}
	closeConn := func(i interface{}) error {
		return i.(net.Conn).Close()
	}
	cfg := &pool.Config{
		MaxCap:      30,
		MaxIdle:     10,
		Factory:     dial,
		Close:       closeConn,
		IdleTimeout: time.Minute,
	}

	connPool, err := pool.NewChannelPool(cfg)
	if err != nil {
		panic(err)
	}

	client := &Client{
		addr:       addr,
		network:    "tcp",
		pool:       connPool,
		serializer: json.Serializer{},
		compressor: nothing.Compressor{},
	}
	for i := range opts {
		opts[i](client)
	}
	return client
}
// Invoke sends req to the server and returns its response, giving up as soon
// as ctx is done.
//
// If ctx is already done on entry, its error is returned without sending
// anything. Otherwise the round trip runs in a separate goroutine and Invoke
// returns whichever comes first: the round-trip result, or ctx.Err() when
// ctx is cancelled or times out. In the latter case the goroutine is not
// interrupted; it finishes on its own once doInvoke returns.
func (c *Client) Invoke(ctx context.Context, req *message.Request) (*message.Response, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	type result struct {
		resp *message.Response
		err  error
	}
	// Buffered with capacity 1 and never closed: the worker can always
	// deliver its result without blocking, even after Invoke has returned
	// because of ctx, and a result that is ready before the select below is
	// reached is not lost.
	done := make(chan result, 1)
	go func() {
		resp, err := c.doInvoke(ctx, req)
		done <- result{resp: resp, err: err}
	}()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case r := <-done:
		return r.resp, r.err
	}
}
// doInvoke performs one blocking round trip: it encodes req, sends it, and
// decodes the reply.
//
// A send failure is returned as is. For a one-way context the reply is
// discarded after send returns and errs.ErrOneWayCall is reported instead.
// Otherwise the decoded response is returned with its header and body
// lengths recalculated.
func (c *Client) doInvoke(ctx context.Context, req *message.Request) (*message.Response, error) {
	raw, sendErr := c.send(message.EncodeReq(req))
	switch {
	case sendErr != nil:
		return nil, sendErr
	case IsOneWayContext(ctx):
		return nil, errs.ErrOneWayCall
	}

	decoded := message.DecodeResp(raw)
	decoded.CalculateHeaderLength()
	decoded.CalculateBodyLength()
	return decoded, nil
}
// send writes data to a pooled connection and returns the next message read
// from it with ReadMsg.
//
// The connection is always handed back to the pool so that the pool's
// accounting stays correct: after a successful exchange it is returned with
// Put for reuse; after a write or read error it is discarded with the pool's
// Close, because its stream state can no longer be trusted.
func (c *Client) send(data []byte) (respData []byte, err error) {
	val, err := c.pool.Get()
	if err != nil {
		return nil, err
	}
	conn := val.(net.Conn)
	defer func() {
		if err != nil {
			// Best effort: the I/O error is what the caller needs to see.
			_ = c.pool.Close(conn)
			return
		}
		// Best effort: a failed Put must not turn a good response into an error.
		_ = c.pool.Put(conn)
	}()

	if _, err = conn.Write(data); err != nil {
		return nil, err
	}
	return ReadMsg(conn)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mosache/YFrame.git
git@gitee.com:mosache/YFrame.git
mosache
YFrame
YFrame
v0.1.72

搜索帮助