1 Star 0 Fork 0

xlizy/common-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
nacos_grpc.go 6.36 KB
一键复制 编辑 原始数据 按行查看 历史
xlizy 提交于 2024-04-25 10:21 . save
package nacos_grpc
import (
"context"
"errors"
"fmt"
commonConfig "gitee.com/xlizy/common-go/base/config"
"gitee.com/xlizy/common-go/base/const/threadlocal"
"gitee.com/xlizy/common-go/base/enums/common_error"
"gitee.com/xlizy/common-go/components/nacos/v2"
"gitee.com/xlizy/common-go/utils/common"
"gitee.com/xlizy/common-go/utils/json"
"gitee.com/xlizy/common-go/utils/zlog"
"github.com/google/uuid"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"net"
"sync"
)
// traceIdKey is the gRPC metadata key under which the trace ID travels from
// the consumer interceptor to the provider interceptor.
const traceIdKey = "trace_id"

var (
	// consumerConns holds the client connections kept for consumed services,
	// indexed first by service name and then by the instance address in
	// "ip:port" form.
	consumerConns = map[string]map[string]*grpc.ClientConn{}

	// consumerConnsLock guards every read and write of consumerConns.
	consumerConnsLock sync.RWMutex
)
// Service describes a single gRPC service that this process either provides
// or consumes.
type Service struct {
	// Name is the logical service name. InitNacosGrpc prefixes it with
	// "grpc:" to form the Nacos service name.
	Name string

	// Weight is the instance weight sent to Nacos when the service is
	// registered as a provider.
	Weight float64

	// RegHandle attaches the service implementation to the gRPC server that
	// InitNacosGrpc creates for a provider entry.
	RegHandle func(registrar grpc.ServiceRegistrar)
}
// Services is the argument of InitNacosGrpc: the full set of gRPC services
// this process takes part in.
type Services struct {
	// Consumer lists the remote services for which client connections are
	// created and kept in sync with Nacos.
	Consumer []Service

	// Provider lists the services served locally; each one gets its own gRPC
	// server and Nacos registration.
	Provider []Service
}
// providerInterceptor is the unary server interceptor installed on every gRPC
// server created by InitNacosGrpc.
//
// For each call it:
//   - binds a trace ID via threadlocal.SetTraceId, taking it from the incoming
//     "trace_id" metadata when present and non-empty and generating a fresh
//     UUID otherwise;
//   - logs the request and the response as JSON;
//   - converts a panic raised while handling the call into a gRPC status error
//     built from common_error.RPC_EXECUTE_ERROR instead of letting it
//     propagate.
//
// The handler's response and error are returned unchanged when no panic
// occurs.
var providerInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		if recovered := recover(); recovered != nil {
			// %v (not %s) so that non-string panic values are rendered readably.
			zlog.Error("Grpc接口实现方法发生异常:{},{}", info.FullMethod, fmt.Sprintf("%v", recovered))
			err = status.Error(codes.Code(common_error.RPC_EXECUTE_ERROR.Code()), common_error.RPC_EXECUTE_ERROR.Des())
		}
	}()

	// Always bind a trace ID for this call. The metadata may be missing, the
	// key may carry no values, or the caller may have sent an empty string
	// (consumerInterceptor forwards whatever threadlocal.GetTraceId returns);
	// in all of those cases a new ID is generated.
	traceID := ""
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		if values := md[traceIdKey]; len(values) > 0 {
			traceID = values[0]
		}
	}
	if traceID == "" {
		traceID = uuid.New().String()
	}
	threadlocal.SetTraceId(traceID)

	zlog.Info("grpc请求体:{}", json.ToJsonStr(req))
	resp, err = handler(ctx, req)
	if err != nil {
		zlog.Error("grpc调用方法发生异常:{}", err.Error())
	}
	zlog.Info("grpc结果:{}", json.ToJsonStr(resp))
	return resp, err
}
// consumerInterceptor is the unary client interceptor attached to every
// connection dialed for a consumed service.
//
// It logs the method and connection target, sends the current trace ID (as
// returned by threadlocal.GetTraceId) in the "trace_id" outgoing metadata,
// logs the request and the reply as JSON, and returns the invoker's error
// unchanged.
//
// NOTE(review): the outgoing context is built with metadata.NewOutgoingContext,
// so outgoing metadata already present on ctx is presumably replaced rather
// than merged — confirm this is intended.
var consumerInterceptor = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	zlog.Info("调用grpc接口method:{},conn:{}", method, cc.Target())

	traceMD := metadata.Pairs(traceIdKey, threadlocal.GetTraceId())
	callCtx := metadata.NewOutgoingContext(ctx, traceMD)

	zlog.Info("grpc请求体:{}", json.ToJsonStr(req))
	callErr := invoker(callCtx, method, req, reply, cc, opts...)
	// The reply is logged whether or not the call succeeded.
	zlog.Info("grpc结果:{}", json.ToJsonStr(reply))
	return callErr
}
// InitNacosGrpc wires this process into Nacos-based gRPC service discovery.
//
// For every entry in services.Provider it opens a TCP listener on a free
// port, creates a gRPC server with providerInterceptor installed, lets
// RegHandle attach the service implementation, starts serving in a background
// goroutine and registers the instance in Nacos as "grpc:<Name>" in
// DEFAULT_GROUP.
//
// For every entry in services.Consumer it loads the current instance list
// from Nacos, builds the connection pool read by GetGrpcConn, and subscribes
// to instance changes so the pool follows later updates.
//
// It panics when the Nacos naming client is unavailable, when no port can be
// obtained or listened on, when the gRPC server stops with an error, or when
// the Nacos registration call returns an error. Consumer-side failures are
// logged only.
func InitNacosGrpc(services Services) {
	namingClient := nacos.GetNamingClient()
	if namingClient == nil {
		zlog.Error("连接Nacos异常:namingClient is nil")
		panic("连接Nacos异常")
	}

	for _, service := range services.Provider {
		port, portErr := common.GetFreePort()
		if portErr != nil {
			zlog.Error("监听TCP端口异常:{}", portErr.Error())
			panic(portErr)
		}
		listen, listenErr := net.Listen("tcp", fmt.Sprintf(":%d", port))
		if listenErr != nil {
			zlog.Error("监听TCP端口异常:{}", listenErr.Error())
			panic(listenErr)
		}

		server := grpc.NewServer(grpc.UnaryInterceptor(providerInterceptor))
		// Services must be attached before Serve is running; registering
		// after the serving goroutine has started is a race that grpc-go
		// treats as a fatal error.
		service.RegHandle(server)
		go func() {
			if err := server.Serve(listen); err != nil {
				zlog.Error("启动grpc服务异常:{}", err.Error())
				panic("启动grpc服务异常")
			}
		}()

		regOk, regErr := namingClient.RegisterInstance(vo.RegisterInstanceParam{
			Ip:          common.GetLocalPriorityIp(commonConfig.PriorityNetwork.Networks),
			Port:        uint64(port),
			ServiceName: "grpc:" + service.Name,
			Weight:      service.Weight,
			Enable:      true,
			Healthy:     true,
			Ephemeral:   true,
			GroupName:   "DEFAULT_GROUP",
		})
		if regErr != nil {
			zlog.Error("注册grpc服务异常:{}", regErr.Error())
			panic("注册grpc服务异常")
		}
		if !regOk {
			zlog.Error("注册grpc服务异常失败")
		}
	}

	for _, service := range services.Consumer {
		// Per-iteration copy: the subscription callback outlives this loop
		// and must not observe a loop variable shared between iterations.
		serviceName := service.Name

		consumerConnsLock.Lock()
		if _, ok := consumerConns[serviceName]; !ok {
			consumerConns[serviceName] = make(map[string]*grpc.ClientConn)
		}
		consumerConnsLock.Unlock()

		instances, selectErr := namingClient.SelectAllInstances(vo.SelectAllInstancesParam{
			ServiceName: "grpc:" + serviceName,
			Clusters:    []string{"DEFAULT"},
			GroupName:   "DEFAULT_GROUP",
		})
		if selectErr != nil {
			zlog.Error("获取grpc可用实例列表异常:{}", selectErr.Error())
		} else {
			refreshConsumerConns(serviceName, instances)
		}

		// NOTE(review): the subscription goes through nacos.BuildNamingClient()
		// rather than the namingClient obtained above — kept as in the
		// original; confirm a separate client is intended.
		subscribeErr := nacos.BuildNamingClient().Subscribe(&vo.SubscribeParam{
			ServiceName: "grpc:" + serviceName,
			GroupName:   "DEFAULT_GROUP",
			Clusters:    []string{"DEFAULT"},
			SubscribeCallback: func(instances []model.Instance, err error) {
				threadlocal.SetTraceId(uuid.New().String())
				if err != nil {
					// Keep the current pool: an error notification carries no
					// trustworthy instance list.
					zlog.Error("订阅grpc服务异常:{}", err.Error())
					return
				}
				zlog.Info("收到订阅消息:{}", fmt.Sprintf("%d", len(instances)))
				refreshConsumerConns(serviceName, instances)
			},
		})
		if subscribeErr != nil {
			zlog.Error("订阅grpc服务异常:{}", subscribeErr.Error())
		}
	}
}

