1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mysql.go 22.88 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930
package mysql
import (
"errors"
"fmt"
"strings"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/publish"
)
// Packet types
const (
MYSQL_CMD_QUERY = 3
)
const MAX_PAYLOAD_SIZE = 100 * 1024
type MysqlMessage struct {
start int
end int
Ts time.Time
IsRequest bool
PacketLength uint32
Seq uint8
Typ uint8
NumberOfRows int
NumberOfFields int
Size uint64
Fields []string
Rows [][]string
Tables string
IsOK bool
AffectedRows uint64
InsertId uint64
IsError bool
ErrorCode uint16
ErrorInfo string
Query string
IgnoreMessage bool
Direction uint8
IsTruncated bool
TcpTuple common.TcpTuple
CmdlineTuple *common.CmdlineTuple
Raw []byte
Notes []string
}
type MysqlTransaction struct {
Type string
tuple common.TcpTuple
Src common.Endpoint
Dst common.Endpoint
ResponseTime int32
Ts int64
JsTs time.Time
ts time.Time
Query string
Method string
Path string // for mysql, Path refers to the mysql table queried
BytesOut uint64
BytesIn uint64
Notes []string
Mysql common.MapStr
Request_raw string
Response_raw string
}
type MysqlStream struct {
tcptuple *common.TcpTuple
data []byte
parseOffset int
parseState parseState
isClient bool
message *MysqlMessage
}
type parseState int
const (
mysqlStateStart parseState = iota
mysqlStateEatMessage
mysqlStateEatFields
mysqlStateEatRows
MysqlStateMax
)
var stateStrings []string = []string{
"Start",
"EatMessage",
"EatFields",
"EatRows",
}
func (state parseState) String() string {
return stateStrings[state]
}
type Mysql struct {
// config
Ports []int
maxStoreRows int
maxRowLength int
Send_request bool
Send_response bool
transactions *common.Cache
transactionTimeout time.Duration
results publish.Transactions
// function pointer for mocking
handleMysql func(mysql *Mysql, m *MysqlMessage, tcp *common.TcpTuple,
dir uint8, raw_msg []byte)
}
func (mysql *Mysql) getTransaction(k common.HashableTcpTuple) *MysqlTransaction {
v := mysql.transactions.Get(k)
if v != nil {
return v.(*MysqlTransaction)
}
return nil
}
func (mysql *Mysql) InitDefaults() {
mysql.maxRowLength = 1024
mysql.maxStoreRows = 10
mysql.Send_request = false
mysql.Send_response = false
mysql.transactionTimeout = protos.DefaultTransactionExpiration
}
func (mysql *Mysql) setFromConfig(config config.Mysql) error {
mysql.Ports = config.Ports
if config.Max_row_length != nil {
mysql.maxRowLength = *config.Max_row_length
}
if config.Max_rows != nil {
mysql.maxStoreRows = *config.Max_rows
}
if config.SendRequest != nil {
mysql.Send_request = *config.SendRequest
}
if config.SendResponse != nil {
mysql.Send_response = *config.SendResponse
}
if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
mysql.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}
return nil
}
func (mysql *Mysql) GetPorts() []int {
return mysql.Ports
}
func (mysql *Mysql) Init(test_mode bool, results publish.Transactions) error {
mysql.InitDefaults()
if !test_mode {
err := mysql.setFromConfig(config.ConfigSingleton.Protocols.Mysql)
if err != nil {
return err
}
}
mysql.transactions = common.NewCache(
mysql.transactionTimeout,
protos.DefaultTransactionHashSize)
mysql.transactions.StartJanitor(mysql.transactionTimeout)
mysql.handleMysql = handleMysql
mysql.results = results
return nil
}
func (stream *MysqlStream) PrepareForNewMessage() {
stream.data = stream.data[stream.parseOffset:]
stream.parseState = mysqlStateStart
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}
func mysqlMessageParser(s *MysqlStream) (bool, bool) {
logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)
m := s.message
for s.parseOffset < len(s.data) {
switch s.parseState {
case mysqlStateStart:
m.start = s.parseOffset
if len(s.data[s.parseOffset:]) < 5 {
logp.Warn("MySQL Message too short. Ignore it.")
return false, false
}
hdr := s.data[s.parseOffset : s.parseOffset+5]
m.PacketLength = uint32(hdr[0]) | uint32(hdr[1])<<8 | uint32(hdr[2])<<16
m.Seq = uint8(hdr[3])
m.Typ = uint8(hdr[4])
logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d", m.PacketLength, m.Seq, m.Typ)
if m.Seq == 0 {
// starts Command Phase
if m.Typ == MYSQL_CMD_QUERY {
// parse request
m.IsRequest = true
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage
} else {
// ignore command
m.IgnoreMessage = true
s.parseState = mysqlStateEatMessage
}
if !s.isClient {
s.isClient = true
}
} else if !s.isClient {
// parse response
m.IsRequest = false
if uint8(hdr[4]) == 0x00 || uint8(hdr[4]) == 0xfe {
logp.Debug("mysqldetailed", "Received OK response")
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage
m.IsOK = true
} else if uint8(hdr[4]) == 0xff {
logp.Debug("mysqldetailed", "Received ERR response")
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage
m.IsError = true
} else if m.PacketLength == 1 {
logp.Debug("mysqldetailed", "Query response. Number of fields %d", uint8(hdr[4]))
m.NumberOfFields = int(hdr[4])
m.start = s.parseOffset
s.parseOffset += 5
s.parseState = mysqlStateEatFields
} else {
// something else. ignore
m.IgnoreMessage = true
s.parseState = mysqlStateEatMessage
}
} else {
// something else, not expected
logp.Warn("Unexpected MySQL message of type %d received.", m.Typ)
return false, false
}
break
case mysqlStateEatMessage:
if len(s.data[s.parseOffset:]) >= int(m.PacketLength)+4 {
s.parseOffset += 4 //header
s.parseOffset += int(m.PacketLength)
m.end = s.parseOffset
if m.IsRequest {
m.Query = string(s.data[m.start+5 : m.end])
} else if m.IsOK {
// affected rows
affectedRows, off, complete, err := read_linteger(s.data, m.start+5)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_linteger: %s", err)
return false, false
}
m.AffectedRows = affectedRows
// last insert id
insertId, off, complete, err := read_linteger(s.data, off)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_linteger: %s", err)
return false, false
}
m.InsertId = insertId
} else if m.IsError {
// int<1>header (0xff)
// int<2>error code
// string[1] sql state marker
// string[5] sql state
// string<EOF> error message
m.ErrorCode = uint16(s.data[m.start+6])<<8 | uint16(s.data[m.start+5])
m.ErrorInfo = string(s.data[m.start+8:m.start+13]) + ": " + string(s.data[m.start+13:])
}
m.Size = uint64(m.end - m.start)
logp.Debug("mysqldetailed", "Message complete. remaining=%d", len(s.data[s.parseOffset:]))
return true, true
} else {
// wait for more
return true, false
}
case mysqlStateEatFields:
if len(s.data[s.parseOffset:]) < 4 {
// wait for more
return true, false
}
hdr := s.data[s.parseOffset : s.parseOffset+4]
m.PacketLength = uint32(hdr[0]) | uint32(hdr[1])<<8 | uint32(hdr[2])<<16
m.Seq = uint8(hdr[3])
logp.Debug("mysqldetailed", "Fields: packet length %d, packet number %d", m.PacketLength, m.Seq)
if len(s.data[s.parseOffset:]) >= int(m.PacketLength)+4 {
s.parseOffset += 4 // header
if uint8(s.data[s.parseOffset]) == 0xfe {
logp.Debug("mysqldetailed", "Received EOF packet")
// EOF marker
s.parseOffset += int(m.PacketLength)
s.parseState = mysqlStateEatRows
} else {
_ /* catalog */, off, complete, err := read_lstring(s.data, s.parseOffset)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_lstring: %s", err)
return false, false
}
db /*schema */, off, complete, err := read_lstring(s.data, off)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_lstring: %s", err)
return false, false
}
table /* table */, off, complete, err := read_lstring(s.data, off)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_lstring: %s", err)
return false, false
}
db_table := string(db) + "." + string(table)
if len(m.Tables) == 0 {
m.Tables = db_table
} else if !strings.Contains(m.Tables, db_table) {
m.Tables = m.Tables + ", " + db_table
}
logp.Debug("mysqldetailed", "db=%s, table=%s", db, table)
s.parseOffset += int(m.PacketLength)
// go to next field
}
} else {
// wait for more
return true, false
}
break
case mysqlStateEatRows:
if len(s.data[s.parseOffset:]) < 4 {
// wait for more
return true, false
}
hdr := s.data[s.parseOffset : s.parseOffset+4]
m.PacketLength = uint32(hdr[0]) | uint32(hdr[1])<<8 | uint32(hdr[2])<<16
m.Seq = uint8(hdr[3])
logp.Debug("mysqldetailed", "Rows: packet length %d, packet number %d", m.PacketLength, m.Seq)
if len(s.data[s.parseOffset:]) >= int(m.PacketLength)+4 {
s.parseOffset += 4 //header
if uint8(s.data[s.parseOffset]) == 0xfe {
logp.Debug("mysqldetailed", "Received EOF packet")
// EOF marker
s.parseOffset += int(m.PacketLength)
if m.end == 0 {
m.end = s.parseOffset
} else {
m.IsTruncated = true
}
if !m.IsError {
// in case the response was sent successfully
m.IsOK = true
}
m.Size = uint64(m.end - m.start)
return true, true
} else {
s.parseOffset += int(m.PacketLength)
if m.end == 0 && s.parseOffset > MAX_PAYLOAD_SIZE {
// only send up to here, but read until the end
m.end = s.parseOffset
}
m.NumberOfRows += 1
// go to next row
}
} else {
// wait for more
return true, false
}
break
}
}
return true, false
}
// messageGap is called when a gap of size `nbytes` is found in the
// tcp stream. Returns true if there is already enough data in the message
// read so far that we can use it further in the stack.
func (mysql *Mysql) messageGap(s *MysqlStream, nbytes int) (complete bool) {
m := s.message
switch s.parseState {
case mysqlStateStart, mysqlStateEatMessage:
// not enough data yet to be useful
return false
case mysqlStateEatFields, mysqlStateEatRows:
// enough data here
m.end = s.parseOffset
if m.IsRequest {
m.Notes = append(m.Notes, "Packet loss while capturing the request")
} else {
m.Notes = append(m.Notes, "Packet loss while capturing the response")
}
return true
}
return true
}
type mysqlPrivateData struct {
Data [2]*MysqlStream
}
// Called when the parser has identified a full message.
func (mysql *Mysql) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *MysqlStream) {
// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]
if !stream.message.IgnoreMessage {
mysql.handleMysql(mysql, stream.message, tcptuple, dir, msg)
}
// and reset message
stream.PrepareForNewMessage()
}
func (mysql *Mysql) ConnectionTimeout() time.Duration {
return mysql.transactionTimeout
}
func (mysql *Mysql) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple,
dir uint8, private protos.ProtocolData) protos.ProtocolData {
defer logp.Recover("ParseMysql exception")
priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}
if priv.Data[dir] == nil {
priv.Data[dir] = &MysqlStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &MysqlMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("mysql", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}
stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &MysqlMessage{Ts: pkt.Ts}
}
ok, complete := mysqlMessageParser(priv.Data[dir])
//logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%b complete=%b", ok, complete)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream. Try parsing with the next segment")
return priv
}
if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
}
return priv
}
func (mysql *Mysql) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
defer logp.Recover("GapInStream(mysql) exception")
if private == nil {
return private, false
}
mysqlData, ok := private.(mysqlPrivateData)
if !ok {
return private, false
}
stream := mysqlData.Data[dir]
if stream == nil || stream.message == nil {
// nothing to do
return private, false
}
if mysql.messageGap(stream, nbytes) {
// we need to publish from here
mysql.messageComplete(tcptuple, dir, stream)
}
// we always drop the TCP stream. Because it's binary and len based,
// there are too few cases in which we could recover the stream (maybe
// for very large blobs, leaving that as TODO)
return private, true
}
func (mysql *Mysql) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {
// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}
func handleMysql(mysql *Mysql, m *MysqlMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg []byte) {
m.TcpTuple = *tcptuple
m.Direction = dir
m.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IpPort())
m.Raw = raw_msg
if m.IsRequest {
mysql.receivedMysqlRequest(m)
} else {
mysql.receivedMysqlResponse(m)
}
}
func (mysql *Mysql) receivedMysqlRequest(msg *MysqlMessage) {
tuple := msg.TcpTuple
trans := mysql.getTransaction(tuple.Hashable())
if trans != nil {
if trans.Mysql != nil {
logp.Debug("mysql", "Two requests without a Response. Dropping old request: %s", trans.Mysql)
}
} else {
trans = &MysqlTransaction{Type: "mysql", tuple: tuple}
mysql.transactions.Put(tuple.Hashable(), trans)
}
trans.ts = msg.Ts
trans.Ts = int64(trans.ts.UnixNano() / 1000) // transactions have microseconds resolution
trans.JsTs = msg.Ts
trans.Src = common.Endpoint{
Ip: msg.TcpTuple.Src_ip.String(),
Port: msg.TcpTuple.Src_port,
Proc: string(msg.CmdlineTuple.Src),
}
trans.Dst = common.Endpoint{
Ip: msg.TcpTuple.Dst_ip.String(),
Port: msg.TcpTuple.Dst_port,
Proc: string(msg.CmdlineTuple.Dst),
}
if msg.Direction == tcp.TcpDirectionReverse {
trans.Src, trans.Dst = trans.Dst, trans.Src
}
// Extract the method, by simply taking the first word and
// making it upper case.
query := strings.Trim(msg.Query, " \n\t")
index := strings.IndexAny(query, " \n\t")
var method string
if index > 0 {
method = strings.ToUpper(query[:index])
} else {
method = strings.ToUpper(query)
}
trans.Query = query
trans.Method = method
trans.Mysql = common.MapStr{}
trans.Notes = msg.Notes
// save Raw message
trans.Request_raw = msg.Query
trans.BytesIn = msg.Size
}
func (mysql *Mysql) receivedMysqlResponse(msg *MysqlMessage) {
trans := mysql.getTransaction(msg.TcpTuple.Hashable())
if trans == nil {
logp.Warn("Response from unknown transaction. Ignoring.")
return
}
// check if the request was received
if trans.Mysql == nil {
logp.Warn("Response from unknown transaction. Ignoring.")
return
}
// save json details
trans.Mysql.Update(common.MapStr{
"affected_rows": msg.AffectedRows,
"insert_id": msg.InsertId,
"num_rows": msg.NumberOfRows,
"num_fields": msg.NumberOfFields,
"iserror": msg.IsError,
"error_code": msg.ErrorCode,
"error_message": msg.ErrorInfo,
})
trans.BytesOut = msg.Size
trans.Path = msg.Tables
trans.ResponseTime = int32(msg.Ts.Sub(trans.ts).Nanoseconds() / 1e6) // resp_time in milliseconds
// save Raw message
if len(msg.Raw) > 0 {
fields, rows := mysql.parseMysqlResponse(msg.Raw)
trans.Response_raw = common.DumpInCSVFormat(fields, rows)
}
trans.Notes = append(trans.Notes, msg.Notes...)
mysql.publishTransaction(trans)
mysql.transactions.Delete(trans.tuple.Hashable())
logp.Debug("mysql", "Mysql transaction completed: %s", trans.Mysql)
logp.Debug("mysql", "%s", trans.Response_raw)
}
func (mysql *Mysql) parseMysqlResponse(data []byte) ([]string, [][]string) {
length, err := read_length(data, 0)
if err != nil {
logp.Warn("Invalid response: %v", err)
return []string{}, [][]string{}
}
if length < 1 {
logp.Warn("Warning: Skipping empty Response")
return []string{}, [][]string{}
}
fields := []string{}
rows := [][]string{}
if len(data) < 5 {
logp.Warn("Invalid response: data less than 4 bytes")
return []string{}, [][]string{}
}
if uint8(data[4]) == 0x00 {
// OK response
} else if uint8(data[4]) == 0xff {
// Error response
} else {
offset := 5
logp.Debug("mysql", "Data len: %d", len(data))
// Read fields
for {
length, err = read_length(data, offset)
if err != nil {
logp.Warn("Invalid response: %v", err)
return []string{}, [][]string{}
}
if len(data[offset:]) < 5 {
logp.Warn("Invalid response.")
return []string{}, [][]string{}
}
if uint8(data[offset+4]) == 0xfe {
// EOF
offset += length + 4
break
}
_ /* catalog */, off, complete, err := read_lstring(data, offset+4)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /*database*/, off, complete, err = read_lstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /*table*/, off, complete, err = read_lstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /*org table*/, off, complete, err = read_lstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
name, off, complete, err := read_lstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /* org name */, off, complete, err = read_lstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
fields = append(fields, string(name))
offset += length + 4
if len(data) < offset {
logp.Warn("Invalid response.")
return []string{}, [][]string{}
}
}
// Read rows
for offset < len(data) {
var row []string
var row_len int
if len(data[offset:]) < 5 {
logp.Warn("Invalid response.")
break
}
if uint8(data[offset+4]) == 0xfe {
// EOF
offset += length + 4
break
}
length, err = read_length(data, offset)
if err != nil {
logp.Warn("Invalid response: %v", err)
break
}
off := offset + 4 // skip length + packet number
start := off
for off < start+length {
var text []byte
if uint8(data[off]) == 0xfb {
text = []byte("NULL")
off++
} else {
var err error
var complete bool
text, off, complete, err = read_lstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Error parsing rows: %s %b", err, complete)
// nevertheless, return what we have so far
return fields, rows
}
}
if row_len < mysql.maxRowLength {
if row_len+len(text) > mysql.maxRowLength {
text = text[:mysql.maxRowLength-row_len]
}
row = append(row, string(text))
row_len += len(text)
}
}
logp.Debug("mysqldetailed", "Append row: %v", row)
rows = append(rows, row)
if len(rows) >= mysql.maxStoreRows {
break
}
offset += length + 4
}
}
return fields, rows
}
func (mysql *Mysql) publishTransaction(t *MysqlTransaction) {
if mysql.results == nil {
return
}
logp.Debug("mysql", "mysql.results exists")
event := common.MapStr{}
event["type"] = "mysql"
if t.Mysql["iserror"].(bool) {
event["status"] = common.ERROR_STATUS
} else {
event["status"] = common.OK_STATUS
}
event["responsetime"] = t.ResponseTime
if mysql.Send_request {
event["request"] = t.Request_raw
}
if mysql.Send_response {
event["response"] = t.Response_raw
}
event["method"] = t.Method
event["query"] = t.Query
event["mysql"] = t.Mysql
event["path"] = t.Path
event["bytes_out"] = t.BytesOut
event["bytes_in"] = t.BytesIn
if len(t.Notes) > 0 {
event["notes"] = t.Notes
}
event["@timestamp"] = common.Time(t.ts)
event["src"] = &t.Src
event["dst"] = &t.Dst
mysql.results.PublishTransaction(event)
}
func read_lstring(data []byte, offset int) ([]byte, int, bool, error) {
length, off, complete, err := read_linteger(data, offset)
if err != nil {
return nil, 0, false, err
}
if !complete || len(data[off:]) < int(length) {
return nil, 0, false, nil
}
return data[off : off+int(length)], off + int(length), true, nil
}
func read_linteger(data []byte, offset int) (uint64, int, bool, error) {
if len(data) < offset+1 {
return 0, 0, false, nil
}
switch uint8(data[offset]) {
case 0xfe:
if len(data[offset+1:]) < 8 {
return 0, 0, false, nil
}
return uint64(data[offset+1]) | uint64(data[offset+2])<<8 |
uint64(data[offset+2])<<16 | uint64(data[offset+3])<<24 |
uint64(data[offset+4])<<32 | uint64(data[offset+5])<<40 |
uint64(data[offset+6])<<48 | uint64(data[offset+7])<<56,
offset + 9, true, nil
case 0xfd:
if len(data[offset+1:]) < 3 {
return 0, 0, false, nil
}
return uint64(data[offset+1]) | uint64(data[offset+2])<<8 |
uint64(data[offset+3])<<16, offset + 4, true, nil
case 0xfc:
if len(data[offset+1:]) < 2 {
return 0, 0, false, nil
}
return uint64(data[offset+1]) | uint64(data[offset+2])<<8, offset + 3, true, nil
}
if uint64(data[offset]) >= 0xfb {
return 0, 0, false, fmt.Errorf("Unexpected value in read_linteger")
}
return uint64(data[offset]), offset + 1, true, nil
}
// Read a mysql length field (3 bytes LE)
func read_length(data []byte, offset int) (int, error) {
if len(data[offset:]) < 3 {
return 0, errors.New("Data too small to contain a valid length")
}
length := uint32(data[offset]) |
uint32(data[offset+1])<<8 |
uint32(data[offset+2])<<16
return int(length), nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v1.2.1

搜索帮助

A270a887 8829481 3d7a4017 8829481