代码拉取完成,页面将自动刷新
docker
prospector to filter on stream (#6057)
package reader
import (
"bytes"
"encoding/json"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/pkg/errors"
)
// DockerJSON processor renames a given field
type DockerJSON struct {
reader Reader
// stream filter, `all`, `stderr` or `stdout`
stream string
}
type dockerLog struct {
Timestamp string `json:"time"`
Log string `json:"log"`
Stream string `json:"stream"`
}
// NewDockerJSON creates a new reader renaming a field
func NewDockerJSON(r Reader, stream string) *DockerJSON {
return &DockerJSON{
stream: stream,
reader: r,
}
}
// Next returns the next line.
func (p *DockerJSON) Next() (Message, error) {
for {
message, err := p.reader.Next()
if err != nil {
return message, err
}
var line dockerLog
dec := json.NewDecoder(bytes.NewReader(message.Content))
if err = dec.Decode(&line); err != nil {
return message, errors.Wrap(err, "decoding docker JSON")
}
if p.stream != "all" && p.stream != line.Stream {
continue
}
// Parse timestamp
ts, err := time.Parse(time.RFC3339, line.Timestamp)
if err != nil {
return message, errors.Wrap(err, "parsing docker timestamp")
}
message.AddFields(common.MapStr{
"stream": line.Stream,
})
message.Content = []byte(line.Log)
message.Ts = ts
return message, nil
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。