1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
matchers.go 3.40 KB
一键复制 编辑 原始数据 按行查看 历史
package add_kubernetes_metadata
import (
"fmt"
"sync"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/codec/format"
)
// Names identifying the matcher kinds implemented in this file.
const (
// FieldMatcherName is the name "fields"; presumably the key under which
// NewFieldMatcher is registered — confirm against the plugin registry.
FieldMatcherName = "fields"
// FieldFormatMatcherName is the name "field_format"; presumably the key under
// which NewFieldFormatMatcher is registered — confirm against the plugin registry.
FieldFormatMatcherName = "field_format"
)
// Matcher derives, from an event, the index string used to look up metadata
// for that event.
type Matcher interface {
// MetadataIndex returns the index string to use in annotation lookups for the given
// event. A previous indexer should have generated that index for this to work.
// This function can return "" if the event doesn't match.
MetadataIndex(event common.MapStr) string
}
// Matchers is an ordered collection of Matcher implementations. It embeds a
// sync.RWMutex, which MetadataIndex read-locks while walking the list.
type Matchers struct {
sync.RWMutex
// matchers are consulted in slice order; the first non-empty index wins.
matchers []Matcher
}
// MatcherConstructor is the signature of a function that builds a Matcher from
// its configuration, or returns an error when the Matcher cannot be built.
type MatcherConstructor func(config common.Config) (Matcher, error)
// NewMatchers builds a Matchers collection from the given plugin configuration.
//
// For every (name, config) entry it looks up the constructor with
// Indexing.GetMatcher and invokes it with the entry's configuration. An entry
// whose plugin name is unknown, or whose constructor returns an error, is
// logged as a warning and skipped; the result of a failed constructor is never
// stored in the collection.
func NewMatchers(configs PluginConfig) *Matchers {
	matchers := []Matcher{}
	for _, pluginConfigs := range configs {
		for name, pluginConfig := range pluginConfigs {
			matchFunc := Indexing.GetMatcher(name)
			if matchFunc == nil {
				logp.Warn("Unable to find matcher plugin %s", name)
				continue
			}

			matcher, err := matchFunc(pluginConfig)
			if err != nil {
				logp.Warn("Unable to initialize matcher plugin %s due to error %v", name, err)
				// Skip the failed plugin: storing the nil Matcher it returned
				// would make Matchers.MetadataIndex panic on the first event.
				continue
			}

			matchers = append(matchers, matcher)
		}
	}

	return &Matchers{
		matchers: matchers,
	}
}
// MetadataIndex queries the held matchers in order, under the read lock, and
// returns the first non-empty index string any of them produces. It returns ""
// when no matcher yields an index for the event.
func (m *Matchers) MetadataIndex(event common.MapStr) string {
	m.RLock()
	defer m.RUnlock()

	for i := range m.matchers {
		if idx := m.matchers[i].MetadataIndex(event); idx != "" {
			return idx
		}
	}

	// Every matcher declined the event.
	return ""
}
// Empty reports whether the collection holds no matchers. It reads the slice
// length without taking the lock.
func (m *Matchers) Empty() bool {
	return len(m.matchers) == 0
}
// FieldMatcher is a Matcher that uses the string value of an event field as
// the metadata index.
type FieldMatcher struct {
// MatchFields lists the event field names to try, in order.
MatchFields []string
}
// NewFieldMatcher creates a FieldMatcher from the `lookup_fields` setting of
// cfg. It returns an error when the configuration cannot be unpacked or when
// `lookup_fields` contains no entries.
func NewFieldMatcher(cfg common.Config) (Matcher, error) {
	var settings struct {
		LookupFields []string `config:"lookup_fields"`
	}

	if err := cfg.Unpack(&settings); err != nil {
		return nil, fmt.Errorf("fail to unpack the `lookup_fields` configuration: %s", err)
	}

	fields := settings.LookupFields
	if len(fields) < 1 {
		return nil, fmt.Errorf("lookup_fields can not be empty")
	}

	return &FieldMatcher{MatchFields: fields}, nil
}
// MetadataIndex walks MatchFields in order and returns the value of the first
// field that can be read from the event and holds a string. Fields that are
// missing or hold a non-string value are skipped. It returns "" when no field
// qualifies.
func (f *FieldMatcher) MetadataIndex(event common.MapStr) string {
	for _, name := range f.MatchFields {
		value, err := event.GetValue(name)
		if err != nil {
			// Field could not be read from this event; try the next one.
			continue
		}
		if s, isString := value.(string); isString {
			return s
		}
	}
	return ""
}
// FieldFormatMatcher is a Matcher that builds the metadata index by encoding
// the event with a codec.
type FieldFormatMatcher struct {
// Codec encodes an event into the index bytes; NewFieldFormatMatcher sets it
// to a format codec built from the configured `format` string.
Codec codec.Codec
}
// NewFieldFormatMatcher creates a FieldFormatMatcher from the `format` setting
// of cfg. It returns an error when the configuration cannot be unpacked, when
// `format` is empty, or when the format string fails to compile.
func NewFieldFormatMatcher(cfg common.Config) (Matcher, error) {
	config := struct {
		Format string `config:"format"`
	}{}

	err := cfg.Unpack(&config)
	if err != nil {
		return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %s", err)
	}

	if config.Format == "" {
		return nil, fmt.Errorf("`format` of `field_format` matcher can't be empty")
	}

	// The format string comes from user configuration, so compile it with the
	// error-returning variant rather than the Must* helper: a bad pattern is
	// then reported to the caller instead of panicking.
	pattern, err := fmtstr.CompileEvent(config.Format)
	if err != nil {
		return nil, fmt.Errorf("fail to compile the `format` of `field_format` matcher: %s", err)
	}

	return &FieldFormatMatcher{
		Codec: format.New(pattern),
	}, nil
}
// MetadataIndex encodes the event with the matcher's Codec and returns the
// encoded bytes as the index string. It returns "" when encoding fails (a
// debug message is logged under the "kubernetes" selector) or when the codec
// produces no output.
func (f *FieldFormatMatcher) MetadataIndex(event common.MapStr) string {
	encoded, err := f.Codec.Encode("", &beat.Event{
		Fields: event,
	})
	if err != nil {
		logp.Debug("kubernetes", "Unable to apply field format pattern on event")
		// The encoder's output is not meaningful after a failure, so report
		// "no match" rather than using whatever bytes came back.
		return ""
	}

	// Converting an empty or nil byte slice yields "", the "no match" value.
	return string(encoded)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v6.2.2

搜索帮助