1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
batch.go 1.48 KB
一键复制 编辑 原始数据 按行查看 历史
package pipeline
import (
"sync"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)
type Batch struct {
original queue.Batch
ctx *batchContext
ttl int
events []publisher.Event
}
type batchContext struct {
observer outputObserver
retryer *retryer
}
var batchPool = sync.Pool{
New: func() interface{} {
return &Batch{}
},
}
func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
if original == nil {
panic("empty batch")
}
b := batchPool.Get().(*Batch)
*b = Batch{
original: original,
ctx: ctx,
ttl: ttl,
events: original.Events(),
}
return b
}
func releaseBatch(b *Batch) {
*b = Batch{} // clear batch
batchPool.Put(b)
}
func (b *Batch) Events() []publisher.Event {
return b.events
}
func (b *Batch) ACK() {
b.ctx.observer.outBatchACKed(len(b.events))
b.original.ACK()
releaseBatch(b)
}
func (b *Batch) Drop() {
b.original.ACK()
releaseBatch(b)
}
func (b *Batch) Retry() {
b.ctx.retryer.retry(b)
}
func (b *Batch) Cancelled() {
b.ctx.retryer.cancelled(b)
}
func (b *Batch) RetryEvents(events []publisher.Event) {
b.updEvents(events)
b.Retry()
}
func (b *Batch) CancelledEvents(events []publisher.Event) {
b.updEvents(events)
b.Cancelled()
}
func (b *Batch) updEvents(events []publisher.Event) {
l1 := len(b.events)
l2 := len(events)
if l1 > l2 {
// report subset of events not to be retried as ACKed
b.ctx.observer.outBatchACKed(l1 - l2)
}
b.events = events
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v6.1.1

搜索帮助