代码拉取完成,页面将自动刷新
package module
import (
"expvar"
"fmt"
"sync"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/metricbeat/mb"
"github.com/joeshaw/multierror"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
)
// Expvar metric names.
const (
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
)
var (
debugf = logp.MakeDebug("module")
fetchesLock = sync.Mutex{}
fetches = expvar.NewMap("fetches")
)
// Wrapper contains the Module and the private data associated with
// running the Module and its MetricSets.
//
// Use NewWrapper or NewWrappers to construct new Wrappers.
type Wrapper struct {
mb.Module
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
}
// metricSetWrapper contains the MetricSet and the private data associated with
// running the MetricSet. It contains a pointer to the parent Module.
type metricSetWrapper struct {
mb.MetricSet
module *Wrapper // Parent Module.
stats *expvar.Map // expvar stats for this MetricSet.
}
// NewWrapper create a new Module and its associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them in the Wrapper.
func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers([]*common.Config{moduleConfig}, r)
if err != nil {
return nil, err
}
if len(mws) == 0 {
return nil, fmt.Errorf("module not created")
}
return mws[0], nil
}
// NewWrappers creates new Modules and their associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them all in a Wrapper.
func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
modules, err := mb.NewModules(modulesConfig, r)
if err != nil {
return nil, err
}
// Wrap the Modules and MetricSet's.
var wrappers []*Wrapper
var errs multierror.Errors
for k, v := range modules {
debugf("Initializing Module type '%s': %T=%+v", k.Name(), k, k)
f, err := processors.New(k.Config().Filters)
if err != nil {
errs = append(errs, errors.Wrapf(err, "module %s", k.Name()))
continue
}
mw := &Wrapper{
Module: k,
filters: f,
}
wrappers = append(wrappers, mw)
msws := make([]*metricSetWrapper, 0, len(v))
for _, ms := range v {
debugf("Initializing MetricSet type '%s/%s' for host '%s': %T=%+v",
ms.Module().Name(), ms.Name(), ms.Host(), ms, ms)
expMap, err := getMetricSetExpvarMap(mw.Name(), ms.Name())
if err != nil {
return nil, err
}
msw := &metricSetWrapper{
MetricSet: ms,
module: mw,
stats: expMap,
}
msws = append(msws, msw)
}
mw.metricSets = msws
}
return wrappers, errs.Err()
}
// Wrapper methods
// Start starts the Module's MetricSet workers which are responsible for
// fetching metrics. The workers will continue to periodically fetch until the
// done channel is closed. When the done channel is closed all MetricSet workers
// will stop and the returned output channel will be closed.
//
// The returned channel is buffered with a length one one. It must drained to
// prevent blocking the operation of the MetricSets.
//
// Start should be called only once in the life of a Wrapper.
func (mw *Wrapper) Start(done <-chan struct{}) <-chan common.MapStr {
debugf("Starting %s", mw)
defer debugf("Stopped %s", mw)
out := make(chan common.MapStr, 1)
// Start one worker per MetricSet + host combination.
var wg sync.WaitGroup
wg.Add(len(mw.metricSets))
for _, msw := range mw.metricSets {
go func(msw *metricSetWrapper) {
defer wg.Done()
msw.startFetching(done, out)
}(msw)
}
// Close the output channel when all writers to the channel have stopped.
go func() {
wg.Wait()
close(out)
}()
return out
}
// String returns a string representation of Wrapper.
func (mw *Wrapper) String() string {
return fmt.Sprintf("Wrapper[name=%s, len(metricSetWrappers)=%d]",
mw.Name(), len(mw.metricSets))
}
// Hash returns the hash value of the module wrapper
// This allows to check if two modules are the same / have the same config
func (mw *Wrapper) Hash() uint64 {
// Check if hash was calculated previously
if mw.configHash > 0 {
return mw.configHash
}
var err error
// Config is unpacked into map[string]interface{} to also take metricset configs into account for the hash
var c map[string]interface{}
mw.UnpackConfig(&c)
mw.configHash, err = hashstructure.Hash(c, nil)
if err != nil {
logp.Err("Error creating config hash for module %s: %s", mw.String(), err)
}
return mw.configHash
}
// metricSetWrapper methods
// startFetching performs an immediate fetch for the MetricSet then it
// begins a continuous timer scheduled loop to fetch data. To stop the loop the
// done channel should be closed.
func (msw *metricSetWrapper) startFetching(
done <-chan struct{},
out chan<- common.MapStr,
) {
debugf("Starting %s", msw)
defer debugf("Stopped %s", msw)
// Fetch immediately.
err := msw.fetch(done, out)
if err != nil {
logp.Err("%v", err)
}
// Start timer for future fetches.
t := time.NewTicker(msw.Module().Config().Period)
defer t.Stop()
for {
select {
case <-done:
return
case <-t.C:
err := msw.fetch(done, out)
if err != nil {
logp.Err("%v", err)
}
}
}
}
// fetch invokes the appropriate Fetch method for the MetricSet and publishes
// the result using the publisher client. This method will recover from panics
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(done <-chan struct{}, out chan<- common.MapStr) error {
defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+
"'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host()))
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
event, err := msw.singleEventFetch(fetcher)
if err != nil {
return err
}
if event != nil {
msw.stats.Add(eventsKey, 1)
writeEvent(done, out, event)
}
case mb.EventsFetcher:
events, err := msw.multiEventFetch(fetcher)
if err != nil {
return err
}
for _, event := range events {
msw.stats.Add(eventsKey, 1)
if !writeEvent(done, out, event) {
break
}
}
default:
return fmt.Errorf("MetricSet '%s/%s' does not implement a Fetcher "+
"interface", msw.Module().Name(), msw.Name())
}
return nil
}
func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher) (common.MapStr, error) {
start := time.Now()
event, err := fetcher.Fetch()
elapsed := time.Since(start)
if err == nil {
msw.stats.Add(successesKey, 1)
} else {
msw.stats.Add(failuresKey, 1)
}
if event, err = createEvent(msw, event, err, start, elapsed); err != nil {
return nil, errors.Wrap(err, "createEvent failed")
}
return event, nil
}
func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher) ([]common.MapStr, error) {
start := time.Now()
events, err := fetcher.Fetch()
elapsed := time.Since(start)
var rtnEvents []common.MapStr
if err == nil {
msw.stats.Add(successesKey, 1)
for _, event := range events {
if event, err = createEvent(msw, event, nil, start, elapsed); err != nil {
return nil, errors.Wrap(err, "createEvent failed")
}
if event != nil {
rtnEvents = append(rtnEvents, event)
}
}
} else {
msw.stats.Add(failuresKey, 1)
event, err := createEvent(msw, nil, err, start, elapsed)
if err != nil {
return nil, errors.Wrap(err, "createEvent failed")
}
if event != nil {
rtnEvents = append(rtnEvents, event)
}
}
return rtnEvents, nil
}
// String returns a string representation of metricSetWrapper.
func (msw *metricSetWrapper) String() string {
return fmt.Sprintf("metricSetWrapper[module=%s, name=%s, host=%s]",
msw.module.Name(), msw.Name(), msw.Host())
}
// other utility functions
func writeEvent(done <-chan struct{}, out chan<- common.MapStr, event common.MapStr) bool {
select {
case <-done:
return false
case out <- event:
return true
}
}
func getMetricSetExpvarMap(module, name string) (*expvar.Map, error) {
key := fmt.Sprintf("%s-%s", module, name)
fetchesLock.Lock()
defer fetchesLock.Unlock()
expVar := fetches.Get(key)
switch m := expVar.(type) {
case nil:
expMap := new(expvar.Map).Init()
fetches.Set(key, expMap)
expMap.Add(successesKey, 0)
expMap.Add(failuresKey, 0)
expMap.Add(eventsKey, 0)
return expMap, nil
case *expvar.Map:
return m, nil
default:
return nil, fmt.Errorf("unexpected expvar.Var type (%T) found for key '%s'", m, key)
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。