1 Star 0 Fork 0

天雨流芳 / go-micro-framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
tracinginterceptor.go 4.30 KB
一键复制 编辑 原始数据 按行查看 历史
天雨流芳 提交于 2024-03-22 15:24 . 新增grpc服务
package serverinterceptors
import (
"context"
trace2 "gitee.com/tylf2018/go-micro-framework/core/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
gcodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// UnaryTracingInterceptor is a grpc.UnaryServerInterceptor for opentelemetry.
func UnaryTracingInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (any, error) {
ctx, span := startSpan(ctx, info.FullMethod)
defer span.End()
trace2.MessageReceived.Event(ctx, 1, req)
resp, err := handler(ctx, req)
if err != nil {
s, ok := status.FromError(err)
if ok {
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(trace2.StatusCodeAttr(s.Code()))
trace2.MessageSent.Event(ctx, 1, s.Proto())
} else {
span.SetStatus(codes.Error, err.Error())
}
return nil, err
}
span.SetAttributes(trace2.StatusCodeAttr(gcodes.OK))
trace2.MessageSent.Event(ctx, 1, resp)
return resp, nil
}
// StreamTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
func StreamTracingInterceptor(svr any, ss grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
ctx, span := startSpan(ss.Context(), info.FullMethod)
defer span.End()
if err := handler(svr, wrapServerStream(ctx, ss)); err != nil {
s, ok := status.FromError(err)
if ok {
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(trace2.StatusCodeAttr(s.Code()))
} else {
span.SetStatus(codes.Error, err.Error())
}
return err
}
span.SetAttributes(trace2.StatusCodeAttr(gcodes.OK))
return nil
}
// serverStream wraps around the embedded grpc.ServerStream,
// and intercepts the RecvMsg and SendMsg method call.
type serverStream struct {
grpc.ServerStream
ctx context.Context
receivedMessageID int
sentMessageID int
}
func (w *serverStream) Context() context.Context {
return w.ctx
}
func (w *serverStream) RecvMsg(m any) error {
err := w.ServerStream.RecvMsg(m)
if err == nil {
w.receivedMessageID++
trace2.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
}
return err
}
func (w *serverStream) SendMsg(m any) error {
err := w.ServerStream.SendMsg(m)
w.sentMessageID++
trace2.MessageSent.Event(w.Context(), w.sentMessageID, m)
return err
}
func startSpan(ctx context.Context, method string) (context.Context, trace.Span) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.MD{}
}
bags, spanCtx := trace2.Extract(ctx, otel.GetTextMapPropagator(), &md)
ctx = baggage.ContextWithBaggage(ctx, bags)
tr := otel.Tracer(trace2.TraceName)
name, attr := trace2.SpanInfo(method, trace2.PeerFromCtx(ctx))
/*
trace.ContextWithRemoteSpanContext(ctx, spanCtx)
这是一个OpenTelemetry的函数调用,它接受两个参数
分别是原始的上下文ctx和远程追踪的SpanContext对象spanCtx
该函数会创建一个新的上下文对象, 并将远程追踪的SpanContext信息添加到新的上下文中
将远程追踪的SpanContext信息与当前的上下文关联起来
这样,在后续的代码中, 使用这个新的上下文对象ctx进行追踪操作时, 就可以将请求的追踪信息关联到当前的追踪上下文中
这对于分布式系统中的追踪和监控非常重要, 因为它能够跟踪请求在不同服务之间的传播路径,从而形成完整的分布式追踪链路
trace.WithSpanKind(trace.SpanKindServer)
这是一个 trace.WithSpanKind 函数调用, 用于设置创建的新追踪 Span 的种类
在这里, 使用 trace.SpanKindServer 表示该 Span 是服务端追踪。
trace.WithAttributes(attr...)
这是一个 trace.WithAttributes 函数调用, 用于设置创建的新追踪 Span 的属性
attr... 是一个属性列表,包含了多个键值对,用于添加一些自定义的追踪属性信息。
*/
return tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
}
// wrapServerStream wraps the given grpc.ServerStream with the given context.
func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
return &serverStream{
ServerStream: ss,
ctx: ctx,
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/tylf2018/go-micro-framework.git
git@gitee.com:tylf2018/go-micro-framework.git
tylf2018
go-micro-framework
go-micro-framework
4cc90ded505a

搜索帮助