1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
output.go 4.55 KB
一键复制 编辑 原始数据 按行查看 历史
urso 提交于 2015-12-23 22:13 . Publisher options refactoring
package elasticsearch
import (
"crypto/tls"
"errors"
"net/url"
"strings"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
)
// debug is the value returned by logp.MakeDebug("elasticsearch");
// presumably a debug-logging helper scoped to this output (confirm against
// the logp package).
var debug = logp.MakeDebug("elasticsearch")
// Sentinel errors exposed by the elasticsearch output. The messages follow
// the Go convention for error strings (lowercase, no trailing punctuation) so
// they compose cleanly when wrapped or logged with additional context.
var (
	// ErrNotConnected indicates failure due to the client having no valid
	// connection.
	ErrNotConnected = errors.New("not connected")

	// ErrJSONEncodeFailed indicates encoding failures.
	ErrJSONEncodeFailed = errors.New("json encode failed")

	// ErrResponseRead indicates an error parsing the Elasticsearch response.
	ErrResponseRead = errors.New("bulk item status parse failed")
)
// Fallback settings applied by this output when the corresponding option is
// absent from the configuration.
const (
	// elasticsearchDefaultTimeout is used when config.Timeout is 0.
	elasticsearchDefaultTimeout = time.Second * 90

	// defaultBulkSize is used when config.BulkMaxSize is nil.
	defaultBulkSize = 50

	// defaultMaxRetries is used when config.MaxRetries is nil.
	defaultMaxRetries = 3
)
func init() {
outputs.RegisterOutputPlugin("elasticsearch", elasticsearchOutputPlugin{})
}
// elasticsearchOutputPlugin is the stateless plugin value registered with the
// outputs package; its NewOutput method builds elasticsearchOutput instances.
type elasticsearchOutputPlugin struct{}
// elasticsearchOutput is the output returned by NewOutput. Its publish
// methods delegate to the configured connection mode.
type elasticsearchOutput struct {
// index is copied from config.Index during init.
index string
// mode is the connection mode selected during init (single connection,
// load balancer, or fail-over).
mode mode.ConnectionMode
// topology is embedded; it is declared elsewhere and presumably supplies
// the clients and TopologyExpire fields and the EnableTTL method used by
// init (confirm against its definition).
topology
}
// NewOutput instantiates a new output plugin instance publishing to
// elasticsearch.
//
// When config.BulkMaxSize is nil it is set, on the caller's config, to point
// at a copy of defaultBulkSize before the output is initialized. Any error
// from initialization is returned with a nil output.
func (f elasticsearchOutputPlugin) NewOutput(
	config *outputs.MothershipConfig,
	topologyExpire int,
) (outputs.Outputer, error) {
	// Fill in the bulk size on the shared config when it was left unset.
	if config.BulkMaxSize == nil {
		size := defaultBulkSize
		config.BulkMaxSize = &size
	}

	out := new(elasticsearchOutput)
	if err := out.init(*config, topologyExpire); err != nil {
		return nil, err
	}
	return out, nil
}
// init configures the output from config.
//
// It loads the TLS settings, builds the clients via mode.MakeClients, picks a
// connection mode based on the number of clients and config.LoadBalance,
// optionally enables the _ttl mapping (retrying in the background on
// failure), and finally stores the topology expiry, mode and index on out.
//
// topologyExpire is multiplied by 1000 to obtain milliseconds; a value of 0
// selects the default of 15000.
//
// Errors from TLS loading, client creation or mode construction are returned
// unchanged. A failure to enable the _ttl mapping is only logged.
func (out *elasticsearchOutput) init(
	config outputs.MothershipConfig,
	topologyExpire int,
) error {
	tlsConfig, err := outputs.LoadTLSConfig(config.TLS)
	if err != nil {
		return err
	}

	clients, err := mode.MakeClients(config, makeClientFactory(tlsConfig, config))
	if err != nil {
		return err
	}

	// config.Timeout is interpreted as a number of seconds; 0 means "unset".
	sendTimeout := elasticsearchDefaultTimeout
	if config.Timeout != 0 {
		sendTimeout = time.Duration(config.Timeout) * time.Second
	}

	retries := defaultMaxRetries
	if config.MaxRetries != nil {
		retries = *config.MaxRetries
	}

	// Number of send attempts is retries + 1. A negative retry count is
	// mapped to 0 attempts, which the mode package presumably treats as
	// "retry forever" (confirm against the mode implementations).
	attempts := 0
	if retries >= 0 {
		attempts = retries + 1
	}

	const (
		backoffStart = 1 * time.Second
		backoffMax   = 60 * time.Second
	)

	// The clients are stored before any mode is built and before EnableTTL
	// is called below.
	out.clients = clients

	var connMode mode.ConnectionMode
	switch {
	case len(clients) == 1:
		connMode, err = mode.NewSingleConnectionMode(clients[0], attempts,
			backoffStart, sendTimeout, backoffMax)
	case config.LoadBalance == nil || *config.LoadBalance:
		// Load balancing is the default for multiple clients unless it is
		// explicitly disabled.
		connMode, err = mode.NewLoadBalancerMode(clients, attempts,
			backoffStart, sendTimeout, backoffMax)
	default:
		connMode, err = mode.NewFailOverConnectionMode(clients, attempts,
			backoffStart, sendTimeout)
	}
	if err != nil {
		return err
	}

	if config.Save_topology {
		if ttlErr := out.EnableTTL(); ttlErr != nil {
			logp.Err("Fail to set _ttl mapping: %s", ttlErr)

			// Keep retrying in the background until it succeeds. There is
			// no stop signal; the goroutine ends only on success.
			go func() {
				for {
					retryErr := out.EnableTTL()
					if retryErr == nil {
						return
					}
					logp.Err("Fail to set _ttl mapping: %s", retryErr)
					time.Sleep(5 * time.Second)
				}
			}()
		}
	}

	if topologyExpire != 0 {
		out.TopologyExpire = topologyExpire * 1000 // millisec
	} else {
		out.TopologyExpire = 15000
	}

	out.mode = connMode
	out.index = config.Index
	return nil
}
// makeClientFactory returns a function that builds a client for one host.
//
// For each host the returned function resolves the Elasticsearch URL with
// getURL (using config.Protocol and config.Path), parses config.ProxyURL when
// it is non-empty, and passes both to NewClient together with the index, TLS
// configuration and credentials from config.
//
// An invalid host is logged and its error returned. A proxy URL that fails to
// parse, or whose scheme does not start with "http", is re-parsed with
// "http://" prepended; if that also fails, the error from the second parse is
// returned.
func makeClientFactory(
	tls *tls.Config,
	config outputs.MothershipConfig,
) func(string) (mode.ProtocolClient, error) {
	factory := func(host string) (mode.ProtocolClient, error) {
		esURL, err := getURL(config.Protocol, config.Path, host)
		if err != nil {
			logp.Err("Invalid host param set: %s, Error: %v", host, err)
			return nil, err
		}

		// proxy stays nil when no proxy is configured.
		var proxy *url.URL
		if rawProxy := config.ProxyURL; rawProxy != "" {
			proxy, err = url.Parse(rawProxy)
			usable := err == nil && strings.HasPrefix(proxy.Scheme, "http")
			if !usable {
				// Retry with an explicit scheme, e.g. for "host:port".
				proxy, err = url.Parse("http://" + rawProxy)
				if err != nil {
					return nil, err
				}
			}
			logp.Info("Using proxy URL: %s", proxy)
		}

		return NewClient(esURL, config.Index, proxy, tls, config.Username, config.Password), nil
	}
	return factory
}
// PublishEvent forwards a single event, together with the signaler and
// options, to the output's connection mode and returns the mode's error.
func (out *elasticsearchOutput) PublishEvent(
signaler outputs.Signaler,
opts outputs.Options,
event common.MapStr,
) error {
return out.mode.PublishEvent(signaler, opts, event)
}
// BulkPublish forwards a batch of events, together with the signaler (trans)
// and options, to the connection mode's PublishEvents and returns its error.
func (out *elasticsearchOutput) BulkPublish(
trans outputs.Signaler,
opts outputs.Options,
events []common.MapStr,
) error {
return out.mode.PublishEvents(trans, opts, events)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v1.1.2

搜索帮助