1 Star 0 Fork 0

s-dy/yogurt

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
interceptor.go 2.35 KB
一键复制 编辑 原始数据 按行查看 历史
s-dy 提交于 2022-11-07 22:55 +08:00 . grpc
package grpc
import (
"context"
"gitee.com/sdynasty/yogurt/middleware"
"gitee.com/sdynasty/yogurt/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
ic "gitee.com/sdynasty/yogurt/internal/context"
)
// unaryServerInterceptor returns the gRPC unary server interceptor for s.
//
// For every call the interceptor merges the request context with the
// server's base context, stores a Transport describing the call in that
// context, applies the server timeout when one is configured, runs the
// handler behind any middleware matched for the operation, and finally sets
// the reply headers gathered during the call.
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		ctx, release := ic.Merge(ctx, s.baseCtx)
		defer release()

		inbound, _ := metadata.FromIncomingContext(ctx)
		// outbound is handed to the Transport below and inspected again after
		// the handler returns; presumably headerCarrier shares its underlying
		// map so that headers set through the Transport show up here.
		outbound := metadata.MD{}
		tr := &Transport{
			operation:   info.FullMethod,
			reqHeader:   headerCarrier(inbound),
			replyHeader: headerCarrier(outbound),
		}
		if s.endpoint != nil {
			tr.endpoint = s.endpoint.String()
		}
		ctx = transport.NewServerContext(ctx, tr)

		if s.timeout > 0 {
			timed, stop := context.WithTimeout(ctx, s.timeout)
			defer stop()
			ctx = timed
		}

		// invoke starts out as the bare handler and is wrapped by the
		// middleware chain when any middleware matches this operation.
		invoke := func(ctx context.Context, req any) (any, error) {
			return handler(ctx, req)
		}
		if matched := s.middleware.Match(tr.Operation()); len(matched) > 0 {
			invoke = middleware.Chain(matched...)(invoke)
		}

		reply, err := invoke(ctx, req)
		if len(outbound) > 0 {
			// grpc.SetTrailer is the other way to write metadata; in streaming
			// communication it is attached at the end of the stream.
			// The error returned by SetHeader is discarded.
			_ = grpc.SetHeader(ctx, outbound)
		}
		return reply, err
	}
}
// wrappedStream is a grpc.ServerStream with a replaceable context. The
// embedded ServerStream supplies every method except Context, and ctx is the
// context that Context returns in place of the embedded stream's own.
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}
// NewWrappedStream wraps stream so that its Context method returns ctx.
// Every other grpc.ServerStream method is served by stream itself.
func NewWrappedStream(ctx context.Context, stream grpc.ServerStream) grpc.ServerStream {
	ws := new(wrappedStream)
	ws.ServerStream = stream
	ws.ctx = ctx
	return ws
}
// Context returns the context the stream was wrapped with. It takes
// precedence over the Context method promoted from the embedded
// grpc.ServerStream.
func (ws *wrappedStream) Context() context.Context {
	return ws.ctx
}
// streamServerInterceptor returns the gRPC stream server interceptor for s.
//
// For every stream the interceptor merges the stream's context with the
// server's base context, stores a Transport describing the call in that
// context, and passes the handler a wrapped stream whose Context method
// returns it. Reply headers gathered while the handler ran are set once the
// handler returns; the handler's error is returned unchanged.
func (s *Server) streamServerInterceptor() grpc.StreamServerInterceptor {
	return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		ctx, cancel := ic.Merge(ss.Context(), s.baseCtx)
		defer cancel()

		md, _ := metadata.FromIncomingContext(ctx)
		replyHeader := metadata.MD{}
		tr := &Transport{
			operation:   info.FullMethod,
			reqHeader:   headerCarrier(md),
			replyHeader: headerCarrier(replyHeader),
		}
		// The endpoint may be unset. Guard the String call the same way
		// unaryServerInterceptor does, so a nil endpoint leaves tr.endpoint
		// empty instead of panicking on stream calls only.
		if s.endpoint != nil {
			tr.endpoint = s.endpoint.String()
		}
		ctx = transport.NewServerContext(ctx, tr)

		// The handler must observe the enriched context, so give it a stream
		// whose Context method returns ctx.
		ws := NewWrappedStream(ctx, ss)
		err := handler(srv, ws)
		if len(replyHeader) > 0 {
			// Best effort: the error returned by SetHeader is discarded.
			// NOTE(review): the handler has already returned at this point, so
			// headers may already have been sent if it wrote any message;
			// confirm whether grpc.SetTrailer is the intended call here.
			_ = grpc.SetHeader(ctx, replyHeader)
		}
		return err
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sdynasty/yogurt.git
git@gitee.com:sdynasty/yogurt.git
sdynasty
yogurt
yogurt
01d4da0b550e

搜索帮助