1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
thrift.go 27.08 KB
一键复制 编辑 原始数据 按行查看 历史
Steffen Siering 提交于 2016-11-14 14:50 . More Packetbeat cleanups (#2972)
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146
package thrift
import (
"encoding/binary"
"encoding/hex"
"expvar"
"fmt"
"math"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/publish"
)
// thriftPlugin holds the configuration and the runtime state of the Thrift
// protocol analyzer.
type thriftPlugin struct {
	// config
	ports []int
	stringMaxSize int // strings longer than this are truncated by readString
	collectionMaxSize int // max number of elements rendered for lists, sets, maps and structs
	dropAfterNStructFields int // readStruct reports an error after this many fields
	captureReply bool // when false, the body of replies is not parsed
	obfuscateStrings bool // when true, non-empty string values are rendered as "*"
	sendRequest bool // add the "request" field to published events
	sendResponse bool // add the "response" field to published events
	TransportType byte // thriftTSocket or thriftTFramed
	ProtocolType byte // thriftTBinary
	transactions *common.Cache // pending transactions, keyed by the hashable TCP tuple
	transactionTimeout time.Duration // cache expiration, also returned by ConnectionTimeout
	publishQueue chan *thriftTransaction // drained by publishTransactions
	results publish.Transactions // sink for the generated events
	idl *thriftIdl // only set when IDL files are configured
}
// thriftMessage is a single Thrift RPC message (request or reply) as it is
// reconstructed from one direction of a TCP stream.
type thriftMessage struct {
	ts time.Time // timestamp of the packet that started the message
	tcpTuple common.TCPTuple // filled in by messageComplete
	cmdlineTuple *common.CmdlineTuple // filled in by messageComplete
	direction uint8 // stream direction, filled in by messageComplete
	start int // parse offset at which the message started
	fields []thriftField // top-level fields decoded so far
	isRequest bool // true for Call and Oneway messages
	hasException bool // set for replies whose first field has a non-zero id
	version uint32 // version bits of the header (strict mode only)
	Type uint32 // one of the ThriftMsgType* values
	method string
	seqID uint32
	params string // formatted request arguments
	returnValue string // formatted reply value
	exceptions string // formatted reply exceptions
	frameSize uint32 // frame header value, or the number of bytes consumed
	service string // service name resolved through the IDL, if any
	notes []string // remarks attached to the published event
}
// thriftField is one decoded struct field: its wire type, its numeric id and
// its value already rendered as a string.
type thriftField struct {
	Type byte
	id uint16
	value string
}
// thriftStream is the parsing state kept for one direction of a TCP
// connection.
type thriftStream struct {
	tcptuple *common.TCPTuple
	data []byte // bytes received and not yet discarded
	parseOffset int // position in data where parsing continues
	parseState int // thriftStartState or thriftFieldState
	// when this is set, don't care about the
	// traffic in this direction. Used to skip large responses.
	skipInput bool
	message *thriftMessage // message currently being assembled, nil between messages
}
// thriftTransaction pairs a request with its (optional) reply and carries the
// metadata needed to publish them as one event.
type thriftTransaction struct {
	tuple common.TCPTuple
	src common.Endpoint
	dst common.Endpoint
	responseTime int32 // milliseconds between request and reply
	ts time.Time // timestamp of the request
	bytesIn uint64 // frame size of the request
	bytesOut uint64 // frame size of the reply
	request *thriftMessage
	reply *thriftMessage // nil when no reply was matched
}
// Parser states stored in thriftStream.parseState.
const (
	thriftStartState = iota // expecting the beginning of a message
	thriftFieldState // message header read, decoding the fields that follow
)
// Masks and values applied by readMessageBegin to the first 32-bit word of a
// versioned message header.
const (
	thriftVersionMask = 0xffff0000 // selects the version bits
	thriftVersion1 = 0x80010000 // the only version that is not logged as unexpected
	ThriftTypeMask = 0x000000ff // selects the message type
)
// Thrift types
//
// Wire identifiers of the field types. The values 5, 7 and 9 are not assigned
// in this list.
const (
	ThriftTypeStop = 0 // terminates the field list of a struct
	ThriftTypeVoid = 1
	ThriftTypeBool = 2
	ThriftTypeByte = 3
	ThriftTypeDouble = 4
	ThriftTypeI16 = 6
	ThriftTypeI32 = 8
	ThriftTypeI64 = 10
	ThriftTypeString = 11
	ThriftTypeStruct = 12
	ThriftTypeMap = 13
	ThriftTypeSet = 14
	ThriftTypeList = 15
	ThriftTypeUtf8 = 16
	ThriftTypeUtf16 = 17
)
// Thrift message types
//
// The values start at 1 because the zero value of the iota is skipped.
const (
	_ = iota
	ThriftMsgTypeCall // 1
	ThriftMsgTypeReply // 2
	ThriftMsgTypeException // 3
	ThriftMsgTypeOneway // 4
)
// Thrift protocol types
//
// Values stored in thriftPlugin.ProtocolType. readConfig only accepts the
// "binary" protocol.
const (
	thriftTBinary = 1
	thriftTCompact = 2
)
// Thrift transport types
//
// Values stored in thriftPlugin.TransportType, selected by the "socket" and
// "framed" configuration values.
const (
	thriftTSocket = 1
	thriftTFramed = 2
)
// Counters exported through expvar.
var (
	// unmatchedRequests is incremented when a new request arrives while a
	// previous request on the same connection is still waiting for its reply.
	unmatchedRequests = expvar.NewInt("thrift.unmatched_requests")
	// unmatchedResponses is incremented when a reply cannot be associated with
	// a pending request.
	unmatchedResponses = expvar.NewInt("thrift.unmatched_responses")
)
// init registers the New constructor under the name "thrift" by calling
// protos.Register.
func init() {
	protos.Register("thrift", New)
}
// New creates the Thrift protocol plugin.
//
// The settings start as a copy of defaultConfig. Unless testMode is set, cfg
// is unpacked on top of that copy. The resulting settings are handed to the
// plugin's init method; any error from unpacking or initialization is
// returned with a nil plugin.
func New(
	testMode bool,
	results publish.Transactions,
	cfg *common.Config,
) (protos.Plugin, error) {
	settings := defaultConfig
	if !testMode {
		err := cfg.Unpack(&settings)
		if err != nil {
			return nil, err
		}
	}

	plugin := new(thriftPlugin)
	if err := plugin.init(testMode, results, &settings); err != nil {
		return nil, err
	}
	return plugin, nil
}
// init prepares the plugin for use: it applies the built-in defaults, loads
// the given configuration, creates the transaction cache together with its
// janitor and, outside of test mode, starts the publishing goroutine.
//
// It returns the error reported by readConfig, if any.
func (thrift *thriftPlugin) init(
	testMode bool,
	results publish.Transactions,
	config *thriftConfig,
) error {
	thrift.InitDefaults()

	if err := thrift.readConfig(config); err != nil {
		return err
	}

	timeout := thrift.transactionTimeout
	thrift.transactions = common.NewCache(timeout, protos.DefaultTransactionHashSize)
	thrift.transactions.StartJanitor(timeout)

	// In test mode neither the queue nor the publisher goroutine is created.
	if testMode {
		return nil
	}

	thrift.publishQueue = make(chan *thriftTransaction, 1000)
	thrift.results = results
	go thrift.publishTransactions()
	return nil
}
// getTransaction looks up the pending transaction stored under k in the
// transaction cache. It returns nil when the cache has no entry for k.
func (thrift *thriftPlugin) getTransaction(k common.HashableTCPTuple) *thriftTransaction {
	if cached := thrift.transactions.Get(k); cached != nil {
		return cached.(*thriftTransaction)
	}
	return nil
}
// InitDefaults resets the plugin settings to their built-in default values.
func (thrift *thriftPlugin) InitDefaults() {
	// Wire format.
	thrift.TransportType = thriftTSocket
	thrift.ProtocolType = thriftTBinary

	// Limits applied while rendering values.
	thrift.stringMaxSize = 200
	thrift.collectionMaxSize = 15
	thrift.dropAfterNStructFields = 500

	// Capture and publishing options.
	thrift.captureReply = true
	thrift.obfuscateStrings = false
	thrift.sendRequest = false
	thrift.sendResponse = false

	thrift.transactionTimeout = protos.DefaultTransactionExpiration
}
// readConfig copies the settings from config into the plugin.
//
// It returns an error when the transport type is neither "socket" nor
// "framed", when the protocol type is not "binary", or when the configured
// IDL files cannot be loaded. Settings copied before the failing step stay
// applied.
func (thrift *thriftPlugin) readConfig(config *thriftConfig) error {
	thrift.ports = config.Ports
	thrift.sendRequest = config.SendRequest
	thrift.sendResponse = config.SendResponse
	thrift.stringMaxSize = config.StringMaxSize
	thrift.collectionMaxSize = config.CollectionMaxSize
	thrift.dropAfterNStructFields = config.DropAfterNStructFields
	thrift.captureReply = config.CaptureReply
	thrift.obfuscateStrings = config.ObfuscateStrings

	switch transport := config.TransportType; transport {
	case "framed":
		thrift.TransportType = thriftTFramed
	case "socket":
		thrift.TransportType = thriftTSocket
	default:
		return fmt.Errorf("Transport type `%s` not known", transport)
	}

	if config.ProtocolType != "binary" {
		return fmt.Errorf("Protocol type `%s` not known", config.ProtocolType)
	}
	thrift.ProtocolType = thriftTBinary

	if len(config.IdlFiles) == 0 {
		return nil
	}

	var err error
	thrift.idl, err = newThriftIdl(config.IdlFiles)
	return err
}
// GetPorts returns the list of ports taken from the configuration.
func (thrift *thriftPlugin) GetPorts() []int {
	return thrift.ports
}
// String renders the main attributes of the message on a single line, which
// is convenient for debug output.
func (m *thriftMessage) String() string {
	const layout = "IsRequest: %t Type: %d Method: %s SeqId: %d Params: %s ReturnValue: %s Exceptions: %s"
	return fmt.Sprintf(layout,
		m.isRequest,
		m.Type,
		m.method,
		m.seqID,
		m.params,
		m.returnValue,
		m.exceptions,
	)
}
// readMessageBegin decodes the header of a Thrift binary message starting at
// s.parseOffset and stores the result in s.message.
//
// Two layouts are handled. When the first 32-bit word is negative, it carries
// the version and the message type and is followed by the method name and
// the sequence id. Otherwise the message starts directly with the method
// name, followed by a one-byte message type and the sequence id.
//
// It returns (ok, complete). ok is false when the method name cannot be
// decoded. complete is false when more bytes are required; s.parseOffset is
// then left unchanged. On success s.parseOffset points just past the
// sequence id.
func (thrift *thriftPlugin) readMessageBegin(s *thriftStream) (bool, bool) {
	msg := s.message
	buf := s.data
	pos := s.parseOffset

	if len(buf[pos:]) < 9 {
		return true, false // ok, not complete
	}

	header := common.BytesNtohl(buf[pos : pos+4])
	versioned := int32(header) < 0
	if versioned {
		msg.version = header & thriftVersionMask
		if msg.version != thriftVersion1 {
			logp.Debug("thrift", "Unexpected version: %d", msg.version)
		}
		logp.Debug("thriftdetailed", "version = %d", msg.version)

		pos += 4
		logp.Debug("thriftdetailed", "offset = %d", pos)

		msg.Type = header & ThriftTypeMask
	}

	// In both layouts the method name comes next.
	name, nameOk, nameComplete, consumed := thrift.readString(buf[pos:])
	msg.method = name
	if !nameOk {
		return false, false // not ok, not complete
	}
	if !nameComplete {
		logp.Debug("thriftdetailed", "Method name not complete")
		return true, false // ok, not complete
	}
	pos += consumed
	logp.Debug("thriftdetailed", "method = %s", msg.method)
	logp.Debug("thriftdetailed", "offset = %d", pos)

	if versioned {
		if len(buf[pos:]) < 4 {
			logp.Debug("thriftdetailed", "Less then 4 bytes remaining")
			return true, false // ok, not complete
		}
	} else {
		// no version mode: the type byte precedes the sequence id
		if len(buf[pos:]) < 5 {
			return true, false // ok, not complete
		}
		msg.Type = uint32(buf[pos])
		pos++
	}

	msg.seqID = common.BytesNtohl(buf[pos : pos+4])
	s.parseOffset = pos + 4

	msg.isRequest = msg.Type == ThriftMsgTypeCall || msg.Type == ThriftMsgTypeOneway
	return true, true
}
// Functions to decode simple types
// They all have the same signature, returning the string value and the
// number of bytes consumed (off).
//
// ok is false when the data cannot be decoded; complete is false when more
// bytes are needed before the value can be decoded.
type thriftFieldReader func(data []byte) (value string, ok bool, complete bool, off int)
// readString decodes a string made of a 32-bit big-endian length followed by
// that many bytes.
//
// The returned value is capped to thrift.stringMaxSize bytes, with "..."
// appended when it was shortened, but off always covers the whole string so
// that the caller can continue after it. When the cap falls in the middle of
// a multi-byte UTF-8 sequence, the cut is moved back to the start of that
// sequence, so that valid text stays valid after truncation.
//
// ok is false when the length is negative. complete is false when data does
// not yet contain the whole string.
func (thrift *thriftPlugin) readString(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) < 4 {
		return "", true, false, 0 // ok, not complete
	}
	sz := int(common.BytesNtohl(data[:4]))
	if int32(sz) < 0 {
		return "", false, false, 0 // not ok
	}
	if len(data[4:]) < sz {
		return "", true, false, 0 // ok, not complete
	}

	payload := data[4 : 4+sz]
	if sz > thrift.stringMaxSize {
		cut := thrift.stringMaxSize
		// payload[cut] is the first dropped byte. If it is a UTF-8
		// continuation byte the cut splits a rune, so step back to the
		// rune's first byte. A rune has at most utf8.UTFMax-1 continuation
		// bytes, which bounds the number of steps (also for binary data).
		for back := 0; back < utf8.UTFMax-1 && cut > 0 && !utf8.RuneStart(payload[cut]); back++ {
			cut--
		}
		value = string(payload[:cut]) + "..."
	} else {
		value = string(payload)
	}
	return value, true, true, 4 + sz // all good
}
// readAndQuoteString reads a string with readString and renders it for
// display:
//   - an empty value becomes `""`,
//   - with obfuscateStrings enabled, any other value becomes `"*"`,
//   - valid UTF-8 is quoted with strconv.Quote,
//   - anything else is hex encoded.
//
// ok, complete and off are passed through from readString unchanged.
func (thrift *thriftPlugin) readAndQuoteString(data []byte) (value string, ok bool, complete bool, off int) {
	var raw string
	raw, ok, complete, off = thrift.readString(data)

	switch {
	case raw == "":
		value = `""`
	case thrift.obfuscateStrings:
		value = `"*"`
	case utf8.ValidString(raw):
		value = strconv.Quote(raw)
	default:
		value = hex.EncodeToString([]byte(raw))
	}
	return value, ok, complete, off
}
// readBool decodes a one-byte boolean: a zero byte is rendered as "false",
// any other value as "true". complete is false when data is empty.
func (thrift *thriftPlugin) readBool(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) == 0 {
		return "", true, false, 0
	}
	return strconv.FormatBool(data[0] != 0), true, true, 1
}
// readByte decodes a Thrift byte and renders it in decimal.
//
// The Thrift byte type is a signed 8-bit integer, so the raw byte is
// interpreted as two's complement: 0xff is rendered as "-1", not "255".
// complete is false when data is empty.
func (thrift *thriftPlugin) readByte(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) < 1 {
		return "", true, false, 0
	}
	value = strconv.Itoa(int(int8(data[0])))
	return value, true, true, 1
}
// readDouble decodes an 8-byte big-endian IEEE 754 double and renders it in
// plain decimal notation using the shortest representation that
// round-trips. complete is false when fewer than 8 bytes are available.
func (thrift *thriftPlugin) readDouble(data []byte) (value string, ok bool, complete bool, off int) {
	const width = 8
	if len(data) < width {
		return "", true, false, 0
	}
	f := math.Float64frombits(binary.BigEndian.Uint64(data[:width]))
	return strconv.FormatFloat(f, 'f', -1, 64), true, true, width
}
// readI16 decodes a 2-byte big-endian integer and renders it in decimal.
//
// Thrift i16 values are signed, so the decoded word is reinterpreted as
// int16 before formatting: 0xffff is rendered as "-1", not "65535".
// complete is false when fewer than 2 bytes are available.
func (thrift *thriftPlugin) readI16(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) < 2 {
		return "", true, false, 0
	}
	i16 := int16(common.BytesNtohs(data[:2]))
	value = strconv.Itoa(int(i16))
	return value, true, true, 2
}
// readI32 decodes a 4-byte big-endian integer and renders it in decimal.
//
// Thrift i32 values are signed, so the decoded word is reinterpreted as
// int32 before formatting: 0xffffffff is rendered as "-1" instead of
// "4294967295". complete is false when fewer than 4 bytes are available.
func (thrift *thriftPlugin) readI32(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) < 4 {
		return "", true, false, 0
	}
	i32 := int32(common.BytesNtohl(data[:4]))
	value = strconv.Itoa(int(i32))
	return value, true, true, 4
}
// readI64 decodes an 8-byte big-endian integer, interprets it as a signed
// 64-bit value and renders it in decimal. complete is false when fewer than
// 8 bytes are available.
func (thrift *thriftPlugin) readI64(data []byte) (value string, ok bool, complete bool, off int) {
	const width = 8
	if len(data) < width {
		return "", true, false, 0
	}
	signed := int64(common.BytesNtohll(data[:width]))
	return strconv.FormatInt(signed, 10), true, true, width
}
// readListOrSet is the common implementation for lists and sets, which share
// the same binary representation: one byte for the element type, a 32-bit
// big-endian element count, then the elements themselves.
//
// Elements are rendered with the reader matching the element type and joined
// with ", ". At most thrift.collectionMaxSize elements are rendered, followed
// by "..." when there are more; the remaining elements are still decoded so
// that off covers the whole collection.
//
// ok is false when the element type is unknown, when the element count is
// negative or when an element cannot be decoded. complete is false when data
// does not yet hold the whole collection.
func (thrift *thriftPlugin) readListOrSet(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) < 5 {
		return "", true, false, 0
	}
	typ := data[0]
	funcReader, typeFound := thrift.funcReadersByType(typ)
	if !typeFound {
		logp.Debug("thrift", "Field type %d not known", typ)
		return "", false, false, 0
	}

	// The count is a signed 32-bit value on the wire. It has to be converted
	// through int32: widening the unsigned word directly to a 64-bit int
	// never yields a negative number, which would disable the check below.
	sz := int(int32(common.BytesNtohl(data[1:5])))
	if sz < 0 {
		logp.Debug("thrift", "List/Set too big: %d", sz)
		return "", false, false, 0
	}

	var fields []string
	offset := 5
	for i := 0; i < sz; i++ {
		elem, elemOk, elemComplete, bytesRead := funcReader(data[offset:])
		if !elemOk {
			return "", false, false, 0
		}
		if !elemComplete {
			return "", true, false, 0
		}
		if i < thrift.collectionMaxSize {
			fields = append(fields, elem)
		} else if i == thrift.collectionMaxSize {
			fields = append(fields, "...")
		}
		offset += bytesRead
	}
	return strings.Join(fields, ", "), true, true, offset
}
// readSet decodes a set with readListOrSet and wraps the rendered elements
// in curly braces. An empty rendering is returned as is, without braces.
func (thrift *thriftPlugin) readSet(data []byte) (value string, ok bool, complete bool, off int) {
	var elems string
	elems, ok, complete, off = thrift.readListOrSet(data)
	if elems == "" {
		return "", ok, complete, off
	}
	return "{" + elems + "}", ok, complete, off
}
// readList decodes a list with readListOrSet and wraps the rendered elements
// in square brackets. An empty rendering is returned as is, without brackets.
func (thrift *thriftPlugin) readList(data []byte) (value string, ok bool, complete bool, off int) {
	var elems string
	elems, ok, complete, off = thrift.readListOrSet(data)
	if elems == "" {
		return "", ok, complete, off
	}
	return "[" + elems + "]", ok, complete, off
}
// readMap decodes a map: one byte for the key type, one byte for the value
// type, a 32-bit big-endian entry count, then the key/value pairs.
//
// Entries are rendered as "key: value", joined with ", " and wrapped in
// curly braces. At most thrift.collectionMaxSize entries are rendered,
// followed by "..." when there are more; the remaining entries are still
// decoded so that off covers the whole map.
//
// ok is false when a type is unknown, when the entry count is negative or
// when a key or value cannot be decoded. complete is false when data does
// not yet hold the whole map.
func (thrift *thriftPlugin) readMap(data []byte) (value string, ok bool, complete bool, off int) {
	if len(data) < 6 {
		return "", true, false, 0
	}
	typeKey := data[0]
	typeValue := data[1]

	funcReaderKey, typeFound := thrift.funcReadersByType(typeKey)
	if !typeFound {
		logp.Debug("thrift", "Field type %d not known", typeKey)
		return "", false, false, 0
	}
	funcReaderValue, typeFound := thrift.funcReadersByType(typeValue)
	if !typeFound {
		logp.Debug("thrift", "Field type %d not known", typeValue)
		return "", false, false, 0
	}

	// The count is a signed 32-bit value on the wire. It has to be converted
	// through int32: widening the unsigned word directly to a 64-bit int
	// never yields a negative number, which would disable the check below.
	sz := int(int32(common.BytesNtohl(data[2:6])))
	if sz < 0 {
		logp.Debug("thrift", "Map too big: %d", sz)
		return "", false, false, 0
	}

	var fields []string
	offset := 6
	for i := 0; i < sz; i++ {
		key, keyOk, keyComplete, bytesRead := funcReaderKey(data[offset:])
		if !keyOk {
			return "", false, false, 0
		}
		if !keyComplete {
			return "", true, false, 0
		}
		offset += bytesRead

		val, valOk, valComplete, bytesRead := funcReaderValue(data[offset:])
		if !valOk {
			return "", false, false, 0
		}
		if !valComplete {
			return "", true, false, 0
		}
		offset += bytesRead

		if i < thrift.collectionMaxSize {
			fields = append(fields, key+": "+val)
		} else if i == thrift.collectionMaxSize {
			fields = append(fields, "...")
		}
	}
	return "{" + strings.Join(fields, ", ") + "}", true, true, offset
}
// readStruct decodes a struct: a sequence of fields, each made of a type
// byte, a 16-bit big-endian field id and the field value, terminated by a
// STOP type byte. The decoded fields are rendered with formatStruct without
// resolving field names.
//
// ok is false when a field type is unknown, when a field value cannot be
// decoded or when thrift.dropAfterNStructFields fields were read without
// reaching the STOP byte. complete is false when data ends before the STOP
// byte, including when it ends exactly between two fields.
func (thrift *thriftPlugin) readStruct(data []byte) (value string, ok bool, complete bool, off int) {
	var bytesRead int
	offset := 0
	var fields []thriftField

	// Loop until hitting a STOP or reaching the maximum number of elements
	// we follow in a stream (at which point, we assume we interpreted something
	// wrong).
	for i := 0; ; i++ {
		var field thriftField

		if i >= thrift.dropAfterNStructFields {
			logp.Debug("thrift", "Too many fields in struct. Dropping as error")
			return "", false, false, 0
		}

		// The check has to look at what is left after the fields already
		// consumed; testing the total length would let the index below run
		// past the end of data once at least one field has been read.
		if offset >= len(data) {
			return "", true, false, 0
		}
		field.Type = data[offset]
		offset++
		if field.Type == ThriftTypeStop {
			return thrift.formatStruct(fields, false, []*string{}), true, true, offset
		}

		if len(data[offset:]) < 2 {
			return "", true, false, 0 // not complete
		}
		field.id = common.BytesNtohs(data[offset : offset+2])
		offset += 2

		funcReader, typeFound := thrift.funcReadersByType(field.Type)
		if !typeFound {
			logp.Debug("thrift", "Field type %d not known", field.Type)
			return "", false, false, 0
		}

		field.value, ok, complete, bytesRead = funcReader(data[offset:])
		if !ok {
			return "", false, false, 0
		}
		if !complete {
			return "", true, false, 0
		}
		fields = append(fields, field)
		offset += bytesRead
	}
}
// formatStruct renders fields as "(label: value, label: value)".
//
// The label of a field is its numeric id, unless resolveNames is set and
// fieldnames holds a non-nil name at the index given by the id, in which
// case that name is used. Only the first thrift.collectionMaxSize fields are
// rendered; a trailing "..." marks the omission of the others.
func (thrift *thriftPlugin) formatStruct(fields []thriftField, resolveNames bool,
	fieldnames []*string) string {

	parts := make([]string, 0, len(fields))
	for idx := range fields {
		if idx == thrift.collectionMaxSize {
			parts = append(parts, "...")
			break
		}

		f := &fields[idx]
		label := strconv.Itoa(int(f.id))
		if resolveNames && int(f.id) < len(fieldnames) {
			if name := fieldnames[f.id]; name != nil {
				label = *name
			}
		}
		parts = append(parts, label+": "+f.value)
	}
	return "(" + strings.Join(parts, ", ") + ")"
}
// funcReadersByType returns the reader able to decode values of the given
// wire type. exists is false, and fn is nil, for types without a reader.
//
// The lookup is written as a function rather than as a package-level table
// to avoid an "initialization loop".
func (thrift *thriftPlugin) funcReadersByType(typ byte) (fn thriftFieldReader, exists bool) {
	switch typ {
	// scalar types
	case ThriftTypeBool:
		fn = thrift.readBool
	case ThriftTypeByte:
		fn = thrift.readByte
	case ThriftTypeDouble:
		fn = thrift.readDouble
	case ThriftTypeI16:
		fn = thrift.readI16
	case ThriftTypeI32:
		fn = thrift.readI32
	case ThriftTypeI64:
		fn = thrift.readI64
	case ThriftTypeString:
		fn = thrift.readAndQuoteString

	// container types
	case ThriftTypeList:
		fn = thrift.readList
	case ThriftTypeSet:
		fn = thrift.readSet
	case ThriftTypeMap:
		fn = thrift.readMap
	case ThriftTypeStruct:
		fn = thrift.readStruct
	}
	return fn, fn != nil
}
// readField decodes the next top-level field of the message, starting at
// s.parseOffset.
//
// The result is one of:
//   - ok=false: the field type is unknown or its value cannot be decoded;
//   - ok=true, complete=true, field=nil: the STOP byte was consumed, the
//     message has no more fields;
//   - ok=true, complete=false, field!=nil: one field was decoded and
//     s.parseOffset was moved past it;
//   - ok=true, complete=false, field=nil: more data is needed, s.parseOffset
//     is unchanged.
func (thrift *thriftPlugin) readField(s *thriftStream) (ok bool, complete bool, field *thriftField) {
	// Nothing left to read at the current position. The check is relative to
	// the parse offset: a non-empty buffer that has been fully consumed must
	// not be indexed either.
	if s.parseOffset >= len(s.data) {
		return true, false, nil // ok, not complete
	}

	var off int
	field = new(thriftField)
	field.Type = s.data[s.parseOffset]
	offset := s.parseOffset + 1
	if field.Type == ThriftTypeStop {
		s.parseOffset = offset
		return true, true, nil // done
	}

	if len(s.data[offset:]) < 2 {
		return true, false, nil // ok, not complete
	}
	field.id = common.BytesNtohs(s.data[offset : offset+2])
	offset += 2

	funcReader, typeFound := thrift.funcReadersByType(field.Type)
	if !typeFound {
		logp.Debug("thrift", "Field type %d not known", field.Type)
		return false, false, nil
	}

	field.value, ok, complete, off = funcReader(s.data[offset:])
	if !ok {
		return false, false, nil
	}
	if !complete {
		return true, false, nil
	}
	offset += off

	s.parseOffset = offset
	return true, false, field
}
// messageParser advances the parsing of s.message as far as the buffered
// data allows and returns (ok, complete).
//
// ok is false when the data cannot be decoded. complete is true once a whole
// message is available in s.message; for replies this already happens right
// after the header when thrift.captureReply is off. (true, false) means that
// more data is needed.
//
// The parser has two states: thriftStartState reads the optional frame
// header and the message header, thriftFieldState reads one field per loop
// iteration until readField reports the end of the field list.
func (thrift *thriftPlugin) messageParser(s *thriftStream) (bool, bool) {
	var ok, complete bool
	var m = s.message
	logp.Debug("thriftdetailed", "messageParser called parseState=%v offset=%v",
		s.parseState, s.parseOffset)
	for s.parseOffset < len(s.data) {
		switch s.parseState {
		case thriftStartState:
			m.start = s.parseOffset
			if thrift.TransportType == thriftTFramed {
				// read I32
				// NOTE(review): the frame header is read at absolute
				// position 0 rather than relative to s.parseOffset. This
				// presumably relies on prepareForNewMessage dropping the
				// consumed bytes before each new message - confirm.
				if len(s.data) < 4 {
					return true, false
				}
				m.frameSize = common.BytesNtohl(s.data[:4])
				s.parseOffset = 4
			}
			ok, complete = thrift.readMessageBegin(s)
			logp.Debug("thriftdetailed", "readMessageBegin returned: %v %v", ok, complete)
			if !ok {
				return false, false
			}
			if !complete {
				return true, false
			}
			if !m.isRequest && !thrift.captureReply {
				// don't actually read the result
				logp.Debug("thrift", "Don't capture reply")
				m.returnValue = ""
				m.exceptions = ""
				return true, true
			}
			s.parseState = thriftFieldState
		case thriftFieldState:
			// These ok/complete shadow the outer variables for this case only.
			ok, complete, field := thrift.readField(s)
			logp.Debug("thriftdetailed", "readField returned: %v %v", ok, complete)
			if !ok {
				return false, false
			}
			if complete {
				// done
				// All fields are read: render them, resolving names through
				// the IDL when the method is known there.
				var method *thriftIdlMethod
				if thrift.idl != nil {
					method = thrift.idl.findMethod(m.method)
				}
				if m.isRequest {
					if method != nil {
						m.params = thrift.formatStruct(m.fields, true, method.params)
						m.service = method.service.Name
					} else {
						m.params = thrift.formatStruct(m.fields, false, nil)
					}
				} else {
					if len(m.fields) > 1 {
						logp.Warn("Thrift RPC response with more than field. Ignoring all but first")
					}
					if len(m.fields) > 0 {
						field := m.fields[0]
						// Field id 0 carries the return value; any other id
						// is reported as an exception.
						if field.id == 0 {
							m.returnValue = field.value
							m.exceptions = ""
						} else {
							m.returnValue = ""
							if method != nil {
								m.exceptions = thrift.formatStruct(m.fields, true, method.exceptions)
							} else {
								m.exceptions = thrift.formatStruct(m.fields, false, nil)
							}
							m.hasException = true
						}
					}
				}
				return true, true
			}
			if field == nil {
				return true, false // ok, not complete
			}
			m.fields = append(m.fields, *field)
		}
	}
	return true, false
}
// messageGap is called when a gap of size `nbytes` is found in the
// tcp stream. Returns true if there is already enough data in the message
// read so far that we can use it further in the stack.
//
// That is only the case for a reply whose header has already been parsed;
// a note about the packet loss is then attached to the message.
func (thrift *thriftPlugin) messageGap(s *thriftStream, nbytes int) (complete bool) {
	// Before the header is parsed there is not enough data to be useful, and
	// losing part of a request is not tolerated.
	if s.parseState != thriftFieldState || s.message.isRequest {
		return false
	}

	// large response case, can tolerate loss
	reply := s.message
	reply.notes = append(reply.notes, "Packet loss while capturing the response")
	return true
}
// prepareForNewMessage resets the stream so that the next call to the parser
// starts a fresh message. With flush set, all buffered data is discarded;
// otherwise only the bytes before the current parse offset are dropped.
func (stream *thriftStream) prepareForNewMessage(flush bool) {
	remaining := []byte{}
	if !flush {
		remaining = stream.data[stream.parseOffset:]
	}

	stream.data = remaining
	stream.message = nil
	stream.parseState = thriftStartState
	stream.parseOffset = 0
}
// thriftPrivateData is the per-connection state returned by Parse: one
// stream for each direction, indexed by the direction value.
type thriftPrivateData struct {
	data [2]*thriftStream
}
// messageComplete finalizes the message of stream: it fills in the
// connection details, hands the message over to handleThrift and resets the
// stream for the next message.
//
// When replies are not captured, a request re-enables the stream of the
// opposite direction, while a reply suspends its own stream and discards the
// data buffered for it.
func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8,
	stream *thriftStream, priv *thriftPrivateData) {

	msg := stream.message
	flush := false

	if !msg.isRequest {
		logp.Debug("thrift", "Thrift response message: %s", msg.method)
		if !thrift.captureReply {
			// disable stream in this direction and flush current data
			stream.skipInput = true
			flush = true
		}
	} else {
		logp.Debug("thrift", "Thrift request message: %s", msg.method)
		if !thrift.captureReply {
			// enable the stream in the other direction to get the reply
			if reverse := priv.data[1-dir]; reverse != nil {
				reverse.skipInput = false
			}
		}
	}

	// all ok, go to next level
	msg.tcpTuple = *tcptuple
	msg.direction = dir
	msg.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort())
	if msg.frameSize == 0 {
		// No frame header was read: use the number of bytes consumed.
		msg.frameSize = uint32(stream.parseOffset - msg.start)
	}
	thrift.handleThrift(msg)

	// and reset message
	stream.prepareForNewMessage(flush)
}
// ConnectionTimeout returns the transaction timeout of the plugin.
func (thrift *thriftPlugin) ConnectionTimeout() time.Duration {
	return thrift.transactionTimeout
}
// Parse feeds the payload of pkt into the stream of direction dir and
// processes every message that becomes complete.
//
// private is the value returned by the previous call for the same
// connection (nil or a value of another type starts from an empty state).
// The updated state is returned. Panics raised while parsing are handled by
// the deferred logp.Recover call.
func (thrift *thriftPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8,
	private protos.ProtocolData) protos.ProtocolData {
	defer logp.Recover("ParseThrift exception")
	priv := thriftPrivateData{}
	if private != nil {
		var ok bool
		priv, ok = private.(thriftPrivateData)
		if !ok {
			priv = thriftPrivateData{}
		}
	}
	stream := priv.data[dir]
	if stream == nil {
		// First data seen in this direction: start a new stream.
		stream = &thriftStream{
			tcptuple: tcptuple,
			data: pkt.Payload,
			message: &thriftMessage{ts: pkt.Ts},
		}
		priv.data[dir] = stream
	} else {
		if stream.skipInput {
			// stream currently suspended in this direction
			return priv
		}
		// concatenate bytes
		stream.data = append(stream.data, pkt.Payload...)
		if len(stream.data) > tcp.TCPMaxDataInStream {
			logp.Debug("thrift", "Stream data too large, dropping TCP stream")
			priv.data[dir] = nil
			return priv
		}
	}
	// A single payload can contain several messages.
	for len(stream.data) > 0 {
		if stream.message == nil {
			stream.message = &thriftMessage{ts: pkt.Ts}
		}
		ok, complete := thrift.messageParser(priv.data[dir])
		logp.Debug("thriftdetailed", "messageParser returned %v %v", ok, complete)
		if !ok {
			// drop this tcp stream. Will retry parsing with the next
			// segment in it
			priv.data[dir] = nil
			logp.Debug("thrift", "Ignore Thrift message. Drop tcp stream. Try parsing with the next segment")
			return priv
		}
		if complete {
			thrift.messageComplete(tcptuple, dir, stream, &priv)
		} else {
			// wait for more data
			break
		}
	}
	logp.Debug("thriftdetailed", "Out")
	return priv
}
// handleThrift dispatches a complete message to the request or to the reply
// handler, depending on msg.isRequest.
func (thrift *thriftPlugin) handleThrift(msg *thriftMessage) {
	if !msg.isRequest {
		thrift.receivedReply(msg)
		return
	}
	thrift.receivedRequest(msg)
}
// receivedRequest opens a new transaction for the request msg and stores it
// in the transaction cache under the TCP tuple of the message.
//
// If a transaction is still pending for the same tuple, it is assumed to be
// a oneway call: it is counted as unmatched and published as it is.
func (thrift *thriftPlugin) receivedRequest(msg *thriftMessage) {
	tuple := msg.tcpTuple

	if pending := thrift.getTransaction(tuple.Hashable()); pending != nil {
		logp.Debug("thrift", "Two requests without reply, assuming the old one is oneway")
		unmatchedRequests.Add(1)
		thrift.publishQueue <- pending
	}

	trans := &thriftTransaction{tuple: tuple}
	thrift.transactions.Put(tuple.Hashable(), trans)

	trans.ts = msg.ts
	trans.src = common.Endpoint{
		IP:   msg.tcpTuple.SrcIP.String(),
		Port: msg.tcpTuple.SrcPort,
		Proc: string(msg.cmdlineTuple.Src),
	}
	trans.dst = common.Endpoint{
		IP:   msg.tcpTuple.DstIP.String(),
		Port: msg.tcpTuple.DstPort,
		Proc: string(msg.cmdlineTuple.Dst),
	}
	// For a request seen in the reverse direction the endpoints are swapped.
	if msg.direction == tcp.TCPDirectionReverse {
		trans.src, trans.dst = trans.dst, trans.src
	}

	trans.request = msg
	trans.bytesIn = uint64(msg.frameSize)
}
// receivedReply completes the pending transaction of the connection with the
// reply msg, queues it for publishing and removes it from the cache.
//
// The reply is counted as unmatched and ignored when no transaction is
// pending for the TCP tuple or when the method names of the request and of
// the reply differ.
func (thrift *thriftPlugin) receivedReply(msg *thriftMessage) {
	// we need to search the request first.
	tuple := msg.tcpTuple
	trans := thrift.getTransaction(tuple.Hashable())

	switch {
	case trans == nil:
		logp.Debug("thrift", "Response from unknown transaction. Ignoring: %v", tuple)
		unmatchedResponses.Add(1)
		return
	case trans.request.method != msg.method:
		logp.Debug("thrift", "Response from another request received '%s' '%s'"+
			". Ignoring.", trans.request.method, msg.method)
		unmatchedResponses.Add(1)
		return
	}

	elapsed := msg.ts.Sub(trans.ts)
	trans.reply = msg
	trans.bytesOut = uint64(msg.frameSize)
	trans.responseTime = int32(elapsed.Nanoseconds() / 1e6) // resp_time in milliseconds

	thrift.publishQueue <- trans
	thrift.transactions.Delete(tuple.Hashable())
	logp.Debug("thrift", "Transaction queued")
}
// ReceivedFin is called when a FIN is seen on the connection. A pending
// transaction that has a request but no reply is assumed to be a oneway
// call: it is queued for publishing and removed from the cache.
//
// private is returned unchanged.
func (thrift *thriftPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8,
	private protos.ProtocolData) protos.ProtocolData {

	trans := thrift.getTransaction(tcptuple.Hashable())
	if trans == nil || trans.request == nil || trans.reply != nil {
		return private
	}

	logp.Debug("thrift", "FIN and had only one transaction. Assuming one way")
	thrift.publishQueue <- trans
	thrift.transactions.Delete(trans.tuple.Hashable())
	return private
}
// GapInStream is called when nbytes bytes are missing from the stream of
// direction dir.
//
// When a message is being parsed in that direction and messageGap considers
// it usable, the message is completed with the data read so far. The stream
// is then reported as to be dropped. Without Thrift state, or without a
// message in progress, nothing is done and drop is false.
//
// private is always returned unchanged.
func (thrift *thriftPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
	nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

	defer logp.Recover("GapInStream(thrift) exception")
	logp.Debug("thriftdetailed", "GapInStream called")

	// The checked assertion also fails for a nil private value.
	thriftData, isThrift := private.(thriftPrivateData)
	if !isThrift {
		return private, false
	}

	stream := thriftData.data[dir]
	if stream == nil || stream.message == nil {
		// nothing to do
		return private, false
	}

	if usable := thrift.messageGap(stream, nbytes); usable {
		// we need to publish from here
		thrift.messageComplete(tcptuple, dir, stream, &thriftData)
	}

	// we always drop the TCP stream. Because it's binary and len based,
	// there are too few cases in which we could recover the stream (maybe
	// for very large blobs, leaving that as TODO)
	return private, true
}
// publishTransactions converts every transaction received on
// thrift.publishQueue into an event and passes it to thrift.results, when
// one is set. It returns once the queue is closed.
func (thrift *thriftPlugin) publishTransactions() {
	for t := range thrift.publishQueue {
		status := common.OK_STATUS
		if t.reply != nil && t.reply.hasException {
			status = common.ERROR_STATUS
		}

		event := common.MapStr{
			"type":         "thrift",
			"status":       status,
			"responsetime": t.responseTime,
		}
		thriftmap := common.MapStr{}

		if req := t.request; req != nil {
			call := fmt.Sprintf("%s%s", req.method, req.params)

			event["method"] = req.method
			event["path"] = req.service
			event["query"] = call
			event["bytes_in"] = t.bytesIn
			event["bytes_out"] = t.bytesOut

			thriftmap = common.MapStr{
				"params": req.params,
			}
			if len(req.service) > 0 {
				thriftmap["service"] = req.service
			}
			if thrift.sendRequest {
				event["request"] = call
			}
		}

		if reply := t.reply; reply != nil {
			thriftmap["return_value"] = reply.returnValue
			if len(reply.exceptions) > 0 {
				thriftmap["exceptions"] = reply.exceptions
			}

			event["bytes_out"] = uint64(reply.frameSize)

			if thrift.sendResponse {
				if reply.hasException {
					event["response"] = fmt.Sprintf("Exceptions: %s",
						reply.exceptions)
				} else {
					event["response"] = reply.returnValue
				}
			}
			if len(reply.notes) > 0 {
				event["notes"] = reply.notes
			}
		} else {
			event["bytes_out"] = 0
		}

		event["thrift"] = thriftmap
		event["@timestamp"] = common.Time(t.ts)
		event["src"] = &t.src
		event["dst"] = &t.dst

		if thrift.results != nil {
			thrift.results.PublishTransaction(event)
		}
		logp.Debug("thrift", "Published event")
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v5.6.16

搜索帮助