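// NOTE: hosting-page notice captured with this file, not part of the source:
// "Code pull complete; the page will refresh automatically."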
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT License was not distributed with this file,
// you can obtain one at https://gitee.com/micro-tools/wf/extend/utils.
package grpctracing
import (
"context"
"gitee.com/micro-tools/wf/extend/utils/krpc/internal"
"gitee.com/micro-tools/wf/extend/utils/krpc/internal/grpcctx"
"gitee.com/micro-tools/wf/extend/utils/krpc/internal/grpcutils"
"gitee.com/micro-tools/wf/net/gtrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpcCodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// UnaryClientInterceptor has the grpc.UnaryClientInterceptor signature and is
// suitable for use in a grpc.Dial call.
//
// For each unary call it starts a client-kind span, injects the tracing
// context into a copy of the outgoing metadata, records a request event and a
// response event on the span, and tags the span with the resulting gRPC status
// code. The error produced by invoker is returned unchanged.
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, callOpts ...grpc.CallOption) error {
	tracer := otel.GetTracerProvider().Tracer(
		tracingInstrumentGrpcClient,
		trace.WithInstrumentationVersion(internal.VERSION),
	)

	// The ok result is deliberately ignored: the copy is used whether or not
	// the caller attached outgoing metadata.
	md, _ := metadata.FromOutgoingContext(ctx)
	outgoing := md.Copy()

	spanName, spanAttrs := spanInfo(method, cc.Target())
	ctx, span := tracer.Start(
		ctx,
		spanName,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(spanAttrs...),
	)
	defer span.End()

	// Propagate the tracing context through the copied metadata so the
	// caller's own metadata value is left untouched.
	Inject(ctx, outgoing)
	ctx = metadata.NewOutgoingContext(ctx, outgoing)

	span.SetAttributes(gtrace.CommonLabels()...)

	// Request event: baggage, outgoing metadata and the request message.
	baggageAttr := attribute.Any(tracingEventGrpcRequestBaggage, gtrace.GetBaggageMap(ctx))
	outgoingAttr := attribute.Any(tracingEventGrpcMetadataOutgoing, grpcctx.Ctx.OutgoingMap(ctx))
	requestJSON := grpcutils.MarshalMessageToJsonStringForTracing(
		req, "Request", tracingMaxContentLogSize,
	)
	span.AddEvent(tracingEventGrpcRequest, trace.WithAttributes(
		baggageAttr,
		outgoingAttr,
		attribute.String(tracingEventGrpcRequestMessage, requestJSON),
	))

	callErr := invoker(ctx, method, req, reply, cc, callOpts...)

	// Response event: recorded whether or not the call succeeded.
	responseJSON := grpcutils.MarshalMessageToJsonStringForTracing(
		reply, "Response", tracingMaxContentLogSize,
	)
	span.AddEvent(tracingEventGrpcResponse, trace.WithAttributes(
		attribute.String(tracingEventGrpcResponseMessage, responseJSON),
	))

	if callErr == nil {
		span.SetAttributes(statusCodeAttr(grpcCodes.OK))
		return callErr
	}

	st, _ := status.FromError(callErr)
	span.SetStatus(codes.Error, st.Message())
	span.SetAttributes(statusCodeAttr(st.Code()))
	return callErr
}
// StreamClientInterceptor has the grpc.StreamClientInterceptor signature and
// is suitable for use in a grpc.Dial call.
//
// It starts a client-kind span, injects the tracing context into a copy of the
// outgoing metadata, and opens the stream through streamer.
//
// If opening the stream fails, the span is marked as an error and ended before
// returning, and the streamer's own results are returned as-is. Otherwise the
// stream is wrapped with wrapClientStream, and a goroutine ends the span once
// the wrapper delivers a value on its finished channel.
func StreamClientInterceptor(
	ctx context.Context, desc *grpc.StreamDesc,
	cc *grpc.ClientConn, method string, streamer grpc.Streamer,
	callOpts ...grpc.CallOption) (grpc.ClientStream, error) {
	tracer := otel.GetTracerProvider().Tracer(
		tracingInstrumentGrpcClient,
		trace.WithInstrumentationVersion(internal.VERSION),
	)

	// The ok result is deliberately ignored: the copy is used whether or not
	// the caller attached outgoing metadata.
	requestMetadata, _ := metadata.FromOutgoingContext(ctx)
	metadataCopy := requestMetadata.Copy()

	name, attr := spanInfo(method, cc.Target())
	ctx, span := tracer.Start(
		ctx,
		name,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(attr...),
	)

	// Propagate the tracing context through the copied metadata so the
	// caller's own metadata value is left untouched.
	Inject(ctx, metadataCopy)
	ctx = metadata.NewOutgoingContext(ctx, metadataCopy)

	span.SetAttributes(gtrace.CommonLabels()...)

	s, err := streamer(ctx, desc, cc, method, callOpts...)
	if err != nil {
		// Close the span synchronously: there is no stream to wait on, so no
		// wrapper and no goroutine are needed for the failure path.
		st, _ := status.FromError(err)
		span.SetStatus(codes.Error, st.Message())
		span.SetAttributes(statusCodeAttr(st.Code()))
		span.End()
		return s, err
	}

	stream := wrapClientStream(s, desc)

	// The goroutine keeps its own error variable. Sharing the outer err would
	// race with the return statement below.
	// NOTE(review): this goroutine blocks until stream.finished delivers a
	// value; confirm wrapClientStream always signals completion.
	go func() {
		if finishErr := <-stream.finished; finishErr != nil {
			st, _ := status.FromError(finishErr)
			span.SetStatus(codes.Error, st.Message())
			span.SetAttributes(statusCodeAttr(st.Code()))
		} else {
			span.SetAttributes(statusCodeAttr(grpcCodes.OK))
		}
		span.End()
	}()

	return stream, nil
}
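// NOTE: hosting-page notice captured with this file, not part of the source:
// "Content that may be unsuitable for display is hidden here. You can review and modify it using the editing functions."
// "If you confirm that the content contains no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content that violates applicable laws and regulations, you can submit an appeal and it will be handled as soon as possible."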