代码拉取完成,页面将自动刷新
package fgin
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// 获取一个grpc服务
func (f *Fgin) GetGrpcServer(path string) *grpc.Server {
// 读取配置
readConfig(path)
// 判断定义的协议
if fginConfig.NetType != "grpc" {
panic(fmt.Sprintf("您配置的net_type:%v,本方法适用的协议为:grpc", fginConfig.NetType))
}
// 服务名必填
if fginConfig.ServiceName == "" {
panic("grpc服务,service_name必填")
}
// tag必填
if fginConfig.Tag == "" {
panic("grpc服务,tag必填")
}
// 初始化日志
logInit()
// 启动通用组件
startFginGrpcPlug()
// 自定义组件
PlugsGrpcStart()
// 创建服务
s := newGrpcServer()
// 实现默认的健康检查
healthServer := health.NewServer()
healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(s, healthServer)
return s
}
func startFginGrpcPlug() {
var wg sync.WaitGroup
// 1. 初始化mysql gorm
wg.Add(1)
go func() {
defer wg.Done()
initGorm()
}()
// 2. 初始化redis
wg.Add(1)
go func() {
defer wg.Done()
initRedis()
}()
// 3. 初始化rabbitmq
wg.Add(1)
go func() {
defer wg.Done()
initRabbitmq()
}()
// 4. grpc客户端
wg.Add(1)
go func() {
defer wg.Done()
initGrpcClientV2()
}()
// 5. jaeger初始化
wg.Add(1)
go func() {
defer wg.Done()
initJaeger()
}()
// 6. 注册到consul,如果有配置
wg.Add(1)
go func() {
defer wg.Done()
initConsul()
}()
// 7. prometheus初始化
wg.Add(1)
go func() {
defer wg.Done()
initPrometheus()
}()
// 8. 神盾启动
wg.Add(1)
go func() {
defer wg.Done()
initAegis()
}()
// 9. 独立校验器
wg.Add(1)
go func() {
defer wg.Done()
initValidator()
}()
wg.Wait()
}
// 启动grpc服务
func (f *Fgin) RunGrpc(s *grpc.Server) {
if fginConfig.Ip == "" {
logSuger.Warn("ip没有配置,使用默认ip:127.0.0.1")
fginConfig.Ip = "127.0.0.1"
}
if fginConfig.Port == 0 {
logSuger.Warn("port没有配置,使用默认port:8081")
fginConfig.Port = 8081
}
address := fmt.Sprintf("%v:%v", fginConfig.Ip, fginConfig.Port)
listen, err := net.Listen("tcp", address)
if err != nil {
panic(err.Error())
}
defer listen.Close()
defer rabbitmqClose()
go func() {
logSuger.Info(fmt.Sprintf("grpc服务%v启动成功:%v", fginConfig.ServiceName, address))
if err := s.Serve(listen); err != nil {
log.Fatalf("listen: %v\n", err)
}
}()
// 优雅停止grpc服务
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit
deregisterService() // 存在consul注销服务
logSuger.Info("Grpc Shutdown Server ...")
s.GracefulStop()
logSuger.Info("Grpc Server exiting")
}
// 自定义创建grpc server
func newGrpcServer() *grpc.Server {
// 2025-04-01 将opts配置抛出,实现更多框架自由度
// opts := []grpc.ServerOption{}
// opts = append(opts, grpc.ConnectionTimeout(5*time.Second))
// opts = append(opts, grpc.UnaryInterceptor(fgrpcInterceptor()))
// 使用可注册的opts
opts := grpcServerOptionsObj.opts
// 默认客户端5s没有完成连接,自动断开
if !grpcServerOptionsObj.disableDefaultTimeout {
opts = append(opts, grpc.ConnectionTimeout(5*time.Second))
}
// 添加拦截器
if !grpcServerOptionsObj.disableDefaultUnaryInterceptor {
opts = append(opts, grpc.UnaryInterceptor(fgrpcInterceptor()))
}
s := grpc.NewServer(opts...)
return s
}
// fgrpc拦截器, grpc只要只能有一个拦截器:具备日志功能
func fgrpcInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if info.FullMethod == "/grpc.health.v1.Health/Check" {
// 健康检查忽略日志
return handler(ctx, req)
}
defer recoverInit()
// 限流中间件
if err := fgrpcServerRateLimiterMiddlware(info.FullMethod); err != nil {
return nil, err
}
// 熔断中间件
if err := fgrpcCircuitBreakerMiddlware(info.FullMethod); err != nil {
return nil, err
}
var (
err error
m any
span opentracing.Span
jaegerOk = false
)
start := time.Now()
// 实现jaeger上报
if (fginConfig.Jaeger != Jaeger{}) {
span, ctx, jaegerOk = jaegerInitGrpc(ctx, info.FullMethod)
if jaegerOk {
defer span.Finish()
}
}
// prometheus监控
if (fginConfig.Prometheus != Prometheus{}) {
code := "0"
defer func() {
if err != nil {
grpcErr, ok := status.FromError(err)
if !ok {
// 非grpc框架产生错误, 使用400替代,表示内部逻辑异常
code = "400"
} else {
code = fmt.Sprintf("%d", grpcErr.Code())
}
}
// 计数
requestCount.WithLabelValues("grpc", info.FullMethod, code).Inc()
// 耗时
obs := requestHistogram.WithLabelValues("grpc", info.FullMethod, code)
timer := prometheus.NewTimer(obs)
timer.ObserveDuration()
}()
}
// grpc自定义拦截器
ctx, err = PlugsGrpcMiddStart(ctx, req, info)
if err != nil {
return nil, err
}
// 运行grpc方法
m, err = handler(ctx, req)
// grpc 自定义拦截器 执行后
if err == nil {
err = PlugsGrpcMiddAfterStart(ctx, req, info, err)
}
// 耗时
consTime := time.Since(start)
if err != nil {
if (fginConfig.Log != Log{}) {
logInfo := fmt.Sprintf("RPC:%s,cons:%s,req:%v,err:%v", info.FullMethod, consTime, req, err)
logSuger.Error(logInfo)
}
if jaegerOk {
// 使用jaeger,报错需要上报
span.SetTag("error", true)
span.LogKV("error_message", err.Error())
}
// 熔断器标记失败
circuitBreaker.MarkFailed()
} else {
if (fginConfig.Log != Log{}) {
logInfo := fmt.Sprintf("RPC:%s,cons:%s,req:%v,err:%v", info.FullMethod, consTime, req, err)
logSuger.Info(logInfo)
}
// 熔断器标记成功
circuitBreaker.MarkSuccess()
}
return m, err
}
}
func jaegerInitGrpc(ctx context.Context, service string) (opentracing.Span, context.Context, bool) {
var tracer opentracing.Tracer
if opentracing.IsGlobalTracerRegistered() {
tracer = opentracing.GlobalTracer()
} else {
logSuger.Error("http jaeger GlobalTracer is undefine")
return nil, ctx, false
}
md, _ := metadata.FromIncomingContext(ctx)
spanCtx, err := opentracing.GlobalTracer().Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(md),
)
var span opentracing.Span
if err == nil {
span = opentracing.StartSpan(service, opentracing.ChildOf(spanCtx))
} else {
span = tracer.StartSpan(service)
}
newCtx := opentracing.ContextWithSpan(ctx, span)
return span, newCtx, true
}
// 处理panic异常
func recoverInit() {
if err := recover(); err != nil {
if (fginConfig.Log == Log{} || logSuger == nil) {
fmt.Println(err)
} else {
logSuger.Error(err)
}
}
}
// 限流器
func fgrpcServerRateLimiterMiddlware(method string) error {
if err := rateLimiter.Allow(); err != nil {
logSuger.Info(fginConfig.ServiceName + ":" + method + rateErrMsg)
return status.Error(rateCode, fginConfig.ServiceName+":"+method+rateErrMsg)
}
return nil
}
// 熔断器
func fgrpcCircuitBreakerMiddlware(method string) error {
if err := circuitBreaker.Allow(); err != nil {
logSuger.Info(fginConfig.ServiceName + ":" + method + circuitErrMsg)
return status.Error(circuitCode, fginConfig.ServiceName+":"+method+circuitErrMsg)
}
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。