代码拉取完成,页面将自动刷新
package publisher
import (
	"errors"
	"flag"
	"os"
	"sync/atomic"
	"time"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/op"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/outputs"
	"github.com/elastic/beats/libbeat/processors"
	"github.com/nranchev/go-libGeoIP"
	// load supported output plugins
	_ "github.com/elastic/beats/libbeat/outputs/console"
	_ "github.com/elastic/beats/libbeat/outputs/elasticsearch"
	_ "github.com/elastic/beats/libbeat/outputs/fileout"
	_ "github.com/elastic/beats/libbeat/outputs/kafka"
	_ "github.com/elastic/beats/libbeat/outputs/logstash"
	_ "github.com/elastic/beats/libbeat/outputs/redis"
	// load support output codec
	_ "github.com/elastic/beats/libbeat/outputs/codecs/format"
	_ "github.com/elastic/beats/libbeat/outputs/codecs/json"
)
// command line flags
var publishDisabled *bool
var debug = logp.MakeDebug("publish")
type Context struct {
	publishOptions
	Signal op.Signaler
}
type pipeline interface {
	publish(m message) bool
}
type publishOptions struct {
	Guaranteed bool
	Sync       bool
}
type TransactionalEventPublisher interface {
	PublishTransaction(transaction op.Signaler, events []common.MapStr)
}
type Publisher interface {
	Connect() Client
}
type BeatPublisher struct {
	shipperName    string // Shipper name as set in the configuration file
	hostname       string // Host name as returned by the operation system
	name           string // The shipperName if configured, the hostname otherwise
	version        string
	IPAddrs        []string
	disabled       bool
	Index          string
	Output         []*outputWorker
	TopologyOutput outputs.TopologyOutputer
	geoLite        *libgeo.GeoIP
	Processors     *processors.Processors
	globalEventMetadata common.EventMetadata // Fields and tags to add to each event.
	RefreshTopologyTimer <-chan time.Time
	// On shutdown the publisher is finished first and the outputers next,
	// so no publisher will attempt to send messages on closed channels.
	// Note: beat data producers must be shutdown before the publisher plugin
	wsPublisher workerSignal
	wsOutput    workerSignal
	pipelines struct {
		sync  pipeline
		async pipeline
	}
	// keep count of clients connected to publisher. A publisher is allowed to
	// Stop only if all clients have been disconnected
	numClients uint32
}
type ShipperConfig struct {
	common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
	Name                 string             `config:"name"`
	RefreshTopologyFreq  time.Duration      `config:"refresh_topology_freq"`
	TopologyExpire       int                `config:"topology_expire"`
	Geoip                common.Geoip       `config:"geoip"`
	// internal publisher queue sizes
	QueueSize     *int `config:"queue_size"`
	BulkQueueSize *int `config:"bulk_queue_size"`
	MaxProcs      *int `config:"max_procs"`
}
type Topology struct {
	Name string `json:"name"`
	IP   string `json:"ip"`
}
const (
	DefaultQueueSize     = 1000
	DefaultBulkQueueSize = 0
)
func init() {
	publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
}
func (publisher *BeatPublisher) IsPublisherIP(ip string) bool {
	for _, myip := range publisher.IPAddrs {
		if myip == ip {
			return true
		}
	}
	return false
}
func (publisher *BeatPublisher) GetServerName(ip string) string {
	// in case the IP is localhost, return current shipper name
	islocal, err := common.IsLoopback(ip)
	if err != nil {
		logp.Err("Parsing IP %s fails with: %s", ip, err)
		return ""
	}
	if islocal {
		return publisher.name
	}
	// find the shipper with the desired IP
	if publisher.TopologyOutput != nil {
		logp.Warn("Topology settings are deprecated.")
		return publisher.TopologyOutput.GetNameByIP(ip)
	}
	return ""
}
func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP {
	return publisher.geoLite
}
func (publisher *BeatPublisher) Connect() Client {
	atomic.AddUint32(&publisher.numClients, 1)
	return newClient(publisher)
}
func (publisher *BeatPublisher) UpdateTopologyPeriodically() {
	for range publisher.RefreshTopologyTimer {
		_ = publisher.PublishTopology() // ignore errors
	}
}
func (publisher *BeatPublisher) PublishTopology(params ...string) error {
	localAddrs := params
	if len(params) == 0 {
		addrs, err := common.LocalIPAddrsAsStrings(false)
		if err != nil {
			logp.Err("Getting local IP addresses fails with: %s", err)
			return err
		}
		localAddrs = addrs
	}
	if publisher.TopologyOutput != nil {
		debug("Add topology entry for %s: %s", publisher.name, localAddrs)
		err := publisher.TopologyOutput.PublishIPs(publisher.name, localAddrs)
		if err != nil {
			return err
		}
	}
	return nil
}
// Create new PublisherType
func New(
	beatName string,
	beatVersion string,
	configs map[string]*common.Config,
	shipper ShipperConfig,
	processors *processors.Processors,
) (*BeatPublisher, error) {
	publisher := BeatPublisher{}
	err := publisher.init(beatName, beatVersion, configs, shipper, processors)
	if err != nil {
		return nil, err
	}
	return &publisher, nil
}
func (publisher *BeatPublisher) init(
	beatName string,
	beatVersion string,
	configs map[string]*common.Config,
	shipper ShipperConfig,
	processors *processors.Processors,
) error {
	var err error
	publisher.Processors = processors
	publisher.disabled = *publishDisabled
	if publisher.disabled {
		logp.Info("Dry run mode. All output types except the file based one are disabled.")
	}
	shipper.InitShipperConfig()
	publisher.geoLite = common.LoadGeoIPData(shipper.Geoip)
	publisher.wsPublisher.Init()
	publisher.wsOutput.Init()
	if !publisher.disabled {
		plugins, err := outputs.InitOutputs(beatName, configs, shipper.TopologyExpire)
		if err != nil {
			return err
		}
		var outputers []*outputWorker
		var topoOutput outputs.TopologyOutputer
		for _, plugin := range plugins {
			output := plugin.Output
			config := plugin.Config
			debug("Create output worker")
			outputers = append(outputers,
				newOutputWorker(
					config,
					output,
					&publisher.wsOutput,
					*shipper.QueueSize,
					*shipper.BulkQueueSize))
			if ok, _ := config.Bool("save_topology", 0); !ok {
				continue
			}
			topo, ok := output.(outputs.TopologyOutputer)
			if !ok {
				logp.Err("Output type %s does not support topology logging",
					plugin.Name)
				return errors.New("Topology output not supported")
			}
			if topoOutput != nil {
				logp.Err("Multiple outputs defined to store topology. " +
					"Please add save_topology = true option only for one output.")
				return errors.New("Multiple outputs defined to store topology")
			}
			topoOutput = topo
			logp.Info("Using %s to store the topology", plugin.Name)
		}
		publisher.Output = outputers
		publisher.TopologyOutput = topoOutput
	}
	if !publisher.disabled {
		if len(publisher.Output) == 0 {
			logp.Info("No outputs are defined. Please define one under the output section.")
			return errors.New("No outputs are defined. Please define one under the output section.")
		}
		if publisher.TopologyOutput == nil {
			logp.Debug("publish", "No output is defined to store the topology. The server fields might not be filled.")
		}
	}
	publisher.shipperName = shipper.Name
	publisher.hostname, err = os.Hostname()
	publisher.version = beatVersion
	if err != nil {
		return err
	}
	if len(publisher.shipperName) > 0 {
		publisher.name = publisher.shipperName
	} else {
		publisher.name = publisher.hostname
	}
	logp.Info("Publisher name: %s", publisher.name)
	publisher.globalEventMetadata = shipper.EventMetadata
	//Store the publisher's IP addresses
	publisher.IPAddrs, err = common.LocalIPAddrsAsStrings(false)
	if err != nil {
		logp.Err("Failed to get local IP addresses: %s", err)
		return err
	}
	if !publisher.disabled && publisher.TopologyOutput != nil {
		RefreshTopologyFreq := 10 * time.Second
		if shipper.RefreshTopologyFreq != 0 {
			RefreshTopologyFreq = shipper.RefreshTopologyFreq
		}
		publisher.RefreshTopologyTimer = time.Tick(RefreshTopologyFreq)
		logp.Info("Topology map refreshed every %s", RefreshTopologyFreq)
		// register shipper and its public IP addresses
		err = publisher.PublishTopology()
		if err != nil {
			logp.Err("Failed to publish topology: %s", err)
			return err
		}
		// update topology periodically
		go publisher.UpdateTopologyPeriodically()
	}
	publisher.pipelines.async = newAsyncPipeline(publisher, *shipper.QueueSize, *shipper.BulkQueueSize, &publisher.wsPublisher)
	publisher.pipelines.sync = newSyncPipeline(publisher, *shipper.QueueSize, *shipper.BulkQueueSize)
	return nil
}
func (publisher *BeatPublisher) Stop() {
	if atomic.LoadUint32(&publisher.numClients) > 0 {
		panic("All clients must disconnect before shutting down publisher pipeline")
	}
	publisher.wsPublisher.stop()
	publisher.wsOutput.stop()
}
func (config *ShipperConfig) InitShipperConfig() {
	// TODO: replace by ucfg
	if config.QueueSize == nil || *config.QueueSize <= 0 {
		queueSize := DefaultQueueSize
		config.QueueSize = &queueSize
	}
	if config.BulkQueueSize == nil || *config.BulkQueueSize < 0 {
		bulkQueueSize := DefaultBulkQueueSize
		config.BulkQueueSize = &bulkQueueSize
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
 马建仓 AI 助手
马建仓 AI 助手