1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
parse.go 3.73 KB
一键复制 编辑 原始数据 按行查看 历史
Steffen Siering 提交于 2016-11-14 14:50 . More Packetbeat cleanups (#2972)
package memcache
// Generic memcache parser types and helper functions shared by the binary and text protocol parsers.
import (
"time"
"github.com/elastic/beats/libbeat/common/streambuf"
)
// Byte values of the space and horizontal tab characters.
const (
// codeSpace is the ASCII space character, explicitly typed as byte.
codeSpace byte = ' '
// codeTab is the horizontal tab character. It has no explicit type, so it
// is an untyped rune constant.
codeTab = '\t'
)
// parserConfig holds the settings a parser is initialized with.
type parserConfig struct {
// maxValues: appendMessageData stores value payloads only when this is
// non-zero.
maxValues int
// maxBytesPerValue is the length appendMessageData truncates stored values to.
maxBytesPerValue int
// parseUnkown is not read in this part of the file; presumably it controls
// whether unknown commands are parsed — TODO confirm against the callers.
parseUnkown bool
}
// parser carries the parsing state kept between calls to parse.
type parser struct {
// state selects the state function that dispatch runs next.
state parserState
// message is the message currently being built; parse allocates it when nil
// and yield sets it back to nil.
message *message
// config is the configuration passed to init.
config *parserConfig
}
// parserState identifies which state function dispatch invokes.
type parserState uint8
// Parser states. The comments name the handler dispatch selects for each one.
const (
// parseStateCommand is the state set by init and yield; handled by parseCommand.
parseStateCommand parserState = iota
// parseStateTextCommand is handled by parseTextCommand.
parseStateTextCommand
// parseStateBinaryCommand is handled by parseBinaryCommand.
parseStateBinaryCommand
// parseStateData is handled by parseData.
parseStateData
// parseStateDataBinary is handled by parseDataBinary.
parseStateDataBinary
// parseStateIncompleteData is handled by parseData as well.
parseStateIncompleteData
// parseStateIncompleteDataBinary is handled by parseDataBinary as well.
parseStateIncompleteDataBinary
// parseStateFailing is set by failing; handled by parseFailing.
parseStateFailing
)
// parserStateFn is the signature of a state handler: it receives the parser and
// the input buffer and returns a parseResult.
type parserStateFn func(parser *parser, buf *streambuf.Buffer) parseResult
// argParser is the signature of an argument-parsing callback taking the parser
// and two buffers (hdr and buf). argparseNoop is the only implementation in
// this part of the file; how hdr and buf relate is defined by the callers.
type argParser func(parser *parser, hdr, buf *streambuf.Buffer) error
// parseResult is the value returned by state handlers. needMore returns it with
// both fields nil, yield sets only msg, and failing sets only err.
// The struct is constructed positionally in this file, so the field order
// (err, msg) must not change.
type parseResult struct {
err error
msg *message
}
// init runs at package initialization and wires up the parseCommand variable.
func init() {
// Assign doParseCommand here instead of in the declaration of parseCommand;
// doing it at declaration time would trip the compiler's initialization
// loop check.
parseCommand = doParseCommand
}
// newParser allocates a parser and initializes it with config via init.
func newParser(config *parserConfig) *parser {
	p := new(parser)
	p.init(config)
	return p
}
// init puts the parser into its starting condition: state parseStateCommand,
// no pending message, and config as the active configuration.
func (p *parser) init(config *parserConfig) {
	p.state, p.message, p.config = parseStateCommand, nil, config
}
// reset logs the reset and re-initializes the parser with its current config.
func (p *parser) reset() {
debug("parser(%p) reset", p)
// init sets the state back to parseStateCommand and drops the pending message.
p.init(p.config)
}
// parse runs the handler for the parser's current state on buf and returns the
// message and error carried by the handler's parseResult.
//
// ts is only used when no message is pending: in that case a new message is
// created with newMessage(ts) before dispatching.
func (p *parser) parse(buf *streambuf.Buffer, ts time.Time) (*message, error) {
	if p.message == nil {
		p.message = newMessage(ts)
	}

	result := p.dispatch(p.state, buf)
	msg, err := result.msg, result.err
	return msg, err
}
// dispatch invokes the state function associated with state, passing the
// parser and buf, and returns that function's result.
//
// The complete and "incomplete" data states share a handler (parseData for
// text data, parseDataBinary for binary data).
//
// A state value without a handler is routed to parseFailing, so it produces an
// error result instead of a nil-function-call panic.
func (p *parser) dispatch(state parserState, buf *streambuf.Buffer) parseResult {
	var f parserStateFn
	switch state {
	case parseStateCommand:
		// parseCommand is a package variable assigned in init.
		f = parseCommand
	case parseStateTextCommand:
		f = parseTextCommand
	case parseStateBinaryCommand:
		f = parseBinaryCommand
	case parseStateData, parseStateIncompleteData:
		f = parseData
	case parseStateDataBinary, parseStateIncompleteDataBinary:
		f = parseDataBinary
	case parseStateFailing:
		f = parseFailing
	default:
		// Unknown state: fail the parser rather than calling a nil function.
		f = parseFailing
	}
	return f(p, buf)
}
// needMore returns the empty result: no error and no message. The parser's
// state is left untouched.
func (p *parser) needMore() parseResult {
	var empty parseResult
	return empty
}
// yield finishes the pending message and hands it to the caller.
//
// The message's Size is set to nbytes minus its bytesLost, the parser goes
// back to parseStateCommand with no pending message, and the finished message
// is returned in the result (err is nil).
func (p *parser) yield(nbytes int) parseResult {
	finished := p.message
	p.state = parseStateCommand
	p.message = nil

	finished.Size = uint64(nbytes - int(finished.bytesLost))
	debug("yield(%p) memcache message type %v", p, finished.command.code)
	return parseResult{msg: finished}
}
// yieldNoData yields the pending message, using buf.BufferConsumed() as the
// byte count passed to yield.
func (p *parser) yieldNoData(buf *streambuf.Buffer) parseResult {
return p.yield(buf.BufferConsumed())
}
// failing switches the parser into parseStateFailing and returns a result that
// carries err and no message.
func (p *parser) failing(err error) parseResult {
	result := parseResult{err: err}
	p.state = parseStateFailing
	return result
}
// contWith records state as the parser's current state and immediately runs
// the handler for it on buf, returning the handler's result.
func (p *parser) contWith(buf *streambuf.Buffer, state parserState) parseResult {
p.state = state
return p.dispatch(state, buf)
}
// contWithShallow calls fn with the parser and buf and returns its result.
// Unlike contWith, it does not itself modify p.state.
func (p *parser) contWithShallow(
buf *streambuf.Buffer,
fn parserStateFn,
) parseResult {
return fn(p, buf)
}
// appendMessageData records one value payload on the pending message.
//
// The message's countValues is always incremented. The payload itself is only
// kept when config.maxValues is non-zero: it is cut to at most
// config.maxBytesPerValue bytes, stored as the message's current data, and
// appended to the message's values.
func (p *parser) appendMessageData(data []byte) {
	cfg, m := p.config, p.message

	if cfg.maxValues != 0 {
		stored := data
		if limit := cfg.maxBytesPerValue; len(stored) > limit {
			stored = stored[0:limit]
		}
		m.data = memcacheData{stored}
		m.values = append(m.values, m.data)
	}

	m.countValues++
}
// parseFailing is the state handler for parseStateFailing. It ignores buf and
// reports errParserCaughtInError through parser.failing, which keeps the
// parser in the failing state.
func parseFailing(parser *parser, buf *streambuf.Buffer) parseResult {
return parser.failing(errParserCaughtInError)
}
// parseCommand is the state handler for parseStateCommand. It is declared as a
// variable and assigned doParseCommand in init, which is required to avoid the
// compiler's initialization loop error.
var parseCommand parserStateFn
// doParseCommand decides whether the buffered input starts a binary or a text
// command and continues parsing in the matching state.
//
// It returns needMore unless buf.Avail(2) holds. Otherwise the first buffered
// byte is compared with memcacheMagicRequest and memcacheMagicResponse: a
// match continues with parseStateBinaryCommand, anything else with
// parseStateTextCommand.
func doParseCommand(parser *parser, buf *streambuf.Buffer) parseResult {
	if !buf.Avail(2) {
		return parser.needMore()
	}

	next := parseStateTextCommand
	if first := buf.Bytes()[0]; first == memcacheMagicRequest || first == memcacheMagicResponse {
		next = parseStateBinaryCommand
	}
	return parser.contWith(buf, next)
}
// argparseNoop is an argument parser that does nothing: it ignores all of its
// arguments and always returns nil.
func argparseNoop(p *parser, h, b *streambuf.Buffer) error {
return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v5.6.7

搜索帮助