package elasticsearch

import (
	"bytes"
	"encoding/json"
	"errors"
	"expvar"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"time"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/outputs"
	"github.com/elastic/beats/libbeat/outputs/mode"
	"github.com/elastic/beats/libbeat/outputs/outil"
	"github.com/elastic/beats/libbeat/outputs/transport"
)
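
// Client is an Elasticsearch client, wrapping a single Connection together
// with index/pipeline selection, request parameters, buffered bulk requests,
// and compression settings.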
type Client struct {
	Connection
	tlsConfig *transport.TLSConfig

	index    outil.Selector
	pipeline *outil.Selector
	params   map[string]string
	timeout  time.Duration

	// buffered bulk requests
	bulkRequ *bulkRequest

	// buffered json response reader
	json jsonReader

	// additional configs
	compressionLevel int
	proxyURL         *url.URL
}
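
// ClientSettings holds the settings used by NewClient to construct a Client.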
type ClientSettings struct {
	URL                string
	Proxy              *url.URL
	TLS                *transport.TLSConfig
	Username, Password string
	Parameters         map[string]string
	Headers            map[string]string
	Index              outil.Selector
	Pipeline           *outil.Selector
	Timeout            time.Duration
	CompressionLevel   int
}
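
// connectCallback is invoked with the client each time a connection to
// Elasticsearch is (re-)established.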
type connectCallback func(client *Client) error
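
// Connection manages the HTTP connection to a single Elasticsearch host,
// holding the credentials, extra headers, body encoder, and detected version.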
type Connection struct {
	URL      string
	Username string
	Password string
	Headers  map[string]string

	http              *http.Client
	onConnectCallback func() error

	encoder bodyEncoder
	version string
}

// Metrics that can be retrieved through the expvar web interface.
var (
	ackedEvents            = expvar.NewInt("libbeat.es.published_and_acked_events")
	eventsNotAcked         = expvar.NewInt("libbeat.es.published_but_not_acked_events")
	publishEventsCallCount = expvar.NewInt("libbeat.es.call_count.PublishEvents")

	statReadBytes   = expvar.NewInt("libbeat.es.publish.read_bytes")
	statWriteBytes  = expvar.NewInt("libbeat.es.publish.write_bytes")
	statReadErrors  = expvar.NewInt("libbeat.es.publish.read_errors")
	statWriteErrors = expvar.NewInt("libbeat.es.publish.write_errors")
)

var (
	nameItems  = []byte("items")
	nameStatus = []byte("status")
	nameError  = []byte("error")
)

var (
	errExpectedItemObject    = errors.New("expected item response object")
	errExpectedStatusCode    = errors.New("expected item status code")
	errUnexpectedEmptyObject = errors.New("empty object")
	errExpectedObjectEnd     = errors.New("expected end of object")
)

const (
	eventType = "doc"
)

// NewClient instantiates a new client.
func NewClient(
	s ClientSettings,
	onConnectCallback connectCallback,
) (*Client, error) {
	proxy := http.ProxyFromEnvironment
	if s.Proxy != nil {
		proxy = http.ProxyURL(s.Proxy)
	}

	pipeline := s.Pipeline
	if pipeline != nil && pipeline.IsEmpty() {
		pipeline = nil
	}

	u, err := url.Parse(s.URL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err)
	}
	if u.User != nil {
		s.Username = u.User.Username()
		s.Password, _ = u.User.Password()
		u.User = nil

		// Re-write URL without credentials.
		s.URL = u.String()
	}

	logp.Info("Elasticsearch url: %s", s.URL)

	// TODO: add socks5 proxy support
	var dialer, tlsDialer transport.Dialer

	dialer = transport.NetDialer(s.Timeout)
	tlsDialer, err = transport.TLSDialer(dialer, s.TLS, s.Timeout)
	if err != nil {
		return nil, err
	}

	iostats := &transport.IOStats{
		Read:        statReadBytes,
		Write:       statWriteBytes,
		ReadErrors:  statReadErrors,
		WriteErrors: statWriteErrors,
	}
	dialer = transport.StatsDialer(dialer, iostats)
	tlsDialer = transport.StatsDialer(tlsDialer, iostats)

	params := s.Parameters
	bulkRequ, err := newBulkRequest(s.URL, "", "", params, nil)
	if err != nil {
		return nil, err
	}

	var encoder bodyEncoder
	compression := s.CompressionLevel
	if compression == 0 {
		encoder = newJSONEncoder(nil)
	} else {
		encoder, err = newGzipEncoder(compression, nil)
		if err != nil {
			return nil, err
		}
	}

	client := &Client{
		Connection: Connection{
			URL:      s.URL,
			Username: s.Username,
			Password: s.Password,
			Headers:  s.Headers,
			http: &http.Client{
				Transport: &http.Transport{
					Dial:    dialer.Dial,
					DialTLS: tlsDialer.Dial,
					Proxy:   proxy,
				},
				Timeout: s.Timeout,
			},
			encoder: encoder,
		},
		tlsConfig: s.TLS,
		index:     s.Index,
		pipeline:  pipeline,
		params:    params,
		timeout:   s.Timeout,

		bulkRequ: bulkRequ,

		compressionLevel: compression,
		proxyURL:         s.Proxy,
	}

	client.Connection.onConnectCallback = func() error {
		if onConnectCallback != nil {
			return onConnectCallback(client)
		}
		return nil
	}

	return client, nil
}

// Clone returns a copy of the client that shares neither the connection
// callback nor the request parameters with the original.
func (client *Client) Clone() *Client {
	// When cloning, the connection callback and params are not copied. A
	// clone is, for example, created for topology-map support. With params
	// most likely containing the ingest node pipeline, and the default callback
	// trying to install a template, we don't want these included in the clone.
	c, _ := NewClient(
		ClientSettings{
			URL:              client.URL,
			Index:            client.index,
			Pipeline:         client.pipeline,
			Proxy:            client.proxyURL,
			TLS:              client.tlsConfig,
			Username:         client.Username,
			Password:         client.Password,
			Parameters:       nil, // XXX: do not pass params?
			Headers:          client.Headers,
			Timeout:          client.http.Timeout,
			CompressionLevel: client.compressionLevel,
		},
		nil, // XXX: do not pass connection callback?
	)
	return c
}

// PublishEvents sends all events to elasticsearch. On error a slice with all
// events not published or confirmed to be processed by elasticsearch will be
// returned. The input slice's backing memory is reused for the returned value.
func (client *Client) PublishEvents(
	data []outputs.Data,
) ([]outputs.Data, error) {
	begin := time.Now()
	publishEventsCallCount.Add(1)

	if len(data) == 0 {
		return nil, nil
	}

	body := client.encoder
	body.Reset()

	// encode events into bulk request buffer, dropping failed elements from
	// events slice
	data = bulkEncodePublishRequest(body, client.index, client.pipeline, data)
	if len(data) == 0 {
		return nil, nil
	}

	requ := client.bulkRequ
	requ.Reset(body)
	status, result, sendErr := client.sendBulkRequest(requ)
	if sendErr != nil {
		logp.Err("Failed to perform any bulk index operations: %s", sendErr)
		return data, sendErr
	}

	debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
		len(data),
		time.Now().Sub(begin))

	// check response for transient errors
	var failedEvents []outputs.Data
	if status != 200 {
		failedEvents = data
	} else {
		client.json.init(result.raw)
		failedEvents = bulkCollectPublishFails(&client.json, data)
	}

	ackedEvents.Add(int64(len(data) - len(failedEvents)))
	eventsNotAcked.Add(int64(len(failedEvents)))
	if len(failedEvents) > 0 {
		if sendErr == nil {
			sendErr = mode.ErrTempBulkFailure
		}
		return failedEvents, sendErr
	}

	return nil, nil
}

// bulkEncodePublishRequest encodes all events into the bulk request body and
// returns the slice of events successfully added to the request.
func bulkEncodePublishRequest(
	body bulkWriter,
	index outil.Selector,
	pipeline *outil.Selector,
	data []outputs.Data,
) []outputs.Data {
	okEvents := data[:0]
	for _, datum := range data {
		meta := createEventBulkMeta(index, pipeline, datum)
		if err := body.Add(meta, datum.Event); err != nil {
			logp.Err("Failed to encode event: %s", err)
			continue
		}
		okEvents = append(okEvents, datum)
	}
	return okEvents
}
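
// createEventBulkMeta builds the bulk action metadata (index, document type,
// and optional ingest pipeline) that precedes an event in the bulk body.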
func createEventBulkMeta(
	index outil.Selector,
	pipelineSel *outil.Selector,
	data outputs.Data,
) interface{} {
	event := data.Event

	pipeline, err := getPipeline(data, pipelineSel)
	if err != nil {
		logp.Err("Failed to select pipeline: %v", err)
	}

	if pipeline == "" {
		type bulkMetaIndex struct {
			Index   string `json:"_index"`
			DocType string `json:"_type"`
		}
		type bulkMeta struct {
			Index bulkMetaIndex `json:"index"`
		}

		return bulkMeta{
			Index: bulkMetaIndex{
				Index:   getIndex(event, index),
				DocType: eventType,
			},
		}
	}

	type bulkMetaIndex struct {
		Index    string `json:"_index"`
		DocType  string `json:"_type"`
		Pipeline string `json:"pipeline"`
	}
	type bulkMeta struct {
		Index bulkMetaIndex `json:"index"`
	}

	return bulkMeta{
		Index: bulkMetaIndex{
			Index:    getIndex(event, index),
			Pipeline: pipeline,
			DocType:  eventType,
		},
	}
}
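
// getPipeline determines the ingest node pipeline for an event. A pipeline set
// in the event metadata takes precedence over the configured pipeline selector.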
func getPipeline(data outputs.Data, pipelineSel *outil.Selector) (string, error) {
	if meta := outputs.GetMetadata(data.Values); meta != nil {
		if pipeline, exists := meta["pipeline"]; exists {
			if p, ok := pipeline.(string); ok {
				return p, nil
			}
			return "", errors.New("pipeline metadata is not a string")
		}
	}

	if pipelineSel != nil {
		return pipelineSel.Select(data.Event)
	}
	return "", nil
}

// getIndex returns the full index name.
// The index is either defined in the output configuration or can be
// overridden by the event itself via a dynamic index in its beat metadata.
func getIndex(event common.MapStr, index outil.Selector) string {
	ts := time.Time(event["@timestamp"].(common.Time)).UTC()

	// Check for a dynamic index.
	// XXX: is this used/needed?
	if _, ok := event["beat"]; ok {
		beatMeta, ok := event["beat"].(common.MapStr)
		if ok {
			// Check if the index is set dynamically.
			if dynamicIndex, ok := beatMeta["index"]; ok {
				if dynamicIndexValue, ok := dynamicIndex.(string); ok {
					return fmt.Sprintf("%s-%d.%02d.%02d",
						dynamicIndexValue, ts.Year(), ts.Month(), ts.Day())
				}
			}
		}
	}

	str, _ := index.Select(event)
	return str
}

// bulkCollectPublishFails checks the per-item responses, returning all events
// that should be retried based on the status code returned for each item. If
// indexing an event failed due to an error in the event itself (e.g. it does
// not match the mapping), the event is dropped.
func bulkCollectPublishFails(
	reader *jsonReader,
	data []outputs.Data,
) []outputs.Data {
	if err := reader.expectDict(); err != nil {
		logp.Err("Failed to parse bulk response: expected JSON object")
		return nil
	}

	// find 'items' field in response
	for {
		kind, name, err := reader.nextFieldName()
		if err != nil {
			logp.Err("Failed to parse bulk response")
			return nil
		}

		if kind == dictEnd {
			logp.Err("Failed to parse bulk response: no 'items' field in response")
			return nil
		}

		// found items array -> continue
		if bytes.Equal(name, nameItems) {
			break
		}

		reader.ignoreNext()
	}

	// check items field is an array
	if err := reader.expectArray(); err != nil {
		logp.Err("Failed to parse bulk response: expected items array")
		return nil
	}

	count := len(data)
	failed := data[:0]
	for i := 0; i < count; i++ {
		status, msg, err := itemStatus(reader)
		if err != nil {
			return nil
		}

		if status < 300 {
			continue // ok value
		}

		if status < 500 && status != 429 {
			// hard failure, don't collect
			logp.Warn("Can not index event (status=%v): %s", status, msg)
			continue
		}

		debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
		failed = append(failed, data[i])
	}

	return failed
}
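
// itemStatus parses a single entry of the bulk response 'items' array,
// returning its status code and the raw 'error' field, if present.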
func itemStatus(reader *jsonReader) (int, []byte, error) {
	// skip outer dictionary
	if err := reader.expectDict(); err != nil {
		return 0, nil, errExpectedItemObject
	}

	// find first field in outer dictionary (e.g. 'create')
	kind, _, err := reader.nextFieldName()
	if err != nil {
		logp.Err("Failed to parse bulk response item: %s", err)
		return 0, nil, err
	}
	if kind == dictEnd {
		err = errUnexpectedEmptyObject
		logp.Err("Failed to parse bulk response item: %s", err)
		return 0, nil, err
	}

	// parse actual item response code and error message
	status, msg, err := itemStatusInner(reader)
	if err != nil {
		return 0, nil, err
	}

	// close dictionary. Expect outer dictionary to have only one element
	kind, _, err = reader.step()
	if err != nil {
		logp.Err("Failed to parse bulk response item: %s", err)
		return 0, nil, err
	}
	if kind != dictEnd {
		err = errExpectedObjectEnd
		logp.Err("Failed to parse bulk response item: %s", err)
		return 0, nil, err
	}

	return status, msg, nil
}
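
// itemStatusInner parses the inner bulk item object, extracting the 'status'
// code and the raw 'error' value while skipping all other fields.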
func itemStatusInner(reader *jsonReader) (int, []byte, error) {
	if err := reader.expectDict(); err != nil {
		return 0, nil, errExpectedItemObject
	}

	status := -1
	var msg []byte
	for {
		kind, name, err := reader.nextFieldName()
		if err != nil {
			logp.Err("Failed to parse bulk response item: %s", err)
			return 0, nil, err
		}
		if kind == dictEnd {
			break
		}

		switch {
		case bytes.Equal(name, nameStatus): // name == "status"
			status, err = reader.nextInt()
			if err != nil {
				logp.Err("Failed to parse bulk response item: %s", err)
				return 0, nil, err
			}

		case bytes.Equal(name, nameError): // name == "error"
			msg, err = reader.ignoreNext() // collect raw string for "error" field
			if err != nil {
				return 0, nil, err
			}

		default: // ignore unknown fields
			_, err = reader.ignoreNext()
			if err != nil {
				return 0, nil, err
			}
		}
	}

	if status < 0 {
		return 0, nil, errExpectedStatusCode
	}
	return status, msg, nil
}
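
// PublishEvent publishes a single event to Elasticsearch, routing it through
// an ingest node pipeline if one is selected for the event.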
func (client *Client) PublishEvent(data outputs.Data) error {
	// insert the events one by one
	event := data.Event
	index := getIndex(event, client.index)

	debugf("Publish event: %s", event)

	pipeline, err := getPipeline(data, client.pipeline)
	if err != nil {
		logp.Err("Failed to select pipeline: %v", err)
	}
	if pipeline != "" {
		debugf("select pipeline: %v", pipeline)
	}

	var status int
	if pipeline == "" {
		status, _, err = client.Index(index, eventType, "", client.params, event)
	} else {
		status, _, err = client.Ingest(index, eventType, pipeline, "", client.params, event)
	}

	// check indexing error
	if err != nil {
		logp.Warn("Failed to insert a single event: %s", err)
		if err == ErrJSONEncodeFailed {
			// don't retry unencodable values
			return nil
		}
	}
	switch {
	case status == 0: // event was not sent yet
		return nil
	case status >= 500 || status == 429: // server error, retry
		return err
	case status >= 300 && status < 500:
		// won't be able to index event in Elasticsearch => don't retry
		return nil
	}
	return nil
}

// LoadTemplate loads a template into Elasticsearch, overwriting the existing
// template if it exists. If you wish to not overwrite an existing template,
// then use CheckTemplate prior to calling this method.
func (client *Client) LoadTemplate(templateName string, template map[string]interface{}) error {
	path := "/_template/" + templateName
	body, err := client.LoadJSON(path, template)
	if err != nil {
		return fmt.Errorf("couldn't load template: %v. Response body: %s", err, body)
	}
	logp.Info("Elasticsearch template with name '%s' loaded", templateName)
	return nil
}
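
// LoadJSON sends the given JSON document via a PUT request to path and
// returns the raw response body.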
func (client *Client) LoadJSON(path string, json map[string]interface{}) ([]byte, error) {
	status, body, err := client.Request("PUT", path, "", nil, json)
	if err != nil {
		return body, fmt.Errorf("couldn't load json. Error: %s", err)
	}
	if status > 300 {
		return body, fmt.Errorf("couldn't load json. Status: %v", status)
	}

	return body, nil
}

// GetVersion returns the elasticsearch version the client is connected to.
func (client *Client) GetVersion() string {
	return client.Connection.version
}

// CheckTemplate checks if a given template already exists. It returns true if
// and only if Elasticsearch returns with HTTP status code 200.
func (client *Client) CheckTemplate(templateName string) bool {
	status, _, _ := client.Request("HEAD", "/_template/"+templateName, "", nil, nil)

	if status != 200 {
		return false
	}

	return true
}
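
// Connect connects the client to Elasticsearch by pinging the configured URL
// and then running the registered onConnect callback.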
func (conn *Connection) Connect(timeout time.Duration) error {
	var err error
	conn.version, err = conn.Ping(timeout)
	if err != nil {
		return err
	}

	err = conn.onConnectCallback()
	if err != nil {
		return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %v", err)
	}
	return nil
}

// Ping sends a GET request to the Elasticsearch host and returns the version
// number reported in the response.
func (conn *Connection) Ping(timeout time.Duration) (string, error) {
	debugf("ES Ping(url=%v, timeout=%v)", conn.URL, timeout)

	conn.http.Timeout = timeout
	status, body, err := conn.execRequest("GET", conn.URL, nil)
	if err != nil {
		debugf("Ping request failed with: %v", err)
		return "", err
	}

	if status >= 300 {
		return "", fmt.Errorf("Non 2xx response code: %d", status)
	}

	var response struct {
		Version struct {
			Number string
		}
	}

	err = json.Unmarshal(body, &response)
	if err != nil {
		return "", fmt.Errorf("Failed to parse JSON response: %v", err)
	}

	debugf("Ping status code: %v", status)
	logp.Info("Connected to Elasticsearch version %s", response.Version.Number)
	return response.Version.Number, nil
}
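
// Close closes the connection. It is currently a no-op, as the underlying
// HTTP client requires no explicit teardown.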
func (conn *Connection) Close() error {
	return nil
}
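
// Request executes an HTTP request against the given path, JSON-encoding the
// body if one is provided, and returns the status code and response body.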
func (conn *Connection) Request(
	method, path string,
	pipeline string,
	params map[string]string,
	body interface{},
) (int, []byte, error) {
	url := makeURL(conn.URL, path, pipeline, params)
	debugf("%s %s %s %v", method, url, pipeline, body)

	if body == nil {
		return conn.execRequest(method, url, nil)
	}

	if err := conn.encoder.Marshal(body); err != nil {
		logp.Warn("Failed to json encode body (%v): %#v", err, body)
		return 0, nil, ErrJSONEncodeFailed
	}
	return conn.execRequest(method, url, conn.encoder.Reader())
}
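
// execRequest builds an *http.Request for the given method, URL, and encoded
// body, adding the encoder's content headers when a body is present.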
func (conn *Connection) execRequest(
	method, url string,
	body io.Reader,
) (int, []byte, error) {
	req, err := http.NewRequest(method, url, body)
	if err != nil {
		logp.Warn("Failed to create request: %v", err)
		return 0, nil, err
	}
	if body != nil {
		conn.encoder.AddHeader(&req.Header)
	}
	return conn.execHTTPRequest(req)
}
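
// execHTTPRequest adds common headers and credentials, executes the request,
// and returns the status code, response body, and an error for non-2xx statuses.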
func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) {
	req.Header.Add("Accept", "application/json")
	if conn.Username != "" || conn.Password != "" {
		req.SetBasicAuth(conn.Username, conn.Password)
	}

	for name, value := range conn.Headers {
		req.Header.Add(name, value)
	}

	resp, err := conn.http.Do(req)
	if err != nil {
		return 0, nil, err
	}
	defer closing(resp.Body)

	status := resp.StatusCode
	var retErr error
	if status >= 300 {
		retErr = fmt.Errorf("%v", resp.Status)
	}

	obj, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return status, nil, retErr
	}
	return status, obj, retErr
}
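
// GetVersion returns the Elasticsearch version reported by the last successful Ping.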
func (conn *Connection) GetVersion() string {
	return conn.version
}
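
// closing closes the given resource, logging a warning if Close fails.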
func closing(c io.Closer) {
	err := c.Close()
	if err != nil {
		logp.Warn("Close failed with: %v", err)
	}
}