// refreshConsumerConns reconciles the connection pool of serviceName with the
// given Nacos instance list: connections to instances that are no longer
// listed as healthy and enabled are closed and removed, connections to
// instances that are still listed are kept as they are, and new instances are
// dialed. Dial failures are logged and the instance is skipped.
func refreshConsumerConns(serviceName string, instances []model.Instance) {
	consumerConnsLock.Lock()
	defer consumerConnsLock.Unlock()

	conns := consumerConns[serviceName]
	if conns == nil {
		conns = make(map[string]*grpc.ClientConn)
		consumerConns[serviceName] = conns
	}

	// Collect the addresses that should have a connection after this update.
	wanted := make(map[string]struct{}, len(instances))
	for _, instance := range instances {
		connKey := fmt.Sprintf("%s:%d", instance.Ip, instance.Port)
		zlog.Info("connKey:{}", connKey)
		if instance.Healthy && instance.Enable {
			wanted[connKey] = struct{}{}
		}
	}

	// Close only what is no longer wanted, so callers holding a connection to
	// an instance that is still alive are not interrupted.
	for connKey, conn := range conns {
		if _, keep := wanted[connKey]; keep {
			continue
		}
		delete(conns, connKey)
		// Best effort: the connection is being discarded either way.
		_ = conn.Close()
	}

	for connKey := range wanted {
		if _, exists := conns[connKey]; exists {
			continue
		}
		conn, dialErr := grpc.Dial(connKey, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(consumerInterceptor))
		if dialErr != nil {
			zlog.Error("连接grpc服务异常:{}", dialErr.Error())
			continue
		}
		conns[connKey] = conn
	}

	zlog.Info("最新consumerConns:{}", json.ToJsonStr(consumerConns))
}
// GetGrpcConn returns the pooled client connection for serviceName.
//
// The instance address is chosen by nacos.GetAppIns("grpc:" + serviceName)
// and then looked up in the pool maintained by InitNacosGrpc.
//
// It panics with an error value when the instance lookup fails or when the
// pool holds no connection for the chosen address; it never returns nil.
func GetGrpcConn(serviceName string) *grpc.ClientConn {
	connKey, err := nacos.GetAppIns("grpc:" + serviceName)
	zlog.Info("GetAppIns:{}", connKey)
	if err != nil {
		// Log the cause before panicking; the panic value itself is kept
		// unchanged for callers that recover and inspect it.
		zlog.Error("获取grpc服务实例异常:{},{}", serviceName, err.Error())
		panic(errors.New("无可用Grpc服务:" + serviceName))
	}

	consumerConnsLock.RLock()
	conn, found := consumerConns[serviceName][connKey]
	zlog.Info("consumerConnsLen:{}", len(consumerConns[serviceName]))
	for key := range consumerConns[serviceName] {
		zlog.Info("consumerConnsKey:{}", key)
	}
	consumerConnsLock.RUnlock()

	if !found {
		zlog.Error("获取grpc服务实例异常-未找到可用实例:{}", serviceName)
		panic(errors.New("无可用Grpc服务:" + serviceName))
	}
	return conn
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/xlizy/common-go.git
git@gitee.com:xlizy/common-go.git
xlizy
common-go
common-go
v0.2.1

搜索帮助

0d507c66 1850385 C8b1a773 1850385