1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
amqp_methods.go 21.45 KB
一键复制 编辑 原始数据 按行查看 历史
Steffen Siering 提交于 2016-11-14 14:50 . More Packetbeat cleanups (#2972)
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
package amqp
import (
"encoding/binary"
"strconv"
"strings"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// connectionStartMethod decodes the arguments of a connection.start frame:
// two version octets, a server-properties table read with getTable, and the
// mechanisms and locales strings, each preceded by a 32-bit big-endian
// length. On success m is tagged as a "connection.start" request and
// m.fields is populated; the result is then (true, true). Any decoding
// failure is logged and yields (false, false).
func connectionStartMethod(m *amqpMessage, args []byte) (bool, bool) {
	versionMajor, versionMinor := args[0], args[1]

	serverProps := make(common.MapStr)
	pos, failed, hasProps := getTable(serverProps, args, 2)
	if failed {
		// the table could not be read (its size may be wrong), give up
		logp.Warn("Failed to parse server properties in connection.start method")
		return false, false
	}

	mechLen := binary.BigEndian.Uint32(args[pos : pos+4])
	mechanisms, pos, failed := getShortString(args, pos+4, mechLen)
	if failed {
		logp.Warn("Failed to get connection mechanisms")
		return false, false
	}

	localesLen := binary.BigEndian.Uint32(args[pos : pos+4])
	locales, _, failed := getShortString(args, pos+4, localesLen)
	if failed {
		logp.Warn("Failed to get connection locales")
		return false, false
	}

	fields := common.MapStr{
		"version-major": versionMajor,
		"version-minor": versionMinor,
		"mechanisms":    mechanisms,
		"locales":       locales,
	}
	// only report the server properties when a table was actually present
	if hasProps {
		fields["server-properties"] = serverProps
	}

	m.method = "connection.start"
	m.isRequest = true
	m.fields = fields
	return true, true
}
// connectionStartOkMethod decodes the arguments of a connection.start-ok
// frame: a client-properties table, the selected mechanism (length in one
// octet), the response (32-bit big-endian length, value discarded) and the
// locale (length in one octet). On success m is marked as a non-request and
// m.fields receives "mechanism" and "locale", plus "client-properties" when
// the table was present. Returns (true, true) on success and (false, false)
// after logging a warning on any decoding failure.
func connectionStartOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	properties := make(common.MapStr)
	next, err, exists := getTable(properties, args, 0)
	if err {
		// failed to get the client properties, size may be wrong, let's quit.
		// The warning names the client table of connection.start-ok; it used
		// to repeat the connection.start / server properties message.
		logp.Warn("Failed to parse client properties in connection.start-ok method")
		return false, false
	}

	mechanism, next, err := getShortString(args, next+1, uint32(args[next]))
	if err {
		logp.Warn("Failed to get connection mechanism from client")
		return false, false
	}

	// the response is skipped over: only the offset after it is needed
	_, next, err = getShortString(args, next+4, binary.BigEndian.Uint32(args[next:next+4]))
	if err {
		logp.Warn("Failed to get connection response from client")
		return false, false
	}

	locale, _, err := getShortString(args, next+1, uint32(args[next]))
	if err {
		logp.Warn("Failed to get connection locale from client")
		return false, false
	}

	m.isRequest = false
	m.fields = common.MapStr{
		"mechanism": mechanism,
		"locale":    locale,
	}
	// if there is a client properties table, add it
	if exists {
		m.fields["client-properties"] = properties
	}
	return true, true
}
// connectionTuneMethod tags m as a "connection.tune" request and always
// returns (true, true). The tuning parameters carried in args are left
// undecoded on purpose: they are negotiated further and read from the
// connection.tune-ok method instead.
func connectionTuneMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "connection.tune"
	m.isRequest = true
	return true, true
}
// connectionTuneOkMethod reads the three big-endian integers of a
// connection.tune-ok frame from fixed offsets (channel-max: bytes 0-1,
// frame-max: bytes 2-5, heartbeat: bytes 6-7) into m.fields and returns
// (true, true).
func connectionTuneOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	channelMax := binary.BigEndian.Uint16(args[0:2])
	frameMax := binary.BigEndian.Uint32(args[2:6])
	heartbeat := binary.BigEndian.Uint16(args[6:8])

	m.fields = common.MapStr{
		"channel-max": channelMax,
		"frame-max":   frameMax,
		"heartbeat":   heartbeat,
	}
	return true, true
}
// connectionOpenMethod tags m as a "connection.open" request and extracts
// the virtual host, a string whose length is given by the first octet of
// args. Returns (true, true) on success, or (false, false) after a warning
// when the string does not fit in args.
func connectionOpenMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "connection.open"
	m.isRequest = true

	vhostLen := uint32(args[0])
	vhost, _, failed := getShortString(args, 1, vhostLen)
	if failed {
		logp.Warn("Failed to get virtual host from client")
		return false, false
	}

	m.fields = common.MapStr{"virtual-host": vhost}
	return true, true
}
// connectionCloseMethod fills m with the close details decoded by
// getCloseInfo and, when that succeeds, tags m as a "connection.close"
// request and returns (true, true). A decoding failure yields
// (false, false) and leaves m.method untouched.
func connectionCloseMethod(m *amqpMessage, args []byte) (bool, bool) {
	if failed := getCloseInfo(args, m); failed {
		return false, false
	}
	m.isRequest = true
	m.method = "connection.close"
	return true, true
}
// channelOpenMethod tags m as a "channel.open" request. No argument is
// decoded. Always returns (true, true).
func channelOpenMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "channel.open"
	return true, true
}
// channelFlowMethod tags m as a "channel.flow" request. No argument is
// decoded here. Always returns (true, true).
func channelFlowMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "channel.flow"
	return true, true
}
// channelFlowOkMethod stores the lowest bit of the first argument octet as
// the "active" field of m and returns (true, true).
func channelFlowOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	flags := getBitParams(args[0])
	active := flags[0]
	m.fields = common.MapStr{"active": active}
	return true, true
}
// channelCloseMethod tags m as a "channel.close" request, then lets
// getCloseInfo decode the close details into m.fields. Returns (true, true)
// on success and (false, false) when getCloseInfo reports a failure; m is
// already tagged in both cases.
func channelCloseMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "channel.close"
	if failed := getCloseInfo(args, m); failed {
		return false, false
	}
	return true, true
}
// getCloseInfo decodes the arguments shared by channel.close and
// connection.close: a 16-bit reply code, a reply text whose length is held
// in one octet, then the 16-bit class id and method id that follow the
// text. It marks m as a request and stores the four values in m.fields.
//
// The boolean result is an error flag: true means the reply text could not
// be read (a warning is logged and m.fields is not set), false means
// success.
func getCloseInfo(args []byte, m *amqpMessage) bool {
	replyCode := binary.BigEndian.Uint16(args[0:2])
	m.isRequest = true

	textLen := uint32(args[2])
	text, pos, failed := getShortString(args, 3, textLen)
	if failed {
		logp.Warn("Failed to get error reply text")
		return true
	}

	classID := binary.BigEndian.Uint16(args[pos : pos+2])
	methodID := binary.BigEndian.Uint16(args[pos+2 : pos+4])
	m.fields = common.MapStr{
		"reply-code": replyCode,
		"reply-text": text,
		"class-id":   classID,
		"method-id":  methodID,
	}
	return false
}
// queueDeclareMethod decodes a queue.declare frame: the queue name (length
// octet at index 2, text from index 3), a flag octet, and an optional
// arguments table. m is tagged as a "queue.declare" request whose request
// string is the queue name. The table is only parsed when m.parseArguments
// is set and the byte after the flags is not frameEndOctet; a table that
// fails to parse adds a note to m.notes instead of failing the method.
// Returns (false, false) only when the queue name cannot be read.
func queueDeclareMethod(m *amqpMessage, args []byte) (bool, bool) {
	queue, flagsPos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of queue in queue declaration")
		return false, false
	}

	m.method = "queue.declare"
	m.isRequest = true
	flags := getBitParams(args[flagsPos])
	m.fields = common.MapStr{
		"queue":       queue,
		"passive":     flags[0],
		"durable":     flags[1],
		"exclusive":   flags[2],
		"auto-delete": flags[3],
		"no-wait":     flags[4],
	}
	m.request = queue

	tablePos := flagsPos + 1
	if args[tablePos] != frameEndOctet && m.parseArguments {
		table := make(common.MapStr)
		_, tableFailed, found := getTable(table, args, tablePos)
		switch {
		case tableFailed:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case found:
			m.fields["arguments"] = table
		}
	}
	return true, true
}
// queueDeclareOkMethod decodes a queue.declare-ok frame: the queue name
// (length in the first octet) followed by two 32-bit big-endian counters,
// message-count first and consumer-count second. m.method is set to
// "queue.declare-ok". Returns (false, false) when the name cannot be read.
func queueDeclareOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	queue, pos, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Error getting name of queue in queue confirmation")
		return false, false
	}

	m.method = "queue.declare-ok"
	messageCount := binary.BigEndian.Uint32(args[pos : pos+4])
	consumerCount := binary.BigEndian.Uint32(args[pos+4:])
	m.fields = common.MapStr{
		"queue":          queue,
		"message-count":  messageCount,
		"consumer-count": consumerCount,
	}
	return true, true
}
// queueBindMethod decodes a queue.bind frame: queue name, exchange name and
// routing key (each prefixed by a one-octet length, the first length sitting
// at index 2), a flag octet carrying no-wait, and an optional arguments
// table. m is tagged as a "queue.bind" request whose request string is
// "<queue> <exchange>". The "exchange" field is only emitted when the
// exchange name is not empty. The table is parsed only when
// m.parseArguments is set and the byte after the flags is not
// frameEndOctet; a table parse failure is recorded in m.notes.
// Returns (false, false) when one of the three strings cannot be read.
func queueBindMethod(m *amqpMessage, args []byte) (bool, bool) {
	queue, offset, err := getShortString(args, 3, uint32(args[2]))
	if err {
		logp.Warn("Error getting name of queue in queue bind")
		return false, false
	}
	m.isRequest = true

	exchange, offset, err := getShortString(args, offset+1, uint32(args[offset]))
	if err {
		// previously logged the queue-name message, hiding which field failed
		logp.Warn("Error getting name of exchange in queue bind")
		return false, false
	}

	routingKey, offset, err := getShortString(args, offset+1, uint32(args[offset]))
	if err {
		// previously logged the queue-name message, hiding which field failed
		logp.Warn("Error getting routing key in queue bind")
		return false, false
	}

	params := getBitParams(args[offset])
	m.method = "queue.bind"
	m.request = strings.Join([]string{queue, exchange}, " ")
	m.fields = common.MapStr{
		"queue":       queue,
		"routing-key": routingKey,
		"no-wait":     params[0],
	}
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}

	if args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, err, exists := getTable(arguments, args, offset+1)
		if !err && exists {
			m.fields["arguments"] = arguments
		} else if err {
			m.notes = append(m.notes, "Failed to parse additional arguments")
		}
	}
	return true, true
}
// queueUnbindMethod decodes a queue.unbind frame: queue name, exchange name
// and routing key (each prefixed by a one-octet length, the first length
// sitting at index 2) and an optional arguments table. m is tagged as a
// "queue.unbind" request whose request string is "<queue> <exchange>". The
// "exchange" field is only emitted when the exchange name is not empty. The
// table is parsed only when m.parseArguments is set and the probed byte is
// not frameEndOctet; a table parse failure is recorded in m.notes.
// Returns (false, false) when one of the three strings cannot be read.
func queueUnbindMethod(m *amqpMessage, args []byte) (bool, bool) {
	queue, offset, err := getShortString(args, 3, uint32(args[2]))
	if err {
		logp.Warn("Error getting name of queue in queue unbind")
		return false, false
	}

	exchange, offset, err := getShortString(args, offset+1, uint32(args[offset]))
	if err {
		// previously logged the queue-name message, hiding which field failed
		logp.Warn("Error getting name of exchange in queue unbind")
		return false, false
	}

	routingKey, offset, err := getShortString(args, offset+1, uint32(args[offset]))
	if err {
		// previously logged the queue-name message, hiding which field failed
		logp.Warn("Error getting routing key in queue unbind")
		return false, false
	}

	m.isRequest = true
	m.method = "queue.unbind"
	m.request = strings.Join([]string{queue, exchange}, " ")
	m.fields = common.MapStr{
		"queue":       queue,
		"routing-key": routingKey,
	}
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}

	// NOTE(review): the table is probed at offset+1, the same position used
	// by queue.bind where a flag octet precedes the table; no flag octet is
	// read here — confirm the offset against the frame layout.
	if args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, err, exists := getTable(arguments, args, offset+1)
		if !err && exists {
			m.fields["arguments"] = arguments
		} else if err {
			m.notes = append(m.notes, "Failed to parse additional arguments")
		}
	}
	return true, true
}
// queuePurgeMethod decodes a queue.purge frame: the queue name (length
// octet at index 2) followed by a flag octet whose lowest bit is no-wait.
// m is tagged as a "queue.purge" request whose request string is the queue
// name. Returns (false, false) when the name cannot be read.
func queuePurgeMethod(m *amqpMessage, args []byte) (bool, bool) {
	name, flagsPos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of queue in queue purge")
		return false, false
	}

	m.isRequest = true
	flags := getBitParams(args[flagsPos])
	m.fields = common.MapStr{
		"queue":   name,
		"no-wait": flags[0],
	}
	m.request = name
	m.method = "queue.purge"
	return true, true
}
// queuePurgeOkMethod sets m.method to "queue.purge-ok" and stores the
// 32-bit big-endian value in the first four argument bytes as
// "message-count". Returns (true, true).
func queuePurgeOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "queue.purge-ok"
	purged := binary.BigEndian.Uint32(args[0:4])
	m.fields = common.MapStr{"message-count": purged}
	return true, true
}
// queueDeleteMethod decodes a queue.delete frame: the queue name (length
// octet at index 2) followed by a flag octet holding if-unused, if-empty
// and no-wait in its three lowest bits. m is tagged as a "queue.delete"
// request whose request string is the queue name. Returns (false, false)
// when the name cannot be read.
func queueDeleteMethod(m *amqpMessage, args []byte) (bool, bool) {
	name, flagsPos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of queue in queue delete")
		return false, false
	}

	m.isRequest = true
	flags := getBitParams(args[flagsPos])
	m.fields = common.MapStr{
		"queue":     name,
		"if-unused": flags[0],
		"if-empty":  flags[1],
		"no-wait":   flags[2],
	}
	m.request = name
	m.method = "queue.delete"
	return true, true
}
// queueDeleteOkMethod sets m.method to "queue.delete-ok" and stores the
// 32-bit big-endian value in the first four argument bytes as
// "message-count". Returns (true, true).
func queueDeleteOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "queue.delete-ok"
	deleted := binary.BigEndian.Uint32(args[0:4])
	m.fields = common.MapStr{"message-count": deleted}
	return true, true
}
// exchangeDeclareMethod decodes an exchange.declare frame: the exchange
// name and the exchange type (each prefixed by a one-octet length, the
// first length sitting at index 2), a flag octet, and an optional arguments
// table. m is tagged as an "exchange.declare" request whose request string
// is the exchange name. An empty exchange type is reported as "direct".
// Only bits 0 (passive), 1 (durable) and 4 (no-wait) of the flag octet are
// exported. The table is parsed only when m.parseArguments is set and the
// byte after the flags is not frameEndOctet; a table parse failure is
// recorded in m.notes.
// Returns (false, false) when the name or the type cannot be read.
func exchangeDeclareMethod(m *amqpMessage, args []byte) (bool, bool) {
	exchange, offset, err := getShortString(args, 3, uint32(args[2]))
	if err {
		logp.Warn("Error getting name of exchange in exchange declare")
		return false, false
	}

	exchangeType, offset, err := getShortString(args, offset+1, uint32(args[offset]))
	if err {
		// this string is the exchange type; the warning used to mention a
		// routing key, which exchange.declare does not decode
		logp.Warn("Error getting exchange type in exchange declare")
		return false, false
	}

	params := getBitParams(args[offset])
	m.method = "exchange.declare"
	m.isRequest = true
	m.request = exchange
	if exchangeType == "" {
		exchangeType = "direct"
	}
	m.fields = common.MapStr{
		"exchange":      exchange,
		"exchange-type": exchangeType,
		"passive":       params[0],
		"durable":       params[1],
		"no-wait":       params[4],
	}

	if args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, err, exists := getTable(arguments, args, offset+1)
		if !err && exists {
			m.fields["arguments"] = arguments
		} else if err {
			m.notes = append(m.notes, "Failed to parse additional arguments")
		}
	}
	return true, true
}
// exchangeDeleteMethod decodes an exchange.delete frame: the exchange name
// (length octet at index 2) followed by a flag octet holding if-unused and
// no-wait in its two lowest bits. m is tagged as an "exchange.delete"
// request whose request string is the exchange name. Returns (false, false)
// when the name cannot be read.
func exchangeDeleteMethod(m *amqpMessage, args []byte) (bool, bool) {
	name, flagsPos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of exchange in exchange delete")
		return false, false
	}

	m.isRequest = true
	m.method = "exchange.delete"
	flags := getBitParams(args[flagsPos])
	m.fields = common.MapStr{
		"exchange":  name,
		"if-unused": flags[0],
		"no-wait":   flags[1],
	}
	m.request = name
	return true, true
}
// exchangeBindMethod handles exchange.bind, a method exclusive to RabbitMQ.
// It sets m.method and delegates all decoding to exchangeBindUnbindInfo,
// returning (false, false) when that helper reports an error and
// (true, true) otherwise.
func exchangeBindMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "exchange.bind"
	if failed := exchangeBindUnbindInfo(m, args); failed {
		return false, false
	}
	return true, true
}
// exchangeUnbindMethod handles exchange.unbind, a method exclusive to
// RabbitMQ. It sets m.method and delegates all decoding to
// exchangeBindUnbindInfo, returning (false, false) when that helper reports
// an error and (true, true) otherwise.
func exchangeUnbindMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "exchange.unbind"
	if failed := exchangeBindUnbindInfo(m, args); failed {
		return false, false
	}
	return true, true
}
// exchangeBindUnbindInfo decodes the arguments shared by exchange.bind and
// exchange.unbind: destination, source and routing key (each prefixed by a
// one-octet length, the first length sitting at index 2), a flag octet whose
// lowest bit is no-wait, and an optional arguments table. m is marked as a
// request whose request string is "<source> <destination>". The table is
// parsed only when m.parseArguments is set and the byte after the flags is
// not frameEndOctet; a table parse failure is recorded in m.notes.
//
// The boolean result is an error flag: true when one of the three strings
// could not be read, false on success.
func exchangeBindUnbindInfo(m *amqpMessage, args []byte) bool {
	dst, pos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of destination in exchange bind/unbind")
		return true
	}

	src, pos, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Error getting name of source in exchange bind/unbind")
		return true
	}

	key, pos, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Error getting name of routing-key in exchange bind/unbind")
		return true
	}

	m.isRequest = true
	flags := getBitParams(args[pos])
	m.fields = common.MapStr{
		"destination": dst,
		"source":      src,
		"routing-key": key,
		"no-wait":     flags[0],
	}
	m.request = strings.Join([]string{src, dst}, " ")

	tablePos := pos + 1
	if args[tablePos] != frameEndOctet && m.parseArguments {
		table := make(common.MapStr)
		_, tableFailed, found := getTable(table, args, tablePos)
		switch {
		case tableFailed:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case found:
			m.fields["arguments"] = table
		}
	}
	return false
}
// basicQosMethod decodes a basic.qos frame from fixed offsets: a 32-bit
// prefetch size (bytes 0-3), a 16-bit prefetch count (bytes 4-5) and a flag
// octet (byte 6) whose lowest bit is global. m is tagged as a "basic.qos"
// request. Returns (true, true).
func basicQosMethod(m *amqpMessage, args []byte) (bool, bool) {
	size := binary.BigEndian.Uint32(args[0:4])
	count := binary.BigEndian.Uint16(args[4:6])
	flags := getBitParams(args[6])

	m.method = "basic.qos"
	m.isRequest = true
	m.fields = common.MapStr{
		"prefetch-size":  size,
		"prefetch-count": count,
		"global":         flags[0],
	}
	return true, true
}
// basicConsumeMethod decodes a basic.consume frame: the queue name and the
// consumer tag (each prefixed by a one-octet length, the first length
// sitting at index 2), a flag octet holding no-local, no-ack, exclusive and
// no-wait in its four lowest bits, and an optional arguments table. m is
// tagged as a "basic.consume" request whose request string is the queue
// name. The table is parsed only when m.parseArguments is set and the byte
// after the flags is not frameEndOctet; a table parse failure is recorded
// in m.notes. Returns (false, false) when either string cannot be read.
func basicConsumeMethod(m *amqpMessage, args []byte) (bool, bool) {
	queue, pos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of queue in basic consume")
		return false, false
	}

	tag, pos, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Error getting name of consumer tag in basic consume")
		return false, false
	}

	flags := getBitParams(args[pos])
	m.isRequest = true
	m.method = "basic.consume"
	m.request = queue
	m.fields = common.MapStr{
		"queue":        queue,
		"consumer-tag": tag,
		"no-local":     flags[0],
		"no-ack":       flags[1],
		"exclusive":    flags[2],
		"no-wait":      flags[3],
	}

	tablePos := pos + 1
	if args[tablePos] != frameEndOctet && m.parseArguments {
		table := make(common.MapStr)
		_, tableFailed, found := getTable(table, args, tablePos)
		switch {
		case tableFailed:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case found:
			m.fields["arguments"] = table
		}
	}
	return true, true
}
// basicConsumeOkMethod decodes a basic.consume-ok frame, whose only
// argument is the consumer tag (length in the first octet). It sets
// m.method to "basic.consume-ok" and stores the tag in m.fields.
// Returns (true, true) on success and (false, false) when the tag cannot be
// read.
func basicConsumeOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	consumerTag, _, err := getShortString(args, 1, uint32(args[0]))
	if err {
		// the value being read is the consumer tag of consume-ok; the
		// warning used to repeat the queue-name message of basic.consume
		logp.Warn("Error getting consumer tag in basic consume ok")
		return false, false
	}

	m.method = "basic.consume-ok"
	m.fields = common.MapStr{
		"consumer-tag": consumerTag,
	}
	return true, true
}
// basicCancelMethod decodes a basic.cancel frame: the consumer tag (length
// in the first octet) followed by a flag octet whose lowest bit is no-wait.
// m is tagged as a "basic.cancel" request whose request string is the
// consumer tag. Returns (false, false) when the tag cannot be read.
func basicCancelMethod(m *amqpMessage, args []byte) (bool, bool) {
	tag, flagsPos, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Error getting consumer tag in basic cancel")
		return false, false
	}

	m.isRequest = true
	m.method = "basic.cancel"
	m.request = tag

	flags := getBitParams(args[flagsPos])
	m.fields = common.MapStr{
		"consumer-tag": tag,
		"no-wait":      flags[0],
	}
	return true, true
}
// basicCancelOkMethod decodes a basic.cancel-ok frame, whose only argument
// is the consumer tag (length in the first octet). It sets m.method to
// "basic.cancel-ok" and stores the tag in m.fields. Returns (false, false)
// when the tag cannot be read.
func basicCancelOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	tagLen := uint32(args[0])
	tag, _, failed := getShortString(args, 1, tagLen)
	if failed {
		logp.Warn("Error getting consumer tag in basic cancel ok")
		return false, false
	}

	m.fields = common.MapStr{"consumer-tag": tag}
	m.method = "basic.cancel-ok"
	return true, true
}
// basicPublishMethod decodes a basic.publish frame: the exchange name and
// the routing key (each prefixed by a one-octet length, the first length
// sitting at index 2) followed by a flag octet holding mandatory and
// immediate in its two lowest bits. The "exchange" field is only emitted
// when the name is not empty (an empty name designates the default
// exchange). Returns (true, false) on success and (false, false) when
// either string cannot be read.
func basicPublishMethod(m *amqpMessage, args []byte) (bool, bool) {
	exchange, pos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting exchange in basic publish")
		return false, false
	}

	key, pos, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Error getting routing key in basic publish")
		return false, false
	}

	flags := getBitParams(args[pos])
	fields := common.MapStr{
		"routing-key": key,
		"mandatory":   flags[0],
		"immediate":   flags[1],
	}
	// leave the exchange out when the default exchange is used
	if exchange != "" {
		fields["exchange"] = exchange
	}

	m.method = "basic.publish"
	m.fields = fields
	return true, false
}
// basicReturnMethod decodes a basic.return frame: a 16-bit reply code, then
// the reply text, exchange name and routing key (each prefixed by a
// one-octet length, the first length sitting at index 2). Reply codes below
// 300 are not treated as errors: the function returns (true, false) for
// them without touching m. Otherwise m.method is set to "basic.return" and
// the four values are stored in m.fields before returning (true, false).
// Returns (false, false) when one of the strings cannot be read.
func basicReturnMethod(m *amqpMessage, args []byte) (bool, bool) {
	replyCode := binary.BigEndian.Uint16(args[0:2])
	if replyCode < 300 {
		// neither an error nor an exception: nothing worth reporting
		return true, false
	}

	text, pos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of reply text in basic return")
		return false, false
	}

	exchange, pos, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Error getting name of exchange in basic return")
		return false, false
	}

	key, _, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Error getting name of routing key in basic return")
		return false, false
	}

	m.fields = common.MapStr{
		"reply-code":  replyCode,
		"reply-text":  text,
		"exchange":    exchange,
		"routing-key": key,
	}
	m.method = "basic.return"
	return true, false
}
// basicDeliverMethod decodes a basic.deliver frame: the consumer tag
// (length in the first octet), a 64-bit big-endian delivery tag, a flag
// octet whose lowest bit is redelivered, then the exchange name and the
// routing key (each prefixed by a one-octet length). The "exchange" field
// is only emitted when the name is not empty. Returns (true, false) on
// success and (false, false) when one of the strings cannot be read.
func basicDeliverMethod(m *amqpMessage, args []byte) (bool, bool) {
	tag, pos, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Failed to get consumer tag in basic deliver")
		return false, false
	}

	deliveryTag := binary.BigEndian.Uint64(args[pos : pos+8])
	flags := getBitParams(args[pos+8])

	// skip the 8-byte delivery tag and the flag octet
	pos += 9
	exchange, pos, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Failed to get exchange in basic deliver")
		return false, false
	}

	key, _, failed := getShortString(args, pos+1, uint32(args[pos]))
	if failed {
		logp.Warn("Failed to get routing key in basic deliver")
		return false, false
	}

	fields := common.MapStr{
		"consumer-tag": tag,
		"delivery-tag": deliveryTag,
		"redelivered":  flags[0],
		"routing-key":  key,
	}
	// leave the exchange out when the default exchange is used
	if exchange != "" {
		fields["exchange"] = exchange
	}

	m.method = "basic.deliver"
	m.fields = fields
	return true, false
}
// basicGetMethod decodes a basic.get frame: the queue name (length octet at
// index 2) followed by a flag octet whose lowest bit is no-ack. m is tagged
// as a "basic.get" request whose request string is the queue name.
// Returns (false, false) when the name cannot be read.
func basicGetMethod(m *amqpMessage, args []byte) (bool, bool) {
	name, flagsPos, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Failed to get queue in basic get method")
		return false, false
	}

	m.method = "basic.get"
	flags := getBitParams(args[flagsPos])
	m.fields = common.MapStr{
		"queue":  name,
		"no-ack": flags[0],
	}
	m.request = name
	m.isRequest = true
	return true, true
}
// basicGetOkMethod decodes a basic.get-ok frame: a 64-bit big-endian
// delivery tag (bytes 0-7), a flag octet (byte 8) whose lowest bit is
// redelivered, the exchange name and the routing key (each prefixed by a
// one-octet length, the first length sitting at index 9), and a 32-bit
// big-endian message count. The "exchange" field is only emitted when the
// name is not empty. Returns (true, false) on success and (false, false)
// when either string cannot be read.
func basicGetOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	params := getBitParams(args[8])

	exchange, offset, err := getShortString(args, 10, uint32(args[9]))
	if err {
		// the string read here is the exchange name; the warning used to
		// say "queue", which basic.get-ok does not carry
		logp.Warn("Failed to get exchange in basic get-ok")
		return false, false
	}

	routingKey, offset, err := getShortString(args, offset+1, uint32(args[offset]))
	if err {
		logp.Warn("Failed to get routing key in basic get-ok")
		return false, false
	}

	m.method = "basic.get-ok"
	m.fields = common.MapStr{
		"delivery-tag":  binary.BigEndian.Uint64(args[0:8]),
		"redelivered":   params[0],
		"routing-key":   routingKey,
		"message-count": binary.BigEndian.Uint32(args[offset : offset+4]),
	}
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}
	return true, false
}
// basicGetEmptyMethod sets m.method to "basic.get-empty". No argument is
// decoded. Always returns (true, true).
func basicGetEmptyMethod(m *amqpMessage, args []byte) (bool, bool) {
	const name = "basic.get-empty"
	m.method = name
	return true, true
}
// basicAckMethod decodes a basic.ack frame: a 64-bit big-endian delivery
// tag (bytes 0-7) and a flag octet (byte 8) whose lowest bit is multiple.
// m is tagged as a "basic.ack" request. Returns (true, true).
func basicAckMethod(m *amqpMessage, args []byte) (bool, bool) {
	flags := getBitParams(args[8])
	m.isRequest = true
	m.method = "basic.ack"

	deliveryTag := binary.BigEndian.Uint64(args[0:8])
	m.fields = common.MapStr{
		"delivery-tag": deliveryTag,
		"multiple":     flags[0],
	}
	return true, true
}
// basicNackMethod handles basic.nack, a RabbitMQ specific method. It
// decodes a 64-bit big-endian delivery tag (bytes 0-7) and a flag octet
// (byte 8) holding multiple and requeue in its two lowest bits. m is tagged
// as a "basic.nack" request. Returns (true, true).
func basicNackMethod(m *amqpMessage, args []byte) (bool, bool) {
	flags := getBitParams(args[8])
	m.isRequest = true
	m.method = "basic.nack"

	deliveryTag := binary.BigEndian.Uint64(args[0:8])
	m.fields = common.MapStr{
		"delivery-tag": deliveryTag,
		"multiple":     flags[0],
		"requeue":      flags[1],
	}
	return true, true
}
// basicRejectMethod decodes a basic.reject frame: a 64-bit big-endian
// delivery tag (bytes 0-7) and a flag octet (byte 8). m is tagged as a
// "basic.reject" request whose request string is the delivery tag in
// decimal. Returns (true, true).
func basicRejectMethod(m *amqpMessage, args []byte) (bool, bool) {
	flags := getBitParams(args[8])
	deliveryTag := binary.BigEndian.Uint64(args[0:8])

	m.method = "basic.reject"
	m.isRequest = true
	// NOTE(review): the lowest flag bit is exported as "multiple"; the AMQP
	// 0-9-1 definition of basic.reject appears to name that bit "requeue" —
	// confirm whether the key is intentional before renaming it, since it is
	// part of the emitted event.
	m.fields = common.MapStr{
		"delivery-tag": deliveryTag,
		"multiple":     flags[0],
	}
	m.request = strconv.FormatUint(deliveryTag, 10)
	return true, true
}
// basicRecoverMethod decodes a basic.recover frame, whose first octet
// carries the requeue flag in its lowest bit. m is tagged as a
// "basic.recover" request. Returns (true, true).
func basicRecoverMethod(m *amqpMessage, args []byte) (bool, bool) {
	flags := getBitParams(args[0])
	m.method = "basic.recover"
	m.isRequest = true
	m.fields = common.MapStr{"requeue": flags[0]}
	return true, true
}
// txSelectMethod tags m as a "tx.select" request. No argument is decoded.
// Always returns (true, true).
func txSelectMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "tx.select"
	m.isRequest = true
	return true, true
}
// txCommitMethod tags m as a "tx.commit" request. No argument is decoded.
// Always returns (true, true).
func txCommitMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "tx.commit"
	m.isRequest = true
	return true, true
}
// txRollbackMethod tags m as a "tx.rollback" request. No argument is
// decoded. Always returns (true, true).
func txRollbackMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "tx.rollback"
	m.isRequest = true
	return true, true
}
// okMethod is the handler used when the server or client answers a
// synchronous method without carrying any new information: it leaves m
// untouched, ignores args and always returns (true, true).
func okMethod(m *amqpMessage, args []byte) (bool, bool) {
	const ok, done = true, true
	return ok, done
}
// getShortString extracts length bytes of data, beginning at start, as a
// string.
//
// Results:
//   - short: the extracted text, or "" when length is 0 or on error.
//   - nextOffset: the index just past the text (start itself when length is
//     0, and 0 on error).
//   - err: true when data is too short to hold length bytes from start.
//
// A zero length is answered before any bounds check, so it never reports an
// error, even when start lies beyond the end of data.
func getShortString(data []byte, start uint32, length uint32) (short string, nextOffset uint32, err bool) {
	if length == 0 {
		return "", start, false
	}

	size := uint32(len(data))
	// written as a subtraction so that start+length cannot wrap around
	if start > size || size-start < length {
		return "", 0, true
	}

	end := start + length
	return string(data[start:end]), end, false
}
// getBitParams unpacks the five lowest bits of bits into booleans, as used
// by the flag octets of various AMQP methods: ret[i] is true exactly when
// bit i (value 1<<i) is set. The three highest bits are ignored.
func getBitParams(bits byte) (ret [5]bool) {
	for i := range ret {
		mask := byte(1) << uint(i)
		ret[i] = bits&mask != 0
	}
	return ret
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v6.0.0-rc1

搜索帮助