1 Star 0 Fork 0

vick / kinfu

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
fgrpc.go 4.75 KB
一键复制 编辑 原始数据 按行查看 历史
vick 提交于 2024-03-04 19:35 . fix:fgin框架提交
package fgin
import (
"context"
"fmt"
"io"
"net"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
)
// 手动实现健康检查部分
type HealthService struct{}
func (s *HealthService) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_SERVING,
}, nil
}
func (s *HealthService) Watch(req *healthpb.HealthCheckRequest, srv healthpb.Health_WatchServer) error {
return fmt.Errorf("HealthService.Watch() method not implemented")
}
// 获取一个grpc服务
func (f *Fgin) GetGrpcServer(path string) *grpc.Server {
// 读取配置
readConfig(path)
// 判断定义的协议
if fginConfig.NetType != "grpc" {
panic(fmt.Sprintf("您配置的net_type:%v,本方法适用的协议为:grpc", fginConfig.NetType))
}
// 服务名必填
if fginConfig.ServiceName == "" {
panic("grpc服务,service_name必填")
}
// tag必填
if fginConfig.Tag == "" {
panic("grpc服务,tag必填")
}
// 初始化日志
logInit()
// 初始化mysql gorm
initGorm()
// 初始化redis
initRedis()
// 初始化rabbitmq
initRabbitmq()
// 创建服务
s := newGrpcServer()
// 实现默认的健康检查
healthpb.RegisterHealthServer(s, &HealthService{})
return s
}
// 启动grpc服务
func (f *Fgin) RunGrpc(s *grpc.Server) {
if fginConfig.Ip == "" {
logSuger.Warn("ip没有配置,使用默认ip:127.0.0.1")
fginConfig.Ip = "127.0.0.1"
}
if fginConfig.Port == 0 {
logSuger.Warn("port没有配置,使用默认port:8081")
fginConfig.Port = 8081
}
address := fmt.Sprintf("%v:%v", fginConfig.Ip, fginConfig.Port)
listen, err := net.Listen("tcp", address)
if err != nil {
panic(err.Error())
}
defer listen.Close()
defer rabbitmqClose()
go func() {
// 注册到consul,如果有配置
initConsul()
}()
logSuger.Info(fmt.Sprintf("grpc服务%v启动成功:%v", fginConfig.ServiceName, address))
if err := s.Serve(listen); err != nil {
panic(err.Error())
}
}
// 自定义创建grpc server
func newGrpcServer() *grpc.Server {
opts := []grpc.ServerOption{}
// 添加拦截器
opts = append(opts, grpc.UnaryInterceptor(fgrpcInterceptor()))
s := grpc.NewServer(opts...)
return s
}
// fgrpc拦截器, grpc只要只能有一个拦截器:具备日志功能
func fgrpcInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
defer recoverInit()
start := time.Now()
m, err := handler(ctx, req)
if info.FullMethod == "/grpc.health.v1.Health/Check" {
// 健康检查忽略日志
return m, err
}
var close io.Closer
var span opentracing.Span // 声明方法中的全局变量
var jaegerOk = false
if (fginConfig.Jaeger != Jaeger{}) {
span, close, jaegerOk = jaegerInit(ctx, info.FullMethod)
if jaegerOk {
defer close.Close()
defer span.Finish()
}
}
end := time.Now()
if err != nil {
if (fginConfig.Log != Log{}) {
logInfo := fmt.Sprintf("RPC: %s,req:%v start time: %s, end time: %s, err: %v", info.FullMethod, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
logSuger.Error(logInfo)
}
if jaegerOk {
// 使用jaeger,报错需要上报
span.SetTag("error", true)
span.LogKV("error_message", err.Error())
}
} else {
if (fginConfig.Log != Log{}) {
logInfo := fmt.Sprintf("RPC: %s,req:%v start time: %s, end time: %s, err: %v", info.FullMethod, req, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
logSuger.Info(logInfo)
}
}
return m, err
}
}
func jaegerInit(ctx context.Context, service string) (opentracing.Span, io.Closer, bool) {
cfg := &config.Configuration{
ServiceName: fginConfig.ServiceName,
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
LocalAgentHostPort: fginConfig.Jaeger.Address,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
logSuger.Error("grpc jaeger初始化失败")
return nil, nil, false
}
opentracing.SetGlobalTracer(tracer)
md, _ := metadata.FromIncomingContext(ctx)
spanCtx, err := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(md),
)
if err != nil {
logSuger.Error("grpc jaegerSpan err:" + err.Error())
return nil, nil, false
}
span := opentracing.StartSpan(service, opentracing.ChildOf(spanCtx))
return span, closer, true
}
// 处理panic异常
func recoverInit() {
if err := recover(); err != nil {
if (fginConfig.Log == Log{}) {
fmt.Println(err)
} else {
logSuger.Error(err)
}
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/wu-jin-feng/kinfu.git
git@gitee.com:wu-jin-feng/kinfu.git
wu-jin-feng
kinfu
kinfu
c48b5026ddc5

搜索帮助

344bd9b3 5694891 D2dac590 5694891