1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
client.go 4.15 KB
一键复制 编辑 原始数据 按行查看 历史
urso 提交于 2016-01-20 15:27 . Filebeat async publisher support
package publisher
import (
"expvar"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
)
// Metrics that can retrieved through the expvar web interface.
var (
publishedEvents = expvar.NewInt("libbeatPublishedEvents")
)
// Client is used by beats to publish new events.
type Client interface {
// PublishEvent publishes one event with given options. If Sync option is set,
// PublishEvent will block until output plugins report success or failure state
// being returned by this method.
PublishEvent(event common.MapStr, opts ...ClientOption) bool
// PublishEvents publishes multiple events with given options. If Guaranteed
// option is set, PublishEvent will block until output plugins report
// success or failure state being returned by this method.
PublishEvents(events []common.MapStr, opts ...ClientOption) bool
}
// ChanClient will forward all published events one by one to the given channel
type ChanClient struct {
Channel chan common.MapStr
}
type ExtChanClient struct {
Channel chan PublishMessage
}
type PublishMessage struct {
Context Context
Events []common.MapStr
}
type client struct {
publisher *PublisherType
beatMeta common.MapStr
tags []string
}
// ClientOption allows API users to set additional options when publishing events.
type ClientOption func(option Context) Context
// Guaranteed option will retry publishing the event, until send attempt have
// been ACKed by output plugin.
func Guaranteed(o Context) Context {
o.Guaranteed = true
return o
}
// Sync option will block the event publisher until an event has been ACKed by
// the output plugin or failed.
func Sync(o Context) Context {
o.Sync = true
return o
}
func Signal(signaler outputs.Signaler) ClientOption {
return func(ctx Context) Context {
if ctx.Signal == nil {
ctx.Signal = signaler
} else {
ctx.Signal = outputs.NewCompositeSignaler(ctx.Signal, signaler)
}
return ctx
}
}
func newClient(pub *PublisherType) *client {
return &client{
publisher: pub,
beatMeta: common.MapStr{
"name": pub.name,
"hostname": pub.hostname,
},
tags: pub.tags,
}
}
func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
c.annotateEvent(event)
ctx, client := c.getClient(opts)
publishedEvents.Add(1)
return client.PublishEvent(ctx, event)
}
func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
for _, event := range events {
c.annotateEvent(event)
}
ctx, client := c.getClient(opts)
publishedEvents.Add(int64(len(events)))
return client.PublishEvents(ctx, events)
}
func (c *client) annotateEvent(event common.MapStr) {
event["beat"] = c.beatMeta
if len(c.tags) > 0 {
event["tags"] = c.tags
}
if logp.IsDebug("publish") {
PrintPublishEvent(event)
}
}
func (c *client) getClient(opts []ClientOption) (Context, eventPublisher) {
ctx := makeContext(opts)
if ctx.Sync {
return ctx, c.publisher.syncPublisher.client()
}
return ctx, c.publisher.asyncPublisher.client()
}
// PublishEvent will publish the event on the channel. Options will be ignored.
// Always returns true.
func (c ChanClient) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
c.Channel <- event
return true
}
// PublishEvents publishes all event on the configured channel. Options will be ignored.
// Always returns true.
func (c ChanClient) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
for _, event := range events {
c.Channel <- event
}
return true
}
// PublishEvent will publish the event on the channel. Options will be ignored.
// Always returns true.
func (c ExtChanClient) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
c.Channel <- PublishMessage{makeContext(opts), []common.MapStr{event}}
return true
}
// PublishEvents publishes all event on the configured channel. Options will be ignored.
// Always returns true.
func (c ExtChanClient) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
c.Channel <- PublishMessage{makeContext(opts), events}
return true
}
func makeContext(opts []ClientOption) Context {
var ctx Context
for _, opt := range opts {
ctx = opt(ctx)
}
return ctx
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v1.2.0

搜索帮助

0d507c66 1850385 C8b1a773 1850385