1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
consumer.go 3.51 KB
一键复制 编辑 原始数据 按行查看 历史
Steffen Siering 提交于 2017-07-31 22:21 . Pipeline cleanups (#4776)
package pipeline
import (
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/queue"
)
// eventConsumer collects and forwards events from the queue to the outputs work queue.
// The eventConsumer is managed by the controller and receives additional pause signals
// from the retryer in case of too many events failing to be sent or if the retryer
// is receiving cancelled batches from outputs to be closed on output reloading.
type eventConsumer struct {
	// logger is used by loop for its start/stop debug messages.
	logger *logp.Logger
	// done is closed by close() to make loop return.
	done chan struct{}
	// ctx is handed to newBatch for every batch read from the queue.
	ctx *batchContext
	// pause is set by sigPause and cleared by sigContinue.
	pause atomic.Bool
	// wait is set by sigWait and cleared by sigUnWait.
	// The consumer counts as paused while either pause or wait is set.
	wait atomic.Bool
	// sig carries signals to loop; it is created with a buffer of 3.
	sig chan consumerSignal
	// queue is the source of new queue consumers (see updOutput).
	queue queue.Queue
	// consumer is the currently active queue consumer; updOutput closes and
	// replaces it.
	consumer queue.Consumer
	// out is the current output group; loop assigns it when it receives a
	// sigConsumerUpdateOutput signal. It is nil until then.
	out *outputGroup
}
// consumerSignal is the message type sent to the eventConsumer loop via the
// sig channel. The tag selects which of the optional payload fields is used.
type consumerSignal struct {
	// tag identifies the kind of signal.
	tag consumerEventTag
	// consumer is the new queue consumer; set for sigConsumerUpdateInput.
	consumer queue.Consumer
	// out is the new output group; set for sigConsumerUpdateOutput.
	out *outputGroup
}
// consumerEventTag enumerates the kinds of signals the eventConsumer loop
// understands.
type consumerEventTag uint8
const (
	// sigConsumerCheck carries no payload; it only makes loop re-read the
	// pause/wait flags.
	sigConsumerCheck consumerEventTag = iota
	// sigConsumerUpdateOutput replaces the output group with consumerSignal.out.
	sigConsumerUpdateOutput
	// sigConsumerUpdateInput replaces the queue consumer used by loop with
	// consumerSignal.consumer.
	sigConsumerUpdateInput
)
// newEventConsumer builds an eventConsumer that reads from queue and starts
// its loop in a new goroutine.
//
// The returned consumer starts with the pause flag set and without an output
// group, so no events are forwarded until an output is installed (updOutput)
// and the consumer is resumed (sigContinue).
func newEventConsumer(
	log *logp.Logger,
	queue queue.Queue,
	ctx *batchContext,
) *eventConsumer {
	// Open the initial queue connection up front so the very same value can be
	// stored on the struct and handed to the loop goroutine.
	initial := queue.Consumer()

	c := &eventConsumer{
		logger:   log,
		ctx:      ctx,
		queue:    queue,
		consumer: initial,
		done:     make(chan struct{}),
		// Buffered so that hints and output updates can be queued while the
		// loop is busy.
		sig: make(chan consumerSignal, 3),
	}

	// Start paused; the flag must be set before the loop goroutine is started.
	c.pause.Store(true)

	go c.loop(initial)
	return c
}
// close closes the current queue consumer and then closes the done channel,
// which makes loop return the next time it reaches its blocking select.
// NOTE(review): a second call would close done again, which panics in Go;
// confirm callers invoke close at most once.
func (c *eventConsumer) close() {
	c.consumer.Close()
	close(c.done)
}
// sigWait sets the wait flag and then hints the loop to re-check its state.
// While the wait flag is set the consumer counts as paused.
func (c *eventConsumer) sigWait() {
	c.wait.Store(true)
	c.sigHint()
}
// sigUnWait clears the wait flag and then hints the loop to re-check its
// state. The consumer stays paused if the pause flag is still set.
func (c *eventConsumer) sigUnWait() {
	c.wait.Store(false)
	c.sigHint()
}
// sigPause sets the pause flag and then hints the loop to re-check its state.
// While the pause flag is set the consumer counts as paused.
func (c *eventConsumer) sigPause() {
	c.pause.Store(true)
	c.sigHint()
}
// sigContinue clears the pause flag and then hints the loop to re-check its
// state. The consumer stays paused if the wait flag is still set.
func (c *eventConsumer) sigContinue() {
	c.pause.Store(false)
	c.sigHint()
}
// sigHint nudges the loop so it re-reads the pause/wait flags, which also
// unblocks a loop that is currently trying to publish a batch.
//
// The send never blocks: the flags themselves are stored atomically by the
// callers, so any signal already waiting in the buffer will make the loop pick
// up the latest state. If the buffer is full the hint is simply dropped.
func (c *eventConsumer) sigHint() {
	hint := consumerSignal{tag: sigConsumerCheck}

	select {
	case c.sig <- hint:
		// queued; the loop will re-evaluate its state when it receives it.
	default:
		// buffer full: pending signals already trigger the same re-check.
	}
}
// updOutput installs grp as the new output group and reconnects the consumer
// to the queue.
//
// The steps run strictly in this order:
//  1. close the current queue consumer, breaking the loop out of the pipeline,
//  2. tell the loop about the new output group,
//  3. open a fresh queue consumer and hand it to the loop.
//
// Both signal sends are blocking sends on the sig channel.
func (c *eventConsumer) updOutput(grp *outputGroup) {
	// Step 1: detach the loop from the queue.
	c.consumer.Close()

	// Step 2: publish the new output group.
	outputChanged := consumerSignal{
		tag: sigConsumerUpdateOutput,
		out: grp,
	}
	c.sig <- outputChanged

	// Step 3: reconnect to the queue and publish the new consumer.
	fresh := c.queue.Consumer()
	c.consumer = fresh

	inputChanged := consumerSignal{
		tag:      sigConsumerUpdateInput,
		consumer: fresh,
	}
	c.sig <- inputChanged
}
// loop is the eventConsumer worker. It reads batches from consumer and
// forwards them to the work queue of the current output group, until the done
// channel is closed.
//
// State kept by the loop:
//   - batch:  the batch read from the queue that has not been forwarded yet.
//   - out:    the channel the pending batch is sent to; nil disables the send
//     case of the select below (a send on a nil channel never proceeds).
//   - paused: local snapshot of c.paused(), refreshed on every signal and
//     after every successful read from the queue.
//
// The loop starts paused and only fetches a new batch when it is not paused,
// an output group and a consumer are available, and no batch is pending.
func (c *eventConsumer) loop(consumer queue.Consumer) {
	log := c.logger
	log.Debug("start pipeline event consumer")

	var (
		out    workQueue
		batch  *Batch
		paused = true
	)

	// handleSignal applies the signal payload (if any) and then recomputes
	// the paused snapshot and the active send channel.
	handleSignal := func(sig consumerSignal) {
		switch sig.tag {
		case sigConsumerUpdateOutput:
			c.out = sig.out
		case sigConsumerUpdateInput:
			consumer = sig.consumer
		case sigConsumerCheck:
			// no payload; only the state refresh below is needed.
		}

		paused = c.paused()

		// Sending is only enabled while running, with an output group
		// installed and a batch waiting to be forwarded.
		out = nil
		if !paused && c.out != nil && batch != nil {
			out = c.out.workQueue
		}
	}

	for {
		ready := !paused && c.out != nil && consumer != nil
		if ready && batch == nil {
			queueBatch, err := consumer.Get(c.out.batchSize)
			if err != nil {
				// The consumer is unusable; drop it and wait for a
				// sigConsumerUpdateInput signal to deliver a new one.
				out = nil
				consumer = nil
				continue
			}

			// NOTE(review): queueBatch is passed on without a nil check;
			// confirm Get never returns a nil batch together with a nil error.
			batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)

			// The flags may have changed while Get was in progress.
			paused = c.paused()
			if paused {
				out = nil
			} else {
				out = c.out.workQueue
			}
		}

		// Signals take priority: handle one pending signal, if any, before
		// blocking, and restart the iteration with the refreshed state.
		select {
		case sig := <-c.sig:
			handleSignal(sig)
			continue
		default:
		}

		// Block until shutdown, a new signal, or the pending batch has been
		// accepted by the output work queue.
		select {
		case <-c.done:
			log.Debug("stop pipeline event consumer")
			return
		case sig := <-c.sig:
			handleSignal(sig)
		case out <- batch:
			batch = nil
		}
	}
}
// paused reports whether the consumer must currently hold back events, which
// is the case while the pause flag or the wait flag is set.
func (c *eventConsumer) paused() bool {
	if c.pause.Load() {
		return true
	}
	return c.wait.Load()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v6.1.4

搜索帮助