1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
indexers.go 5.83 KB
一键复制 编辑 原始数据 按行查看 历史
package add_kubernetes_metadata
import (
"fmt"
"strings"
"sync"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// Names of the indexers implemented in this file.
// NOTE(review): presumably these are the keys under which the indexers are
// registered and looked up by NewIndexers; the registration itself is not
// visible in this file, so confirm against the registering code.
const (
// ContainerIndexerName is the name associated with ContainerIndexer.
ContainerIndexerName = "container"
// PodNameIndexerName is the name associated with PodNameIndexer.
PodNameIndexerName = "pod_name"
)
// Indexer takes known pods and generates all the metadata needed to enrich
// events in an efficient way, by pre-indexing the metadata in the same way it
// will be looked up when matching events.
type Indexer interface {
// GetMetadata generates event metadata for the given pod, then returns the
// list of indexes to create, together with the metadata to store under each
// of them.
GetMetadata(pod *Pod) []MetadataIndex
// GetIndexes returns the list of indexes the given pod belongs to. This
// function must return the same indexes as GetMetadata.
GetIndexes(pod *Pod) []string
}
// MetadataIndex holds a single index -> metadata pair produced by an Indexer.
type MetadataIndex struct {
// Index is the lookup key (for example a "namespace/name" string or a
// container ID, depending on the indexer that produced it).
Index string
// Data is the metadata stored under Index.
Data common.MapStr
}
// Indexers groups the configured Indexer implementations so they can be
// queried as one. The embedded RWMutex is read-locked by GetMetadata and
// GetIndexes while they walk the indexers slice.
type Indexers struct {
sync.RWMutex
// indexers is the list of indexers whose results are combined.
indexers []Indexer
}
// GenMeta is implemented by types that generate metadata for pods.
type GenMeta interface {
// GenerateMetaData generates the metadata for the given pod and returns it
// as a common.MapStr.
GenerateMetaData(pod *Pod) common.MapStr
}
// IndexerConstructor is the signature of a function that builds an Indexer
// from its plugin configuration and the metadata generator it should use. It
// returns an error when the indexer cannot be created.
type IndexerConstructor func(config common.Config, genMeta GenMeta) (Indexer, error)
// NewIndexers builds an Indexers collection from the given plugin
// configuration. For every configured plugin name it looks up the constructor
// through Indexing.GetIndexer and calls it with the plugin's configuration and
// metaGen.
//
// Plugins whose constructor cannot be found, or whose constructor returns an
// error, are logged with a warning and skipped, so the returned collection
// only contains indexers that were successfully initialized.
func NewIndexers(configs PluginConfig, metaGen *GenDefaultMeta) *Indexers {
	indexers := []Indexer{}
	for _, pluginConfigs := range configs {
		for name, pluginConfig := range pluginConfigs {
			indexFunc := Indexing.GetIndexer(name)
			if indexFunc == nil {
				logp.Warn("Unable to find indexing plugin %s", name)
				continue
			}

			indexer, err := indexFunc(pluginConfig, metaGen)
			if err != nil {
				logp.Warn("Unable to initialize indexing plugin %s due to error %v", name, err)
				// The returned indexer is meaningless on error; registering
				// it would make GetMetadata/GetIndexes call through an
				// invalid (typically nil) Indexer later on.
				continue
			}

			indexers = append(indexers, indexer)
		}
	}

	return &Indexers{
		indexers: indexers,
	}
}
// GetMetadata asks every registered indexer for the pod's metadata and returns
// all resulting index/metadata pairs as one list, in indexer order. The
// indexer list is read-locked for the duration of the call.
func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex {
	i.RLock()
	defer i.RUnlock()

	var combined []MetadataIndex
	for _, idx := range i.indexers {
		combined = append(combined, idx.GetMetadata(pod)...)
	}
	return combined
}
// GetIndexes asks every registered indexer for the indexes the pod belongs to
// and returns them as one list, in indexer order. The indexer list is
// read-locked for the duration of the call.
func (i *Indexers) GetIndexes(pod *Pod) []string {
	i.RLock()
	defer i.RUnlock()

	var combined []string
	for _, idx := range i.indexers {
		combined = append(combined, idx.GetIndexes(pod)...)
	}
	return combined
}
// Empty reports whether no indexers are registered.
func (i *Indexers) Empty() bool {
	return len(i.indexers) == 0
}
// GenDefaultMeta is the default GenMeta implementation. Its fields are filters
// applied by GenerateMetaData when selecting pod labels and annotations.
type GenDefaultMeta struct {
// annotations lists the annotation keys copied into the metadata.
annotations []string
// labels lists the label keys copied into the metadata; when empty, all of
// the pod's labels are copied.
labels []string
// labelsExclude lists label keys removed from the metadata after selection.
labelsExclude []string
}
// NewGenDefaultMeta returns a GenDefaultMeta configured with the given
// annotation keys, label keys and excluded label keys. The slices are stored
// as passed, without copying.
func NewGenDefaultMeta(annotations, labels, labelsExclude []string) *GenDefaultMeta {
	gen := new(GenDefaultMeta)
	gen.annotations = annotations
	gen.labels = labels
	gen.labelsExclude = labelsExclude
	return gen
}
// GenerateMetaData builds the default metadata for the given pod, applying the
// configured filters.
//
// The result always contains "pod" (with the pod's "name") and "namespace".
// "labels" and "annotations" are only added when the filtered maps are not
// empty.
func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr {
	var selectedLabels common.MapStr
	if len(g.labels) != 0 {
		// Only the explicitly configured labels are kept.
		selectedLabels = generateMapSubset(pod.Metadata.Labels, g.labels)
	} else {
		// No include filter configured: take every label of the pod.
		selectedLabels = common.MapStr{}
		for name, value := range pod.Metadata.Labels {
			selectedLabels[name] = value
		}
	}

	// Drop any labels that are listed in the exclude_labels config.
	for _, excluded := range g.labelsExclude {
		delete(selectedLabels, excluded)
	}

	selectedAnnotations := generateMapSubset(pod.Metadata.Annotations, g.annotations)

	result := common.MapStr{
		"pod": common.MapStr{
			"name": pod.Metadata.Name,
		},
		"namespace": pod.Metadata.Namespace,
	}
	if len(selectedLabels) > 0 {
		result["labels"] = selectedLabels
	}
	if len(selectedAnnotations) > 0 {
		result["annotations"] = selectedAnnotations
	}
	return result
}
// generateMapSubset returns a new MapStr holding the entries of input whose
// keys appear in keys. Keys missing from input are ignored. The result is
// never nil, even when input is nil or nothing matches.
func generateMapSubset(input map[string]string, keys []string) common.MapStr {
	subset := common.MapStr{}
	// Looking up a key in a nil map simply reports "not found", so a nil
	// input needs no special handling.
	for _, k := range keys {
		if v, found := input[k]; found {
			subset[k] = v
		}
	}
	return subset
}
// PodNameIndexer implements the default indexer based on pod name: it indexes
// each pod under a "namespace/name" key.
type PodNameIndexer struct {
// genMeta generates the metadata stored for each pod.
genMeta GenMeta
}
// NewPodNameIndexer creates a PodNameIndexer that uses genMeta to generate pod
// metadata. The configuration argument is ignored and the returned error is
// always nil.
func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
	indexer := &PodNameIndexer{genMeta: genMeta}
	return indexer, nil
}
// GetMetadata returns a single entry holding the pod's generated metadata,
// indexed by "<namespace>/<name>".
func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex {
	podMeta := p.genMeta.GenerateMetaData(pod)
	key := fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name)

	entry := MetadataIndex{Index: key, Data: podMeta}
	return []MetadataIndex{entry}
}
// GetIndexes returns the single "<namespace>/<name>" index of the pod, the
// same key GetMetadata stores its metadata under.
func (p *PodNameIndexer) GetIndexes(pod *Pod) []string {
	key := fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name)
	return []string{key}
}
// ContainerIndexer indexes pods based on the IDs of all their containers,
// including init containers.
type ContainerIndexer struct {
// genMeta generates the pod-level metadata shared by all container entries.
genMeta GenMeta
}
// NewContainerIndexer creates a ContainerIndexer that uses genMeta to generate
// pod metadata. The configuration argument is ignored and the returned error
// is always nil.
func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
	indexer := &ContainerIndexer{genMeta: genMeta}
	return indexer, nil
}
// GetMetadata returns one entry per container of the pod (regular containers
// first, then init containers), indexed by container ID. Each entry holds a
// clone of the pod's generated metadata extended with the container's name
// under "container". Containers for which containerID yields an empty ID are
// skipped.
func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex {
	commonMeta := c.genMeta.GenerateMetaData(pod)

	// Cap the capacity at the length before appending, so append has to
	// allocate a new array instead of writing the init container statuses
	// into spare capacity of the pod's own ContainerStatuses backing array.
	regular := pod.Status.ContainerStatuses
	statuses := append(regular[:len(regular):len(regular)], pod.Status.InitContainerStatuses...)

	var metadata []MetadataIndex
	for _, status := range statuses {
		cID := containerID(status)
		if cID == "" {
			continue
		}

		// Clone so that every container gets its own map; otherwise the
		// "container" key would be overwritten on the shared pod metadata.
		containerMeta := commonMeta.Clone()
		containerMeta["container"] = common.MapStr{
			"name": status.Name,
		}
		metadata = append(metadata, MetadataIndex{
			Index: cID,
			Data:  containerMeta,
		})
	}

	return metadata
}
// GetIndexes returns the container IDs of all containers of the pod (regular
// containers first, then init containers). Containers for which containerID
// yields an empty ID are skipped, matching the indexes created by GetMetadata.
func (c *ContainerIndexer) GetIndexes(pod *Pod) []string {
	// Cap the capacity at the length before appending, so append has to
	// allocate a new array instead of writing the init container statuses
	// into spare capacity of the pod's own ContainerStatuses backing array.
	regular := pod.Status.ContainerStatuses
	statuses := append(regular[:len(regular):len(regular)], pod.Status.InitContainerStatuses...)

	var containers []string
	for _, status := range statuses {
		cID := containerID(status)
		if cID == "" {
			continue
		}
		containers = append(containers, cID)
	}

	return containers
}
// containerID extracts the bare ID from the status' ContainerID, which is
// expected to consist of exactly two parts separated by "//" (presumably
// "<runtime>://<id>" — confirm against the Kubernetes API). The part after
// the separator is returned; an empty string is returned when ContainerID is
// empty or does not split into exactly two parts.
func containerID(status PodContainerStatus) string {
	// An empty ContainerID splits into a single element, so it falls through
	// to the same "" result without a dedicated check.
	parts := strings.Split(status.ContainerID, "//")
	if len(parts) != 2 {
		return ""
	}
	return parts[1]
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v6.1.2

搜索帮助