1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
publish.go 9.10 KB
一键复制 编辑 原始数据 按行查看 历史
package publisher
import (
"errors"
"flag"
"os"
"sync/atomic"
"time"
"github.com/nranchev/go-libGeoIP"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
// load supported output plugins
_ "github.com/elastic/beats/libbeat/outputs/console"
_ "github.com/elastic/beats/libbeat/outputs/elasticsearch"
_ "github.com/elastic/beats/libbeat/outputs/fileout"
_ "github.com/elastic/beats/libbeat/outputs/kafka"
_ "github.com/elastic/beats/libbeat/outputs/logstash"
_ "github.com/elastic/beats/libbeat/outputs/redis"
// load supported output codecs
_ "github.com/elastic/beats/libbeat/outputs/codecs/format"
_ "github.com/elastic/beats/libbeat/outputs/codecs/json"
)
// command line flags

// publishDisabled points at the value of the -N command line flag. It is
// assigned in the package init function and dereferenced when a
// BeatPublisher is initialized.
var publishDisabled *bool
// debug is the debug logging function created by logp.MakeDebug for the
// "publish" selector.
var debug = logp.MakeDebug("publish")
// Context combines the publishOptions (embedded) with an op.Signaler.
// NOTE(review): presumably Signal is notified about the outcome of a publish
// request made with this context - confirm against the pipeline code.
type Context struct {
publishOptions
Signal op.Signaler
}
// pipeline is the interface of the sync and async pipelines stored in
// BeatPublisher.pipelines.
// NOTE(review): the bool returned by publish presumably reports whether the
// message was accepted - confirm against the implementations.
type pipeline interface {
publish(m message) bool
}
// publishOptions holds the per-publish flags embedded into Context.
// NOTE(review): presumably Guaranteed requests delivery guarantees and Sync
// selects the synchronous pipeline - neither is used in this file, verify
// against the client code.
type publishOptions struct {
Guaranteed bool
Sync bool
}
// TransactionalEventPublisher is implemented by types that accept a batch of
// events together with an op.Signaler.
// NOTE(review): the signaler is presumably used to report completion or
// failure of the whole batch - confirm against the implementations.
type TransactionalEventPublisher interface {
PublishTransaction(transaction op.Signaler, events []common.MapStr)
}
// Publisher is the interface used to obtain a Client. BeatPublisher
// provides the Connect method required by it.
type Publisher interface {
Connect() Client
}
// BeatPublisher holds the publisher state set up by init: naming
// information, the configured output workers, the optional topology output,
// the sync/async pipelines and the number of connected clients.
type BeatPublisher struct {
shipperName string // Shipper name as set in the configuration file
hostname string // Host name as returned by the operating system
name string // The shipperName if configured, the hostname otherwise
version string
// Local IP addresses as returned by common.LocalIPAddrsAsStrings(false);
// consulted by IsPublisherIP.
IPAddrs []string
// disabled mirrors the -N command line flag; when set, no outputs are
// initialized.
disabled bool
Index string
Output []*outputWorker
// TopologyOutput is the single output configured with save_topology, or
// nil if there is none.
TopologyOutput outputs.TopologyOutputer
geoLite *libgeo.GeoIP
Processors *processors.Processors
globalEventMetadata common.EventMetadata // Fields and tags to add to each event.
// RefreshTopologyTimer drives UpdateTopologyPeriodically; it is only set
// when a topology output is configured.
RefreshTopologyTimer <-chan time.Time
// On shutdown the publisher is finished first and the outputers next,
// so no publisher will attempt to send messages on closed channels.
// Note: beat data producers must be shut down before the publisher plugin
wsPublisher workerSignal
wsOutput workerSignal
pipelines struct {
sync pipeline
async pipeline
}
// keep count of clients connected to publisher. A publisher is allowed to
// Stop only if all clients have been disconnected
numClients uint32
}
// ShipperConfig is the shipper section of the configuration consumed by
// BeatPublisher initialization.
type ShipperConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
// Name overrides the host name as the publisher name when non-empty.
Name string `config:"name"`
// RefreshTopologyFreq is the topology refresh interval; a zero value makes
// the publisher fall back to 10 seconds.
RefreshTopologyFreq time.Duration `config:"refresh_topology_freq"`
TopologyExpire int `config:"topology_expire"`
Geoip common.Geoip `config:"geoip"`
// internal publisher queue sizes
// Unset or invalid sizes are replaced with DefaultQueueSize and
// DefaultBulkQueueSize by InitShipperConfig.
QueueSize *int `config:"queue_size"`
BulkQueueSize *int `config:"bulk_queue_size"`
MaxProcs *int `config:"max_procs"`
}
// Topology pairs a name with an IP address. It is encoded to JSON with the
// keys "name" and "ip".
type Topology struct {
Name string `json:"name"`
IP string `json:"ip"`
}
// Fallback queue sizes applied by ShipperConfig.InitShipperConfig.
const (
// DefaultQueueSize is used when queue_size is unset or not positive.
DefaultQueueSize = 1000
// DefaultBulkQueueSize is used when bulk_queue_size is unset or negative.
DefaultBulkQueueSize = 0
)
// init registers the -N command line flag and stores the location of its
// value in publishDisabled.
func init() {
	publishDisabled = new(bool)
	flag.BoolVar(publishDisabled, "N", false, "Disable actual publishing for testing")
}
// IsPublisherIP reports whether ip is equal to one of the addresses stored
// in publisher.IPAddrs. The comparison is a plain string comparison.
func (publisher *BeatPublisher) IsPublisherIP(ip string) bool {
	known := publisher.IPAddrs
	for i := 0; i < len(known); i++ {
		if known[i] == ip {
			return true
		}
	}
	return false
}
// GetServerName maps ip to a server name. A loopback address yields the
// publisher's own name; any other address is looked up in the topology
// output when one is configured. The empty string is returned if ip cannot
// be parsed or no topology output is available.
func (publisher *BeatPublisher) GetServerName(ip string) string {
	loopback, err := common.IsLoopback(ip)
	switch {
	case err != nil:
		logp.Err("Parsing IP %s fails with: %s", ip, err)
		return ""
	case loopback:
		// localhost always refers to the current shipper
		return publisher.name
	case publisher.TopologyOutput == nil:
		return ""
	}

	// ask the topology output for the shipper owning this IP
	logp.Warn("Topology settings are deprecated.")
	return publisher.TopologyOutput.GetNameByIP(ip)
}
// GeoLite returns the GeoIP handle stored in the publisher. The field is
// populated from common.LoadGeoIPData during initialization.
// NOTE(review): may be nil if no GeoIP data was loaded - confirm against
// common.LoadGeoIPData.
func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP {
return publisher.geoLite
}
// Connect creates a new client for the publisher. The client counter is
// incremented atomically before the client is built; Stop panics while the
// counter is non-zero.
func (publisher *BeatPublisher) Connect() Client {
	atomic.AddUint32(&publisher.numClients, 1)

	c := newClient(publisher)
	return c
}
// UpdateTopologyPeriodically republishes the topology on every tick received
// from RefreshTopologyTimer and returns once that channel is closed. Errors
// from PublishTopology are deliberately dropped.
func (publisher *BeatPublisher) UpdateTopologyPeriodically() {
	ticks := publisher.RefreshTopologyTimer
	for {
		if _, open := <-ticks; !open {
			return
		}
		_ = publisher.PublishTopology() // ignore errors
	}
}
// PublishTopology stores the publisher name together with a list of IP
// addresses in the topology output. The addresses are taken from params; if
// none are given, the local addresses are looked up instead. A failed lookup
// is logged and returned even when no topology output is configured. Without
// a topology output the call is otherwise a no-op returning nil.
func (publisher *BeatPublisher) PublishTopology(params ...string) error {
	addrs := params
	if len(addrs) == 0 {
		local, err := common.LocalIPAddrsAsStrings(false)
		if err != nil {
			logp.Err("Getting local IP addresses fails with: %s", err)
			return err
		}
		addrs = local
	}

	if publisher.TopologyOutput == nil {
		return nil
	}

	debug("Add topology entry for %s: %s", publisher.name, addrs)
	if err := publisher.TopologyOutput.PublishIPs(publisher.name, addrs); err != nil {
		return err
	}
	return nil
}
// New allocates a BeatPublisher and initializes it from the given beat name,
// beat version, output configurations, shipper settings and processors.
// It returns nil and the initialization error if init fails.
func New(
	beatName string,
	beatVersion string,
	configs map[string]*common.Config,
	shipper ShipperConfig,
	processors *processors.Processors,
) (*BeatPublisher, error) {
	publisher := &BeatPublisher{}
	if err := publisher.init(beatName, beatVersion, configs, shipper, processors); err != nil {
		return nil, err
	}
	return publisher, nil
}
// init sets up the publisher in this order: processors and dry-run flag,
// shipper defaults, GeoIP data, worker signals, output workers and the
// optional topology output, naming information, local IP addresses, the
// periodic topology refresh, and finally the async and sync pipelines.
// It returns the first error encountered; the publisher may be partially
// initialized in that case.
func (publisher *BeatPublisher) init(
beatName string,
beatVersion string,
configs map[string]*common.Config,
shipper ShipperConfig,
processors *processors.Processors,
) error {
var err error
publisher.Processors = processors
// The -N command line flag turns the publisher into dry-run mode.
publisher.disabled = *publishDisabled
if publisher.disabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
}
// shipper is a by-value copy, so the defaults filled in here are only
// visible inside this function. After this call QueueSize and BulkQueueSize
// are non-nil and safe to dereference.
shipper.InitShipperConfig()
publisher.geoLite = common.LoadGeoIPData(shipper.Geoip)
publisher.wsPublisher.Init()
publisher.wsOutput.Init()
if len(configs) > 1 {
logp.Warn("Support for loading more than one output is deprecated and will not be supported in version 6.0.")
}
if !publisher.disabled {
// Note: this err shadows the function-level err; it is returned directly
// from within this block.
plugins, err := outputs.InitOutputs(beatName, configs, shipper.TopologyExpire)
if err != nil {
return err
}
var outputers []*outputWorker
var topoOutput outputs.TopologyOutputer
for _, plugin := range plugins {
output := plugin.Output
config := plugin.Config
debug("Create output worker")
outputers = append(outputers,
newOutputWorker(
config,
output,
&publisher.wsOutput,
*shipper.QueueSize,
*shipper.BulkQueueSize))
// Only outputs with save_topology enabled are considered for topology
// storage. An error reading the option is ignored and treated like the
// option's reported value.
if ok, _ := config.Bool("save_topology", 0); !ok {
continue
}
topo, ok := output.(outputs.TopologyOutputer)
if !ok {
logp.Err("Output type %s does not support topology logging",
plugin.Name)
return errors.New("Topology output not supported")
}
// At most one output may store the topology.
if topoOutput != nil {
logp.Err("Multiple outputs defined to store topology. " +
"Please add save_topology = true option only for one output.")
return errors.New("Multiple outputs defined to store topology")
}
topoOutput = topo
logp.Info("Using %s to store the topology", plugin.Name)
}
publisher.Output = outputers
publisher.TopologyOutput = topoOutput
}
if !publisher.disabled {
// At least one output is mandatory unless running in dry-run mode.
if len(publisher.Output) == 0 {
logp.Info("No outputs are defined. Please define one under the output section.")
return errors.New("No outputs are defined. Please define one under the output section.")
}
if publisher.TopologyOutput == nil {
logp.Debug("publish", "No output is defined to store the topology. The server fields might not be filled.")
}
}
publisher.shipperName = shipper.Name
publisher.hostname, err = os.Hostname()
publisher.version = beatVersion
// err still holds the result of os.Hostname() here.
if err != nil {
return err
}
// The configured shipper name takes precedence over the host name.
if len(publisher.shipperName) > 0 {
publisher.name = publisher.shipperName
} else {
publisher.name = publisher.hostname
}
logp.Info("Publisher name: %s", publisher.name)
publisher.globalEventMetadata = shipper.EventMetadata
//Store the publisher's IP addresses
publisher.IPAddrs, err = common.LocalIPAddrsAsStrings(false)
if err != nil {
logp.Err("Failed to get local IP addresses: %s", err)
return err
}
if !publisher.disabled && publisher.TopologyOutput != nil {
// Refresh every 10 seconds unless a non-zero frequency is configured.
RefreshTopologyFreq := 10 * time.Second
if shipper.RefreshTopologyFreq != 0 {
RefreshTopologyFreq = shipper.RefreshTopologyFreq
}
publisher.RefreshTopologyTimer = time.Tick(RefreshTopologyFreq)
logp.Info("Topology map refreshed every %s", RefreshTopologyFreq)
// register shipper and its public IP addresses
err = publisher.PublishTopology()
if err != nil {
logp.Err("Failed to publish topology: %s", err)
return err
}
// update topology periodically
// The goroutine loops over RefreshTopologyTimer; nothing in this function
// stops it.
go publisher.UpdateTopologyPeriodically()
}
// Both pipelines are created even in dry-run mode.
publisher.pipelines.async = newAsyncPipeline(publisher, *shipper.QueueSize, *shipper.BulkQueueSize, &publisher.wsPublisher)
publisher.pipelines.sync = newSyncPipeline(publisher, *shipper.QueueSize, *shipper.BulkQueueSize)
return nil
}
// Stop shuts the publisher down: the publisher workers are stopped first,
// the output workers afterwards. It panics if any client is still counted
// as connected.
func (publisher *BeatPublisher) Stop() {
	if connected := atomic.LoadUint32(&publisher.numClients); connected != 0 {
		panic("All clients must disconnect before shutting down publisher pipeline")
	}

	publisher.wsPublisher.stop()
	publisher.wsOutput.stop()
}
// InitShipperConfig fills in the queue size defaults. QueueSize is replaced
// with DefaultQueueSize when it is unset or not positive; BulkQueueSize is
// replaced with DefaultBulkQueueSize when it is unset or negative.
func (config *ShipperConfig) InitShipperConfig() {
	// TODO: replace by ucfg
	if size := config.QueueSize; size == nil || *size <= 0 {
		fallback := DefaultQueueSize
		config.QueueSize = &fallback
	}

	if size := config.BulkQueueSize; size == nil || *size < 0 {
		fallback := DefaultBulkQueueSize
		config.BulkQueueSize = &fallback
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v5.6.1

搜索帮助