代码拉取完成,页面将自动刷新
package serverinterceptors
import (
"context"
trace2 "gitee.com/tylf2018/go-micro-framework/core/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
gcodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// UnaryTracingInterceptor is a grpc.UnaryServerInterceptor for opentelemetry.
func UnaryTracingInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (any, error) {
ctx, span := startSpan(ctx, info.FullMethod)
defer span.End()
trace2.MessageReceived.Event(ctx, 1, req)
resp, err := handler(ctx, req)
if err != nil {
s, ok := status.FromError(err)
if ok {
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(trace2.StatusCodeAttr(s.Code()))
trace2.MessageSent.Event(ctx, 1, s.Proto())
} else {
span.SetStatus(codes.Error, err.Error())
}
return nil, err
}
span.SetAttributes(trace2.StatusCodeAttr(gcodes.OK))
trace2.MessageSent.Event(ctx, 1, resp)
return resp, nil
}
// StreamTracingInterceptor returns a grpc.StreamServerInterceptor for opentelemetry.
func StreamTracingInterceptor(svr any, ss grpc.ServerStream, info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
ctx, span := startSpan(ss.Context(), info.FullMethod)
defer span.End()
if err := handler(svr, wrapServerStream(ctx, ss)); err != nil {
s, ok := status.FromError(err)
if ok {
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(trace2.StatusCodeAttr(s.Code()))
} else {
span.SetStatus(codes.Error, err.Error())
}
return err
}
span.SetAttributes(trace2.StatusCodeAttr(gcodes.OK))
return nil
}
// serverStream wraps around the embedded grpc.ServerStream,
// and intercepts the RecvMsg and SendMsg method call.
type serverStream struct {
grpc.ServerStream
ctx context.Context
receivedMessageID int
sentMessageID int
}
func (w *serverStream) Context() context.Context {
return w.ctx
}
func (w *serverStream) RecvMsg(m any) error {
err := w.ServerStream.RecvMsg(m)
if err == nil {
w.receivedMessageID++
trace2.MessageReceived.Event(w.Context(), w.receivedMessageID, m)
}
return err
}
func (w *serverStream) SendMsg(m any) error {
err := w.ServerStream.SendMsg(m)
w.sentMessageID++
trace2.MessageSent.Event(w.Context(), w.sentMessageID, m)
return err
}
func startSpan(ctx context.Context, method string) (context.Context, trace.Span) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.MD{}
}
bags, spanCtx := trace2.Extract(ctx, otel.GetTextMapPropagator(), &md)
ctx = baggage.ContextWithBaggage(ctx, bags)
tr := otel.Tracer(trace2.TraceName)
name, attr := trace2.SpanInfo(method, trace2.PeerFromCtx(ctx))
/*
trace.ContextWithRemoteSpanContext(ctx, spanCtx)
这是一个OpenTelemetry的函数调用,它接受两个参数
分别是原始的上下文ctx和远程追踪的SpanContext对象spanCtx
该函数会创建一个新的上下文对象, 并将远程追踪的SpanContext信息添加到新的上下文中
将远程追踪的SpanContext信息与当前的上下文关联起来
这样,在后续的代码中, 使用这个新的上下文对象ctx进行追踪操作时, 就可以将请求的追踪信息关联到当前的追踪上下文中
这对于分布式系统中的追踪和监控非常重要, 因为它能够跟踪请求在不同服务之间的传播路径,从而形成完整的分布式追踪链路
trace.WithSpanKind(trace.SpanKindServer)
这是一个 trace.WithSpanKind 函数调用, 用于设置创建的新追踪 Span 的种类
在这里, 使用 trace.SpanKindServer 表示该 Span 是服务端追踪。
trace.WithAttributes(attr...)
这是一个 trace.WithAttributes 函数调用, 用于设置创建的新追踪 Span 的属性
attr... 是一个属性列表,包含了多个键值对,用于添加一些自定义的追踪属性信息。
*/
return tr.Start(trace.ContextWithRemoteSpanContext(ctx, spanCtx), name, trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attr...))
}
// wrapServerStream wraps the given grpc.ServerStream with the given context.
func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
return &serverStream{
ServerStream: ss,
ctx: ctx,
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。