代码拉取完成,页面将自动刷新
package event
import (
"fmt"
"io/ioutil"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/mb"
"github.com/ericchiang/k8s"
"github.com/ghodss/yaml"
)
// init adds the "event" MetricSet of the "kubernetes" module to the central
// Metricbeat registry, using New as its factory.
// Registration failures are treated as fatal and abort start-up with a panic.
func init() {
	err := mb.Registry.AddMetricSet("kubernetes", "event", New)
	if err != nil {
		panic(err)
	}
}
// MetricSet holds the state of the kubernetes event MetricSet.
// Instead of being polled, it is driven through Run, which receives an
// mb.PushReporter and forwards the events delivered by the watcher to it.
type MetricSet struct {
	// BaseMetricSet is the embedded base handed to New by the framework.
	mb.BaseMetricSet

	// watcher is the event watcher created in New; Run starts it, reads
	// from its eventQueue and stops it once the reporter is done.
	watcher *Watcher
}
// New creates a new instance of the kubernetes event MetricSet.
//
// It unpacks the module configuration on top of the defaults, builds a
// Kubernetes API client and passes that client to a new Watcher together with
// the configured sync period and namespace. The client is created from the
// in-cluster environment when config.InCluster is set, otherwise from the
// kubeconfig file located at config.KubeConfig.
//
// An error is returned when the configuration cannot be unpacked, when the
// kubeconfig file cannot be read or parsed, or when the client cannot be
// created.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Experimental("The kubernetes event metricset is experimental")

	config := defaultKuberentesEventsConfig()
	err := base.Module().UnpackConfig(&config)
	if err != nil {
		return nil, fmt.Errorf("fail to unpack the kubernetes event configuration: %s", err)
	}

	var client *k8s.Client
	if config.InCluster {
		client, err = k8s.NewInClusterClient()
		if err != nil {
			// Include the underlying cause; without it the caller cannot
			// tell why the in-cluster configuration was unavailable.
			return nil, fmt.Errorf("Unable to get in cluster configuration: %v", err)
		}
	} else {
		data, err := ioutil.ReadFile(config.KubeConfig)
		if err != nil {
			return nil, fmt.Errorf("read kubeconfig: %v", err)
		}

		// Unmarshal YAML into a Kubernetes config object. It gets its own
		// name so that it does not shadow the metricset configuration, which
		// is still needed after this branch.
		var kubeConfig k8s.Config
		if err = yaml.Unmarshal(data, &kubeConfig); err != nil {
			return nil, fmt.Errorf("unmarshal kubeconfig: %v", err)
		}

		client, err = k8s.NewClient(&kubeConfig)
		if err != nil {
			return nil, err
		}
	}

	watcher := NewWatcher(client, config.SyncPeriod, config.Namespace)

	return &MetricSet{
		BaseMetricSet: base,
		watcher:       watcher,
	}, nil
}
// Run starts the Kubernetes event watcher and forwards every event it queues
// to the given reporter. Events carrying a deletion timestamp are skipped.
// Once the reporter signals that it is done, the watcher is stopped and Run
// returns.
func (m *MetricSet) Run(reporter mb.PushReporter) {
	// Start event watcher
	m.watcher.Run()

	for {
		select {
		case evt := <-m.watcher.eventQueue:
			// Ignore events that are deleted
			if evt.Metadata.DeletionTimestamp != nil {
				continue
			}
			reporter.Event(generateMapStrFromEvent(evt))

		case <-reporter.Done():
			m.watcher.Stop()
			return
		}
	}
}
// generateMapStrFromEvent converts a watched Kubernetes event into the
// common.MapStr document that is reported.
//
// The result always contains the message, reason, type, count, involved
// object and metadata of the event. Labels and annotations are added to the
// metadata only when the event has any, and the top-level "timestamp" entry
// is added only when at least one of the first/last occurrence timestamps is
// set.
func generateMapStrFromEvent(eve *Event) common.MapStr {
	// First/last occurrence, converted to UTC; either may be missing.
	occurrences := common.MapStr{}
	if eve.FirstTimestamp != nil {
		occurrences["first_occurrence"] = eve.FirstTimestamp.UTC()
	}
	if eve.LastTimestamp != nil {
		occurrences["last_occurrence"] = eve.LastTimestamp.UTC()
	}

	// Object metadata of the event itself.
	meta := common.MapStr{}
	meta["timestamp"] = common.MapStr{
		"created": eve.Metadata.CreationTimestamp,
	}
	meta["name"] = eve.Metadata.Name
	meta["namespace"] = eve.Metadata.Namespace
	meta["self_link"] = eve.Metadata.SelfLink
	meta["generate_name"] = eve.Metadata.GenerateName
	meta["uid"] = eve.Metadata.UID
	meta["resource_version"] = eve.Metadata.ResourceVersion

	if count := len(eve.Metadata.Labels); count > 0 {
		labels := make(common.MapStr, count)
		for key, value := range eve.Metadata.Labels {
			labels[key] = value
		}
		meta["labels"] = labels
	}

	if count := len(eve.Metadata.Annotations); count > 0 {
		annotations := make(common.MapStr, count)
		for key, value := range eve.Metadata.Annotations {
			annotations[key] = value
		}
		meta["annotations"] = annotations
	}

	// The object the event refers to.
	involved := common.MapStr{
		"api_version":      eve.InvolvedObject.APIVersion,
		"resource_version": eve.InvolvedObject.ResourceVersion,
		"name":             eve.InvolvedObject.Name,
		"kind":             eve.InvolvedObject.Kind,
		"uid":              eve.InvolvedObject.UID,
	}

	result := common.MapStr{
		"message":         eve.Message,
		"reason":          eve.Reason,
		"type":            eve.Type,
		"count":           eve.Count,
		"involved_object": involved,
		"metadata":        meta,
	}
	if len(occurrences) > 0 {
		result["timestamp"] = occurrences
	}

	return result
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。