1 Star 0 Fork 0

天雨流芳 / go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
Server.go 6.74 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-05-13 15:36 . 修改config
package grpcserver
import (
"context"
"crypto/tls"
"gitee.com/tylf2018/go-micro-framework/pkg/common/endpoint"
utilNet "gitee.com/tylf2018/go-micro-framework/pkg/common/util/net"
log "gitee.com/tylf2018/go-micro-framework/pkg/logger"
srvintc "gitee.com/tylf2018/go-micro-framework/server/grpcserver/interceptors/server_interceptors"
"gitee.com/tylf2018/go-micro-framework/server/grpcserver/proto/metadata"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"net"
"net/url"
"time"
)
type ServerOption func(o *Server)
type Server struct {
*grpc.Server
address string
lis net.Listener
tlsConf *tls.Config
endpoint *url.URL
unaryInterceptors []grpc.UnaryServerInterceptor // 一元拦截器
streamInterceptors []grpc.StreamServerInterceptor // 流式拦截器
grpcOpts []grpc.ServerOption
timeout time.Duration
// metadata是内置的一个查看当前支持grpc服务的接口服务
metadata *metadata.Server
enableHealthCheck bool
enableTracing bool
enableMetrics bool
}
func NewServer(opts ...ServerOption) *Server {
srv := &Server{
address: ":0", // 在没有设置 address 自己获取 ip 和 端口号
timeout: 5 * time.Minute,
enableTracing: false,
enableHealthCheck: true,
enableMetrics: false,
}
if len(opts) > 0 {
for _, opt := range opts {
opt(srv)
}
}
srv.buildServer()
return srv
}
func (s *Server) buildServer() {
// 一元拦截器
s.unaryInterceptors = s.buildUnaryInterceptors()
// 流式拦截器
s.streamInterceptors = s.buildStreamInterceptors()
// grpc.ServerOption
s.grpcOpts = s.buildGrpcOpts()
// 创建服务
s.Server = grpc.NewServer(s.grpcOpts...)
// register metadata Server
s.metadata = metadata.NewServer(s.Server)
// analysis 解析 address
if err := s.listenAndEndpoint(); err != nil {
panic(err)
}
// 注册健康检测服务
if s.enableHealthCheck {
grpc_health_v1.RegisterHealthServer(s.Server, health.NewServer())
}
// 可以支持 用户直接通过 grpc 的一个接口查看当前支持的所有的 rpc 服务
metadata.RegisterMetadataServer(s.Server, s.metadata) // 关键函数 是gRPC服务器端用于注册服务的方法
reflection.Register(s.Server)
}
func (s *Server) buildGrpcOpts() []grpc.ServerOption {
// 把传入的拦截器 转换成 grpc 的 ServerOption
grpcOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(s.unaryInterceptors...),
grpc.ChainStreamInterceptor(s.streamInterceptors...),
}
if s.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(s.tlsConf)))
}
// 把用户传入的grpc.ServerOption 放在一起
if len(s.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, s.grpcOpts...)
}
return grpcOpts
}
func (s *Server) buildStreamInterceptors() []grpc.StreamServerInterceptor {
streamInterceptors := []grpc.StreamServerInterceptor{
srvintc.StreamRecoverInterceptor, // 流式拦截器 错误 不退出程序 继续运行
}
if len(s.streamInterceptors) > 0 {
streamInterceptors = append(streamInterceptors, s.streamInterceptors...)
}
return streamInterceptors
}
func (s *Server) buildUnaryInterceptors() []grpc.UnaryServerInterceptor {
unaryInterceptors := []grpc.UnaryServerInterceptor{
srvintc.UnaryRecoverInterceptor, // 一元拦截器 异常处理(而不是一层层抛出 然后停止程序)
}
// 开启链路追踪
if s.enableTracing {
unaryInterceptors = append(unaryInterceptors, srvintc.UnaryTracingInterceptor)
}
// 开启 普罗米修斯监控
if s.enableMetrics {
unaryInterceptors = append(unaryInterceptors, srvintc.UnaryPrometheusInterceptor)
}
if s.timeout > 0 {
unaryInterceptors = append(unaryInterceptors, srvintc.UnaryTimeoutInterceptor(s.timeout))
}
if len(s.unaryInterceptors) > 0 {
unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...)
}
return unaryInterceptors
}
func WithAddress(address string) ServerOption {
return func(s *Server) {
s.address = address
}
}
func WithListener(lis net.Listener) ServerOption {
return func(s *Server) {
s.lis = lis
}
}
func WithTLSConfig(c *tls.Config) ServerOption {
return func(s *Server) {
s.tlsConf = c
}
}
func WithServerUnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption {
return func(s *Server) {
s.unaryInterceptors = in
}
}
func AddServerUnaryInterceptor(in ...grpc.UnaryServerInterceptor) ServerOption {
return func(s *Server) {
s.unaryInterceptors = append(s.unaryInterceptors, in...)
}
}
func WithStreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption {
return func(s *Server) {
s.streamInterceptors = in
}
}
func AddStreamInterceptor(in ...grpc.StreamServerInterceptor) ServerOption {
return func(s *Server) {
s.streamInterceptors = append(s.streamInterceptors, in...)
}
}
func WithServerOption(opts ...grpc.ServerOption) ServerOption {
return func(s *Server) {
s.grpcOpts = opts
}
}
func AddServerOption(opts ...grpc.ServerOption) ServerOption {
return func(s *Server) {
s.grpcOpts = append(s.grpcOpts, opts...)
}
}
func WithServerTimeout(timeout time.Duration) ServerOption {
return func(s *Server) {
s.timeout = timeout
}
}
func WithServerEnableTracing(enableTracing bool) ServerOption {
return func(s *Server) {
s.enableTracing = enableTracing
}
}
// WithServerMetrics 设置是否开启 普罗米修斯 监控
func WithServerMetrics(metric bool) ServerOption {
return func(s *Server) {
s.enableMetrics = metric
}
}
// WithHealthCheck 设置是否开启健康检查
func WithHealthCheck(healthCheck bool) ServerOption {
return func(s *Server) {
s.enableHealthCheck = healthCheck
}
}
func (s *Server) Address() string {
return s.address
}
// Server的接口
// Start 启动 grpc 的服务
func (s *Server) Start(ctx context.Context) error {
log.InfoF("[gRPC] server listening on: %s", s.lis.Addr().String())
return s.Server.Serve(s.lis)
}
func (s *Server) Stop(ctx context.Context) error {
s.GracefulStop() // grpc 优雅退出
log.Info("[gRPC] server stopping")
return nil
}
// Endpointer 接口
// Endpoint return a real address to registry endpoint.
// examples:
//
// grpc://127.0.0.1:9000?isSecure=false
func (s *Server) Endpoint() (*url.URL, error) {
if err := s.listenAndEndpoint(); err != nil {
return nil, err
}
return s.endpoint, nil
}
// 完成 ip 和 端口的提取
func (s *Server) listenAndEndpoint() error {
if s.lis == nil {
lis, err := net.Listen("tcp", s.address)
if err != nil {
return err
}
s.lis = lis
}
if s.endpoint == nil {
addr, err := utilNet.Extract(s.address, s.lis)
if err != nil {
return err
}
s.endpoint = endpoint.NewEndpoint(endpoint.Scheme("grpc", s.tlsConf != nil), addr)
}
return nil
}
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
e87e0c3d7074

搜索帮助

53164aa7 5694891 3bd8fe86 5694891