代码拉取完成,页面将自动刷新
package pipeline
import (
"sync"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)
type Batch struct {
original queue.Batch
ctx *batchContext
ttl int
events []publisher.Event
}
type batchContext struct {
observer outputObserver
retryer *retryer
}
var batchPool = sync.Pool{
New: func() interface{} {
return &Batch{}
},
}
func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
if original == nil {
panic("empty batch")
}
b := batchPool.Get().(*Batch)
*b = Batch{
original: original,
ctx: ctx,
ttl: ttl,
events: original.Events(),
}
return b
}
func releaseBatch(b *Batch) {
*b = Batch{} // clear batch
batchPool.Put(b)
}
func (b *Batch) Events() []publisher.Event {
return b.events
}
func (b *Batch) ACK() {
b.ctx.observer.outBatchACKed(len(b.events))
b.original.ACK()
releaseBatch(b)
}
func (b *Batch) Drop() {
b.original.ACK()
releaseBatch(b)
}
func (b *Batch) Retry() {
b.ctx.retryer.retry(b)
}
func (b *Batch) Cancelled() {
b.ctx.retryer.cancelled(b)
}
func (b *Batch) RetryEvents(events []publisher.Event) {
b.updEvents(events)
b.Retry()
}
func (b *Batch) CancelledEvents(events []publisher.Event) {
b.updEvents(events)
b.Cancelled()
}
func (b *Batch) updEvents(events []publisher.Event) {
l1 := len(b.events)
l2 := len(events)
if l1 > l2 {
// report subset of events not to be retried as ACKed
b.ctx.observer.outBatchACKed(l1 - l2)
}
b.events = events
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。