1 Star 0 Fork 0

llakcs / agile-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
client.go 5.86 KB
AI 代码解读
一键复制 编辑 原始数据 按行查看 历史
llakcs 提交于 2024-01-31 16:54 . 第一次提交
package grpc
import (
"context"
"crypto/tls"
"fmt"
"gitee.com/llakcs/agile-go/middleware"
"gitee.com/llakcs/agile-go/registry"
"gitee.com/llakcs/agile-go/selector"
"gitee.com/llakcs/agile-go/transport/grpc/resolver/discovery"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
grpcinsecure "google.golang.org/grpc/credentials/insecure"
"time"
)
func init() {
if selector.GlobalSelector() == nil {
selector.SetGlobalSelector(selector.NewWeightedRoundRobinBuilder())
}
}
type clientOptions struct {
endpoint string
tlsConf *tls.Config
timeout time.Duration
discovery registry.Discovery
middleware []middleware.Middleware
ints []grpc.UnaryClientInterceptor
streamInts []grpc.StreamClientInterceptor
grpcOpts []grpc.DialOption
healthCheckConfig string
printDiscoveryDebugLog bool
balancerName string
}
// ClientOption is gRPC client option.
type ClientOption func(o *clientOptions)
// WithEndpoint with client endpoint.
func WithEndpoint(endpoint string) ClientOption {
return func(o *clientOptions) {
o.endpoint = endpoint
}
}
// WithTimeout with client timeout.
func WithTimeout(timeout time.Duration) ClientOption {
return func(o *clientOptions) {
o.timeout = timeout
}
}
// WithMiddleware with client middleware.
func WithMiddleware(m ...middleware.Middleware) ClientOption {
return func(o *clientOptions) {
o.middleware = m
}
}
// WithDiscovery with client discovery.
func WithDiscovery(d registry.Discovery) ClientOption {
return func(o *clientOptions) {
o.discovery = d
}
}
// WithTLSConfig with TLS config.
func WithTLSConfig(c *tls.Config) ClientOption {
return func(o *clientOptions) {
o.tlsConf = c
}
}
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(in ...grpc.UnaryClientInterceptor) ClientOption {
return func(o *clientOptions) {
o.ints = in
}
}
// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
func WithStreamInterceptor(in ...grpc.StreamClientInterceptor) ClientOption {
return func(o *clientOptions) {
o.streamInts = in
}
}
// WithOptions with gRPC options.
func WithOptions(opts ...grpc.DialOption) ClientOption {
return func(o *clientOptions) {
o.grpcOpts = opts
}
}
// DialInsecure returns an insecure GRPC connection.
func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
return dial(ctx, true, opts...)
}
// Dial returns a GRPC connection.
func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
return dial(ctx, false, opts...)
}
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
options := clientOptions{
timeout: 2000 * time.Millisecond,
balancerName: balancerName,
printDiscoveryDebugLog: true,
healthCheckConfig: `,"healthCheckConfig":{"serviceName":""}`,
}
for _, o := range opts {
o(&options)
}
ints := []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout),
}
sints := []grpc.StreamClientInterceptor{
streamClientInterceptor(),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
if len(options.streamInts) > 0 {
sints = append(sints, options.streamInts...)
}
grpcOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]%s}`,
options.balancerName, options.healthCheckConfig)),
grpc.WithChainUnaryInterceptor(ints...),
grpc.WithChainStreamInterceptor(sints...),
}
if options.discovery != nil {
grpcOpts = append(grpcOpts,
grpc.WithResolvers(
discovery.NewBuilder(
options.discovery,
discovery.WithInsecure(insecure),
discovery.PrintDebugLog(options.printDiscoveryDebugLog),
)))
}
if insecure {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
}
if options.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf)))
}
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
// gRPC 单向请求的客户端拦截器
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if timeout > 0 {
//超时设置
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
h := func(ctx context.Context, req interface{}) (interface{}, error) {
//if tr, ok := transport.FromClientContext(ctx); ok {
// header := tr.RequestHeader()
// keys := header.Keys()
// keyvals := make([]string, 0, len(keys))
// for _, k := range keys {
// keyvals = append(keyvals, k, header.Get(k))
// }
// ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
//}
return reply, invoker(ctx, method, req, reply, cc, opts...)
}
if len(ms) > 0 {
h = middleware.Chain(ms...)(h)
}
//var p selector.Peer
//ctx = selector.NewPeerContext(ctx, &p)
_, err := h(ctx, req)
return err
}
}
// gRPC 流式请求的客户端拦截器
func streamClientInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { // nolint
//ctx = transport.NewClientContext(ctx, &Transport{
// endpoint: cc.Target(),
// operation: method,
// reqHeader: headerCarrier{},
// nodeFilters: filters,
//})
//var p selector.Peer
//ctx = selector.NewPeerContext(ctx, &p)
return streamer(ctx, desc, cc, method, opts...)
}
}
1
https://gitee.com/llakcs/agile-go.git
git@gitee.com:llakcs/agile-go.git
llakcs
agile-go
agile-go
v1.2.0

搜索帮助