3 Star 2 Fork 0

info-superbahn-ict/superbahn

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
transform.go 9.04 KB
一键复制 编辑 原始数据 按行查看 历史
tangshibo 提交于 2022-01-06 11:04 +08:00 . update
package trace_dumper
import (
"gitee.com/info-superbahn-ict/superbahn/internal/supbstrategies/algorithm/old/trace_dumper/format"
"github.com/jaegertracing/jaeger/model"
"strings"
"time"
)
// NewFromDomain builds a FromDomain converter.
//
// allTagsAsObject selects whether every non-binary tag is placed in the tag
// map; tagKeysAsFields lists the individual tag keys that are placed there
// when allTagsAsObject is false; tagDotReplacement is substituted for each
// "." in the keys of that map.
func NewFromDomain(allTagsAsObject bool, tagKeysAsFields []string, tagDotReplacement string) FromDomain {
	// Turn the key list into a set for constant-time membership checks.
	fieldKeys := make(map[string]bool, len(tagKeysAsFields))
	for i := range tagKeysAsFields {
		fieldKeys[tagKeysAsFields[i]] = true
	}

	var fd FromDomain
	fd.allTagsAsFields = allTagsAsObject
	fd.tagKeysAsFields = fieldKeys
	fd.tagDotReplacement = tagDotReplacement
	return fd
}
// FromDomain converts Jaeger model spans into the metricsELK document form.
// Build one with NewFromDomain.
type FromDomain struct {
// allTagsAsFields, when true, routes every non-binary tag into the tag map
// instead of the key/value list.
allTagsAsFields bool
// tagKeysAsFields is the set of tag keys routed into the tag map when
// allTagsAsFields is false.
tagKeysAsFields map[string]bool
// tagDotReplacement replaces each "." in a tag key used as a tag-map key.
tagDotReplacement string
}
// FromDomainEmbedProcess converts a model.Span into a metricsELK document.
// It delegates to convertSpanEmbedProcess, which fills in the span's own
// fields and then embeds the converted Process and References.
func (fd FromDomain) FromDomainEmbedProcess(span *model.Span) *metricsELK {
return fd.convertSpanEmbedProcess(span)
}
// convertSpanInternal copies the span's own fields (IDs, flags, operation
// name, timing, tags and logs) into a metricsELK value. Process and
// References are left at their zero values for the caller to fill in.
func (fd FromDomain) convertSpanInternal(span *model.Span) metricsELK {
	listTags, mapTags := fd.convertKeyValuesString(span.Tags)
	startMicros := model.TimeAsEpochMicroseconds(span.StartTime)

	var doc metricsELK
	doc.TraceID = format.TraceID(span.TraceID.String())
	doc.SpanID = format.SpanID(span.SpanID.String())
	doc.Flags = uint32(span.Flags)
	doc.OperationName = span.OperationName
	doc.StartTime = startMicros
	// The millisecond field is derived from the microsecond epoch value.
	doc.StartTimeMillis = startMicros / 1000
	doc.Duration = model.DurationAsMicroseconds(span.Duration)
	doc.Tags = listTags
	doc.Tag = mapTags
	doc.Logs = fd.convertLogs(span.Logs)
	return doc
}
// convertSpanEmbedProcess converts the span via convertSpanInternal and then
// attaches the converted Process and References, returning a pointer to the
// finished document.
func (fd FromDomain) convertSpanEmbedProcess(span *model.Span) *metricsELK {
	doc := new(metricsELK)
	*doc = fd.convertSpanInternal(span)
	doc.Process = fd.convertProcess(span.Process)
	doc.References = fd.convertReferences(span)
	return doc
}
// convertReferences converts each of the span's references into a
// format.Reference. The result is always a non-nil slice with one entry per
// input reference, in the same order.
func (fd FromDomain) convertReferences(span *model.Span) []format.Reference {
	refs := make([]format.Reference, len(span.References))
	for i := range span.References {
		src := &span.References[i]
		refs[i] = format.Reference{
			RefType: fd.convertRefType(src.RefType),
			TraceID: format.TraceID(src.TraceID.String()),
			SpanID:  format.SpanID(src.SpanID.String()),
		}
	}
	return refs
}
// convertRefType maps a model reference type to its format counterpart.
// Anything other than model.FollowsFrom is reported as format.ChildOf.
func (fd FromDomain) convertRefType(refType model.SpanRefType) format.ReferenceType {
	switch refType {
	case model.FollowsFrom:
		return format.FollowsFrom
	default:
		return format.ChildOf
	}
}
// convertKeyValuesString splits keyValues into two representations.
//
// A non-binary tag goes into the returned map when allTagsAsFields is set or
// its key is in tagKeysAsFields; the map key has every "." replaced by
// tagDotReplacement. Every other tag is appended to the returned list via
// convertKeyValue. The list is never nil; the map is nil when no tag was
// routed to it.
func (fd FromDomain) convertKeyValuesString(keyValues model.KeyValues) ([]format.KeyValue, map[string]interface{}) {
	listed := make([]format.KeyValue, 0)
	var asFields map[string]interface{}

	for _, kv := range keyValues {
		wanted := fd.allTagsAsFields || fd.tagKeysAsFields[kv.Key]
		if kv.GetVType() == model.BinaryType || !wanted {
			listed = append(listed, convertKeyValue(kv))
			continue
		}
		// Allocate the map lazily so callers get nil when it is unused.
		if asFields == nil {
			asFields = make(map[string]interface{})
		}
		fieldKey := strings.ReplaceAll(kv.Key, ".", fd.tagDotReplacement)
		asFields[fieldKey] = kv.Value()
	}
	return listed, asFields
}
// convertLogs converts each model.Log into a format.Log, carrying over the
// timestamp as epoch microseconds and converting every field with
// convertKeyValue. The result is non-nil and has one entry per input log; a
// log without fields yields a nil Fields slice.
func (fd FromDomain) convertLogs(logs []model.Log) []format.Log {
	converted := make([]format.Log, 0, len(logs))
	for i := range logs {
		entry := &logs[i]

		var fields []format.KeyValue
		for j := range entry.Fields {
			fields = append(fields, convertKeyValue(entry.Fields[j]))
		}

		converted = append(converted, format.Log{
			Timestamp: model.TimeAsEpochMicroseconds(entry.Timestamp),
			Fields:    fields,
		})
	}
	return converted
}
// convertProcess converts a model.Process into a format.Process, copying the
// service name and splitting the process tags with convertKeyValuesString.
// process must be non-nil.
func (fd FromDomain) convertProcess(process *model.Process) format.Process {
	listTags, mapTags := fd.convertKeyValuesString(process.Tags)

	var out format.Process
	out.ServiceName = process.ServiceName
	out.Tags = listTags
	out.Tag = mapTags
	return out
}
// convertKeyValue converts a single model.KeyValue into a format.KeyValue.
// The type is the lower-cased name of the value type and the value is its
// string rendering.
func convertKeyValue(kv model.KeyValue) format.KeyValue {
	typeName := strings.ToLower(kv.VType.String())

	var out format.KeyValue
	out.Key = kv.Key
	out.Type = format.ValueType(typeName)
	out.Value = kv.AsString()
	return out
}
// spanAndServiceIndexFn returns the span index name and the service index
// name, in that order, to use for the given span time.
type spanAndServiceIndexFn func(spanTime time.Time) (string, string)
// getSpanAndServiceIndexFn returns the function that picks the span and
// service index names for a span timestamp.
//
// The choice is driven by the fixed settings declared at the top of the
// body. With archive enabled (the current setting) the returned function
// ignores the timestamp and yields the archive span index together with an
// empty service index name. The remaining branches are kept so the settings
// can be flipped without rewriting the function.
func getSpanAndServiceIndexFn() spanAndServiceIndexFn {
	const (
		archive             = true
		useReadWriteAliases = false
		// Go date layouts are written using the reference date
		// 2006-01-02. An ordinary date such as "2021-11-25" is parsed as a
		// mix of unrelated layout elements and does not produce
		// year-month-day output.
		spanDateLayout    = "2006-01-02"
		serviceDateLayout = "2006-01-02"
	)
	prefix := ""
	// if prefix != "" {
	// 	prefix += "-"
	// }
	spanIndexPrefix := prefix + "jaeger-span-"
	serviceIndexPrefix := prefix + "jaeger-service-"

	if archive {
		return func(date time.Time) (string, string) {
			if useReadWriteAliases {
				// The prefix already ends in "-", so the suffix must not
				// start with one and has to name the archive index.
				return archiveIndex(spanIndexPrefix, "archive-write"), ""
			}
			return archiveIndex(spanIndexPrefix, "archive"), ""
		}
	}

	if useReadWriteAliases {
		return func(spanTime time.Time) (string, string) {
			return spanIndexPrefix + "write", serviceIndexPrefix + "write"
		}
	}

	return func(date time.Time) (string, string) {
		return indexWithDate(spanIndexPrefix, spanDateLayout, date), indexWithDate(serviceIndexPrefix, serviceDateLayout, date)
	}
}
// archiveIndex returns the archive index name: indexPrefix immediately
// followed by archiveSuffix, with no separator added.
func archiveIndex(indexPrefix, archiveSuffix string) string {
	name := indexPrefix
	name += archiveSuffix
	return name
}
// indexWithDate returns indexPrefix followed by date, converted to UTC and
// rendered with the Go time layout indexDateLayout.
func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string {
	buf := make([]byte, 0, len(indexPrefix)+len(indexDateLayout))
	buf = append(buf, indexPrefix...)
	buf = date.UTC().AppendFormat(buf, indexDateLayout)
	return string(buf)
}
// spanMap is a JSON index template for indices matching "*jaeger-span-*"
// (presumably an Elasticsearch index template; confirm against the code that
// submits it). It sets shard/replica settings, maps the span ID fields and
// operation name as keywords, maps logs, references and tags as nested
// objects, and declares dynamic templates that map "tag.*" and
// "process.tag.*" paths as keywords.
var spanMap = `{
"index_patterns": "*jaeger-span-*",
"settings":{
"index.number_of_shards": 5,
"index.number_of_replicas": 1,
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true
},
"mappings":{
"dynamic_templates":[
{
"span_tags_map":{
"mapping":{
"type":"keyword",
"ignore_above":256
},
"path_match":"tag.*"
}
},
{
"process_tags_map":{
"mapping":{
"type":"keyword",
"ignore_above":256
},
"path_match":"process.tag.*"
}
}
],
"properties":{
"traceID":{
"type":"keyword",
"ignore_above":256
},
"parentSpanID":{
"type":"keyword",
"ignore_above":256
},
"spanID":{
"type":"keyword",
"ignore_above":256
},
"operationName":{
"type":"keyword",
"ignore_above":256
},
"startTime":{
"type":"long"
},
"startTimeMillis":{
"type":"date",
"format":"epoch_millis"
},
"duration":{
"type":"long"
},
"flags":{
"type":"integer"
},
"logs":{
"type":"nested",
"dynamic":false,
"properties":{
"timestamp":{
"type":"long"
},
"fields":{
"type":"nested",
"dynamic":false,
"properties":{
"key":{
"type":"keyword",
"ignore_above":256
},
"value":{
"type":"keyword",
"ignore_above":256
},
"tagType":{
"type":"keyword",
"ignore_above":256
}
}
}
}
},
"process":{
"properties":{
"serviceName":{
"type":"keyword",
"ignore_above":256
},
"tag":{
"type":"object"
},
"tags":{
"type":"nested",
"dynamic":false,
"properties":{
"key":{
"type":"keyword",
"ignore_above":256
},
"value":{
"type":"keyword",
"ignore_above":256
},
"tagType":{
"type":"keyword",
"ignore_above":256
}
}
}
}
},
"references":{
"type":"nested",
"dynamic":false,
"properties":{
"refType":{
"type":"keyword",
"ignore_above":256
},
"traceID":{
"type":"keyword",
"ignore_above":256
},
"spanID":{
"type":"keyword",
"ignore_above":256
}
}
},
"tag":{
"type":"object"
},
"tags":{
"type":"nested",
"dynamic":false,
"properties":{
"key":{
"type":"keyword",
"ignore_above":256
},
"value":{
"type":"keyword",
"ignore_above":256
},
"tagType":{
"type":"keyword",
"ignore_above":256
}
}
}
}
}
}`
// serviceMap is a JSON index template for indices matching
// "*jaeger-service-*" (presumably an Elasticsearch index template; confirm
// against the code that submits it). It uses the same settings and dynamic
// tag templates as the span template and maps serviceName and operationName
// as keywords.
var serviceMap=`{
"index_patterns": "*jaeger-service-*",
"settings":{
"index.number_of_shards": 5,
"index.number_of_replicas": 1,
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true
},
"mappings":{
"dynamic_templates":[
{
"span_tags_map":{
"mapping":{
"type":"keyword",
"ignore_above":256
},
"path_match":"tag.*"
}
},
{
"process_tags_map":{
"mapping":{
"type":"keyword",
"ignore_above":256
},
"path_match":"process.tag.*"
}
}
],
"properties":{
"serviceName":{
"type":"keyword",
"ignore_above":256
},
"operationName":{
"type":"keyword",
"ignore_above":256
}
}
}
}`
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/info-superbahn-ict/superbahn.git
git@gitee.com:info-superbahn-ict/superbahn.git
info-superbahn-ict
superbahn
superbahn
5fda629dab96

搜索帮助