// Package trace_dumper receives Jaeger spans pushed over the Superbahn RPC bus
// and writes them to Elasticsearch using daily jaeger-span-* / jaeger-service-* indices.
package trace_dumper
import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "hash/fnv"
    "time"

    "gitee.com/info-superbahn-ict/superbahn/internal/supbstrategies/algorithm/old/trace_dumper/format"
    "gitee.com/info-superbahn-ict/superbahn/pkg/supbagent/resources/manager_containers"
    "gitee.com/info-superbahn-ict/superbahn/pkg/supbmanager/resources/manager_clusters/clusters"
    "gitee.com/info-superbahn-ict/superbahn/pkg/supbnervous"
    "gitee.com/info-superbahn-ict/superbahn/sync/define"
    "github.com/gogo/protobuf/jsonpb"
    "github.com/jaegertracing/jaeger/model"
    "github.com/olivere/elastic"
    "github.com/sirupsen/logrus"
)
// cache keeps simple name lookup maps for devices and services.
type cache struct {
    Devices  map[string]string
    Services map[string]string
}

var (
    // recorder is the shared Elasticsearch bulk processor; it is created once in Init.
    recorder      *elastic.BulkProcessor
    strategyCache = &cache{}
    // domain converts Jaeger model.Span values into their ES representation.
    domain = NewFromDomain(false, []string{}, "@")
)
// metricsELK is the ES database representation of the domain span.
type metricsELK struct {
    TraceID       format.TraceID     `json:"traceID"`
    SpanID        format.SpanID      `json:"spanID"`
    ParentSpanID  format.SpanID      `json:"parentSpanID,omitempty"` // deprecated
    Flags         uint32             `json:"flags,omitempty"`
    OperationName string             `json:"operationName"`
    References    []format.Reference `json:"references"`
    StartTime     uint64             `json:"startTime"` // microseconds since Unix epoch
    // Elasticsearch does not support a UNIX epoch timestamp in microseconds,
    // so Jaeger maps StartTime to a 'long' type. The extra StartTimeMillis field
    // works around this issue, enabling time-range queries.
    StartTimeMillis uint64            `json:"startTimeMillis"`
    Duration        uint64            `json:"duration"` // microseconds
    Tags            []format.KeyValue `json:"tags"`
    // Alternative representation of tags for better Kibana support.
    Tag     map[string]interface{} `json:"tag,omitempty"`
    Logs    []format.Log           `json:"logs"`
    Process format.Process         `json:"process,omitempty"`
}
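
// startTimeFields is a minimal, illustrative sketch (not referenced elsewhere in this file)
// of how the two start-time fields above relate: StartTime carries microseconds since the
// Unix epoch, while StartTimeMillis is the same instant truncated to milliseconds so that
// Elasticsearch and Kibana can run time-range queries against it.
func startTimeFields(t time.Time) (startTimeMicros, startTimeMillis uint64) {
    startTimeMicros = uint64(t.UnixNano() / int64(time.Microsecond)) // microseconds since epoch
    startTimeMillis = startTimeMicros / 1000                         // milliseconds, matches the ES 'date' mapping
    return startTimeMicros, startTimeMillis
}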
// MessageSupb is the envelope pushed over the RPC bus: Key identifies the payload
// and Data carries a jsonpb-encoded Jaeger model.Span.
type MessageSupb struct {
    Key  string `json:"key"`
    Data string `json:"data"`
}

// Service is the document written to the jaeger-service-* index.
type Service struct {
    ServiceName   string `json:"serviceName"`
    OperationName string `json:"operationName"`
}
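
// For illustration, the handler registered in Init expects args[1] to look roughly like
// the following (the key value here is hypothetical):
//
//	{"key":"trace","data":"<jsonpb-encoded model.Span>"}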
func Init(ctx context.Context, nu supbnervous.Controller, clusterMeta *clusters.Cluster, outPut chan<- *clusters.ClusterRes) {
    var err error
    logrus.Infof("start trace dumper")
    if recorder == nil {
        url, ok := clusterMeta.ClusterMasterContainerEnvs["ELK_URL"]
        if !ok {
            logrus.Fatalf("ELK_URL not found in cluster meta")
        }
        logrus.Infof("recv elk url %v", url)
        options := []elastic.ClientOptionFunc{
            elastic.SetURL(url),
            elastic.SetSniff(false),
            elastic.SetHealthcheck(true),
        }
        rd, err := elastic.NewClient(options...)
        if err != nil {
            logrus.Fatalf("new elastic client %v", err)
        }
        _, err = rd.IndexPutTemplate("jaeger-span").BodyString(spanMap).Do(ctx)
        if err != nil {
            logrus.Fatalf("put jaeger-span index template %v", err)
        }
        _, err = rd.IndexPutTemplate("jaeger-service").BodyString(serviceMap).Do(ctx)
        if err != nil {
            logrus.Fatalf("put jaeger-service index template %v", err)
        }
        recorder, err = rd.BulkProcessor().BulkSize(5 * 1000 * 1000).Workers(1).BulkActions(1000).FlushInterval(time.Millisecond * 200).Do(ctx)
        if err != nil {
            logrus.Fatalf("new bulk processor %v", err)
        }
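        // Note (per the olivere/elastic BulkProcessor docs): buffered requests are flushed as
        // soon as any of the thresholds above is reached, roughly 5 MB of pending data, 1000
        // queued actions, or the 200ms flush interval, using a single worker goroutine.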
        // url, ok := clusterMeta.ClusterMasterContainerEnvs["ELK_URL"]
        // if !ok {
        //	logrus.Fatalf("cluster meta don't find ELK_URL")
        // }
        // if recorder, err = elk.NewMetricsRecorder(ctx, url); err != nil {
        //	logrus.Fatalf("new recoder %v", err)
        // }
    }
    err = nu.RPCRegister(define.RPCFunctionNameOfManagerStrategyCollectorObjectTrace, func(args ...interface{}) (interface{}, error) {
        if len(args) < 2 {
            return "", fmt.Errorf("need two args, guid and data")
        }
        data := &MessageSupb{}
        if err := json.Unmarshal([]byte(args[1].(string)), data); err != nil {
            return nil, fmt.Errorf("unmarshal MessageSupb %v", err)
        }
        spanData := &model.Span{}
        if err := jsonpb.Unmarshal(bytes.NewReader([]byte(data.Data)), spanData); err != nil {
            return nil, fmt.Errorf("unmarshal span %v", err)
        }
        traceElk := domain.FromDomainEmbedProcess(spanData)
        service := Service{
            ServiceName:   traceElk.Process.ServiceName,
            OperationName: traceElk.OperationName,
        }
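        // The service/operation pair is keyed by an FNV-1a hash, so repeated pairs overwrite
        // the same document in the daily jaeger-service-<date> index instead of piling up;
        // spans themselves go to jaeger-span-<date> with auto-generated document IDs.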
        h := fnv.New64a()
        _, _ = h.Write([]byte(service.ServiceName))
        _, _ = h.Write([]byte(service.OperationName))
        id := fmt.Sprintf("%x", h.Sum64())
        date := time.Unix(0, int64(traceElk.StartTimeMillis*1e6)).Format("2006-01-02")
        bulkIndexReq1 := elastic.NewBulkIndexRequest()
        bulkIndexReq1.Index("jaeger-service-" + date).Id(id).Doc(service)
        bulkIndexReq2 := elastic.NewBulkIndexRequest()
        bulkIndexReq2.Index("jaeger-span-" + date).Doc(&traceElk)
        recorder.Add(bulkIndexReq1)
        recorder.Add(bulkIndexReq2)
        btsk, _ := json.Marshal(traceElk)
        logrus.Infof("recv guid: %v trace key: %v Data: %s", args[0].(string), data.Key, btsk)
        return "RECEIVE TRACE", nil
    })
    if err != nil {
        logrus.Errorf("register trace receiver failed %v", err)
    }
    logrus.Infof("init finished")
}
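
// buildTracePayload is a minimal, hypothetical sketch (not used by the dumper itself) of how
// a sender could produce the payload the handler above expects: a Jaeger model.Span marshaled
// with jsonpb and wrapped in a MessageSupb envelope. The "trace" key value is illustrative.
func buildTracePayload(span *model.Span) (string, error) {
    var buf bytes.Buffer
    if err := (&jsonpb.Marshaler{}).Marshal(&buf, span); err != nil {
        return "", err
    }
    b, err := json.Marshal(&MessageSupb{Key: "trace", Data: buf.String()})
    if err != nil {
        return "", err
    }
    return string(b), nil
}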
/*
	The policy function may be restarted automatically when the control meta changes.
*/
func TraceDumper(ctx context.Context, nu supbnervous.Controller, clusterMeta *clusters.Cluster, outPut chan<- *clusters.ClusterRes) {
    log := logrus.WithContext(ctx).WithFields(logrus.Fields{
        define.LogsPrintCommonLabelOfFunction: "policy.algorithm.executor",
    })
    log.Infof("start trace dumper")
    for guid := range clusterMeta.ClusterResources {
        _, err := nu.RPCCallCustom(guid, 10, 500, define.RPCFunctionNameOfObjectRunningOnAgentForOpenTracePush, clusterMeta.ClusterMasterGuid)
        if err != nil {
            log.Errorf("open trace push for %v failed %v", guid, err)
        }
        log.Infof("open trace push %v", guid)
    }
    // keepalive loop: block until the context is cancelled, then ask every resource to stop pushing traces
    ttk := time.NewTicker(5 * time.Second)
    defer ttk.Stop()
    for {
        select {
        case <-ctx.Done():
            for guid := range clusterMeta.ClusterResources {
                _, err := nu.RPCCallCustom(guid, 10, 500, define.RPCFunctionNameOfObjectRunningOnAgentForOpenTracePush, manager_containers.TracePushOff)
                if err != nil {
                    log.Errorf("close trace push for %v failed %v", guid, err)
                }
                log.Infof("close trace push %v", guid)
            }
            return
        case <-ttk.C:
            // log.Infof("strategy beat")
        }
    }
}
// Close is currently a no-op.
func Close(ctx context.Context, nu supbnervous.Controller, clusterMeta *clusters.Cluster, outPut chan<- *clusters.ClusterRes) {
}