1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sync.go 2.83 KB
一键复制 编辑 原始数据 按行查看 历史
package logstash
import (
"time"
"github.com/elastic/go-lumber/client/v2"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/transport"
)
const (
minWindowSize int = 1
defaultStartMaxWindowSize int = 10
)
type client struct {
*transport.Client
client *v2.SyncClient
win window
}
func newLumberjackClient(
conn *transport.Client,
compressLevel int,
maxWindowSize int,
timeout time.Duration,
beat string,
) (*client, error) {
c := &client{}
c.Client = conn
c.win.init(defaultStartMaxWindowSize, maxWindowSize)
enc, err := makeLogstashEventEncoder(beat)
if err != nil {
return nil, err
}
cl, err := v2.NewSyncClientWithConn(conn,
v2.JSONEncoder(enc),
v2.Timeout(timeout),
v2.CompressionLevel(compressLevel))
if err != nil {
return nil, err
}
c.client = cl
return c, nil
}
func (c *client) Connect(timeout time.Duration) error {
logp.Debug("logstash", "connect")
return c.Client.Connect()
}
func (c *client) Close() error {
logp.Debug("logstash", "close connection")
return c.Client.Close()
}
func (c *client) PublishEvent(data outputs.Data) error {
_, err := c.PublishEvents([]outputs.Data{data})
return err
}
// PublishEvents sends all events to logstash. On error a slice with all events
// not published or confirmed to be processed by logstash will be returned.
func (c *client) PublishEvents(
data []outputs.Data,
) ([]outputs.Data, error) {
publishEventsCallCount.Add(1)
totalNumberOfEvents := len(data)
for len(data) > 0 {
n, err := c.publishWindowed(data)
debug("%v events out of %v events sent to logstash. Continue sending",
n, len(data))
data = data[n:]
if err != nil {
c.win.shrinkWindow()
_ = c.Close()
logp.Err("Failed to publish events caused by: %v", err)
eventsNotAcked.Add(int64(len(data)))
ackedEvents.Add(int64(totalNumberOfEvents - len(data)))
return data, err
}
}
ackedEvents.Add(int64(totalNumberOfEvents))
return nil, nil
}
// publishWindowed published events with current maximum window size to logstash
// returning the total number of events sent (due to window size, or acks until
// failure).
func (c *client) publishWindowed(data []outputs.Data) (int, error) {
if len(data) == 0 {
return 0, nil
}
batchSize := len(data)
windowSize := c.win.get()
debug("Try to publish %v events to logstash with window size %v",
batchSize, windowSize)
// prepare message payload
if batchSize > windowSize {
data = data[:windowSize]
}
n, err := c.sendEvents(data)
if err != nil {
return n, err
}
c.win.tryGrowWindow(batchSize)
return len(data), nil
}
func (c *client) sendEvents(data []outputs.Data) (int, error) {
if len(data) == 0 {
return 0, nil
}
window := make([]interface{}, len(data))
for i, d := range data {
window[i] = d
}
return c.client.Send(window)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v5.3.1

搜索帮助