代码拉取完成,页面将自动刷新
package amqp
import (
"encoding/binary"
"strconv"
"strings"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// connectionStartMethod decodes the arguments of a connection.start method:
// two version octets, the server-properties table, then the mechanisms and
// locales strings, each prefixed by a 4-byte big-endian length.
//
// It returns (false, false) when the arguments are truncated or cannot be
// decoded, and (true, true) once m has been filled in.
func connectionStartMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The version octets are read unconditionally; reject a truncated frame
	// instead of panicking on an out-of-range index.
	if total < 2 {
		logp.Warn("Failed to get protocol version in connection.start method")
		return false, false
	}
	major := args[0]
	minor := args[1]

	properties := make(common.MapStr)
	next, failed, exists := getTable(properties, args, 2)
	if failed {
		// failed to get the peer-properties, size may be wrong, let's quit
		logp.Warn("Failed to parse server properties in connection.start method")
		return false, false
	}

	// longString reads a 4-byte big-endian length at offset at followed by
	// that many bytes; it fails when the length prefix itself is missing.
	longString := func(at uint32) (string, uint32, bool) {
		if at > total || total-at < 4 {
			return "", 0, true
		}
		return getShortString(args, at+4, binary.BigEndian.Uint32(args[at:at+4]))
	}

	mechanisms, next, failed := longString(next)
	if failed {
		logp.Warn("Failed to get connection mechanisms")
		return false, false
	}
	locales, _, failed := longString(next)
	if failed {
		logp.Warn("Failed to get connection locales")
		return false, false
	}

	m.method = "connection.start"
	m.isRequest = true
	m.fields = common.MapStr{
		"version-major": major,
		"version-minor": minor,
		"mechanisms":    mechanisms,
		"locales":       locales,
	}
	// if there is a server properties table, add it
	if exists {
		m.fields["server-properties"] = properties
	}
	return true, true
}
// connectionStartOkMethod decodes the arguments of a connection.start-ok
// method: the client-properties table, the mechanism short string, the
// response (4-byte length prefix, value discarded) and the locale short
// string.
//
// It returns (false, false) when the arguments are truncated or cannot be
// decoded, and (true, true) once m has been filled in.
func connectionStartOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}
	// longString is the same with a 4-byte big-endian length prefix.
	longString := func(at uint32) (string, uint32, bool) {
		if at > total || total-at < 4 {
			return "", 0, true
		}
		return getShortString(args, at+4, binary.BigEndian.Uint32(args[at:at+4]))
	}

	properties := make(common.MapStr)
	next, failed, exists := getTable(properties, args, 0)
	if failed {
		// failed to get the peer-properties, size may be wrong, let's quit
		logp.Warn("Failed to parse client properties in connection.start-ok method")
		return false, false
	}
	mechanism, next, failed := shortString(next)
	if failed {
		logp.Warn("Failed to get connection mechanism from client")
		return false, false
	}
	// The response payload is skipped; only its length is needed to find
	// the locale that follows.
	_, next, failed = longString(next)
	if failed {
		logp.Warn("Failed to get connection response from client")
		return false, false
	}
	locale, _, failed := shortString(next)
	if failed {
		logp.Warn("Failed to get connection locale from client")
		return false, false
	}

	m.isRequest = false
	m.fields = common.MapStr{
		"mechanism": mechanism,
		"locale":    locale,
	}
	// if there is a client properties table, add it
	if exists {
		m.fields["client-properties"] = properties
	}
	return true, true
}
// connectionTuneMethod flags m as a "connection.tune" request and always
// returns (true, true). The arguments are intentionally ignored.
func connectionTuneMethod(m *amqpMessage, args []byte) (bool, bool) {
	// The tuning parameters are not decoded here because they are further
	// negotiated and show up again in the connection.tune-ok method.
	m.method = "connection.tune"
	m.isRequest = true
	return true, true
}
// connectionTuneOkMethod decodes the three fixed-size arguments of a
// connection.tune-ok method: channel-max (2 bytes), frame-max (4 bytes) and
// heartbeat (2 bytes), all big-endian.
//
// It returns (false, false) when fewer than 8 bytes are available, and
// (true, true) otherwise.
func connectionTuneOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	// 2 + 4 + 2 bytes are sliced below; a shorter buffer would panic.
	if len(args) < 8 {
		return false, false
	}
	m.fields = common.MapStr{
		"channel-max": binary.BigEndian.Uint16(args[0:2]),
		"frame-max":   binary.BigEndian.Uint32(args[2:6]),
		"heartbeat":   binary.BigEndian.Uint16(args[6:8]),
	}
	return true, true
}
// connectionOpenMethod decodes a connection.open method, whose first
// argument is the virtual host as a short string (1-byte length prefix).
//
// m is marked as a "connection.open" request before decoding. The function
// returns (false, false) when the virtual host cannot be read and
// (true, true) otherwise.
func connectionOpenMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "connection.open"
	// The length octet must exist before it can be read.
	if len(args) < 1 {
		logp.Warn("Failed to get virtual host from client")
		return false, false
	}
	host, _, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Failed to get virtual host from client")
		return false, false
	}
	m.fields = common.MapStr{"virtual-host": host}
	return true, true
}
// connectionCloseMethod decodes a connection.close method. The close details
// are extracted by getCloseInfo; if that fails the function returns
// (false, false) without naming the method on m. On success m is marked as a
// "connection.close" request and (true, true) is returned.
func connectionCloseMethod(m *amqpMessage, args []byte) (bool, bool) {
	if failed := getCloseInfo(args, m); failed {
		return false, false
	}
	m.isRequest = true
	m.method = "connection.close"
	return true, true
}
// channelOpenMethod flags m as a "channel.open" request. The arguments are
// not inspected and the result is always (true, true).
func channelOpenMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "channel.open"
	return true, true
}
// channelFlowMethod flags m as a "channel.flow" request. The arguments are
// not inspected and the result is always (true, true).
func channelFlowMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "channel.flow"
	return true, true
}
// channelFlowOkMethod decodes a channel.flow-ok method: the lowest bit of
// the first argument octet is reported as the "active" field.
//
// It returns (false, false) when args is empty and (true, true) otherwise.
func channelFlowOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	// The flags octet is mandatory; reading it from an empty slice would panic.
	if len(args) < 1 {
		return false, false
	}
	flags := getBitParams(args[0])
	m.fields = common.MapStr{"active": flags[0]}
	return true, true
}
// channelCloseMethod marks m as a "channel.close" request and then lets
// getCloseInfo extract the close details. It returns (false, false) when
// getCloseInfo reports a failure and (true, true) otherwise.
func channelCloseMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.isRequest = true
	m.method = "channel.close"
	if failed := getCloseInfo(args, m); failed {
		return false, false
	}
	return true, true
}
// getCloseInfo fetches the fields shared by channel.close and
// connection.close: reply-code (2 bytes), reply-text (short string), then
// class-id and method-id (2 bytes each, big-endian).
//
// m is always marked as a request. The return value is an error flag: true
// when the arguments are truncated or malformed, false when m.fields has
// been populated.
func getCloseInfo(args []byte, m *amqpMessage) bool {
	m.isRequest = true
	total := uint32(len(args))
	// Reply code plus the reply-text length octet must be present.
	if total < 3 {
		logp.Warn("Failed to get error reply text")
		return true
	}
	code := binary.BigEndian.Uint16(args[0:2])
	replyText, nextOffset, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Failed to get error reply text")
		return true
	}
	// class-id and method-id occupy the four bytes after the reply text.
	if nextOffset > total || total-nextOffset < 4 {
		logp.Warn("Failed to get class-id and method-id of close method")
		return true
	}
	m.fields = common.MapStr{
		"reply-code": code,
		"reply-text": replyText,
		"class-id":   binary.BigEndian.Uint16(args[nextOffset : nextOffset+2]),
		"method-id":  binary.BigEndian.Uint16(args[nextOffset+2 : nextOffset+4]),
	}
	return false
}
// queueDeclareMethod decodes a queue.declare method. The first two argument
// bytes are skipped, then come the queue name (short string), one flags
// octet and an optional arguments table.
//
// It returns (false, false) when the name or the flags cannot be read, and
// (true, true) otherwise. A failure to parse the optional arguments table is
// only recorded in m.notes.
func queueDeclareMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	name, offset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of queue in queue declaration")
		return false, false
	}
	// The flags octet follows the name; make sure it is there.
	if offset >= total {
		logp.Warn("Error getting flags of queue in queue declaration")
		return false, false
	}
	flags := getBitParams(args[offset])

	m.isRequest = true
	m.method = "queue.declare"
	m.request = name
	m.fields = common.MapStr{
		"queue":       name,
		"passive":     flags[0],
		"durable":     flags[1],
		"exclusive":   flags[2],
		"auto-delete": flags[3],
		"no-wait":     flags[4],
	}
	// Only look for an arguments table when something other than the frame
	// end marker follows the flags and argument parsing is enabled.
	if offset+1 < total && args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, tableErr, exists := getTable(arguments, args, offset+1)
		switch {
		case tableErr:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case exists:
			m.fields["arguments"] = arguments
		}
	}
	return true, true
}
// queueDeclareOkMethod decodes a queue.declare-ok method: the queue name
// (short string) followed by message-count and consumer-count (4 bytes each,
// big-endian).
//
// It returns (false, false) when any of these is missing and (true, true)
// otherwise.
func queueDeclareOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The name's length octet must exist before it can be read.
	if total < 1 {
		logp.Warn("Error getting name of queue in queue confirmation")
		return false, false
	}
	name, nextOffset, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Error getting name of queue in queue confirmation")
		return false, false
	}
	// Two 4-byte counters follow the name.
	if nextOffset > total || total-nextOffset < 8 {
		logp.Warn("Error getting message and consumer counts in queue confirmation")
		return false, false
	}
	m.method = "queue.declare-ok"
	m.fields = common.MapStr{
		"queue":          name,
		"consumer-count": binary.BigEndian.Uint32(args[nextOffset+4:]),
		"message-count":  binary.BigEndian.Uint32(args[nextOffset : nextOffset+4]),
	}
	return true, true
}
// queueBindMethod decodes a queue.bind method. The first two argument bytes
// are skipped, then come three short strings (queue, exchange, routing key),
// one flags octet and an optional arguments table.
//
// It returns (false, false) when a string or the flags cannot be read, and
// (true, true) otherwise. A failure to parse the optional arguments table is
// only recorded in m.notes.
func queueBindMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	queue, offset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of queue in queue bind")
		return false, false
	}
	m.isRequest = true
	exchange, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting name of exchange in queue bind")
		return false, false
	}
	routingKey, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting routing key in queue bind")
		return false, false
	}
	// The flags octet follows the routing key; make sure it is there.
	if offset >= total {
		logp.Warn("Error getting flags in queue bind")
		return false, false
	}
	flags := getBitParams(args[offset])

	m.method = "queue.bind"
	m.request = strings.Join([]string{queue, exchange}, " ")
	m.fields = common.MapStr{
		"queue":       queue,
		"routing-key": routingKey,
		"no-wait":     flags[0],
	}
	// An empty exchange name is left out of the fields.
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}
	// Only look for an arguments table when something other than the frame
	// end marker follows the flags and argument parsing is enabled.
	if offset+1 < total && args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, tableErr, exists := getTable(arguments, args, offset+1)
		switch {
		case tableErr:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case exists:
			m.fields["arguments"] = arguments
		}
	}
	return true, true
}
// queueUnbindMethod decodes a queue.unbind method. The first two argument
// bytes are skipped, then come three short strings (queue, exchange, routing
// key) and an optional arguments table.
//
// It returns (false, false) when one of the strings cannot be read, and
// (true, true) otherwise. A failure to parse the optional arguments table is
// only recorded in m.notes.
func queueUnbindMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	queue, offset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of queue in queue unbind")
		return false, false
	}
	exchange, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting name of exchange in queue unbind")
		return false, false
	}
	routingKey, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting routing key in queue unbind")
		return false, false
	}

	m.isRequest = true
	m.method = "queue.unbind"
	m.request = strings.Join([]string{queue, exchange}, " ")
	m.fields = common.MapStr{
		"queue":       queue,
		"routing-key": routingKey,
	}
	// An empty exchange name is left out of the fields.
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}
	// The table is looked up one byte past the end of the routing key, as
	// before; the extra length check only prevents reading past the buffer.
	// NOTE(review): no flags octet is decoded for this method, so confirm the
	// +1 skip matches the wire layout.
	if offset+1 < total && args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, tableErr, exists := getTable(arguments, args, offset+1)
		switch {
		case tableErr:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case exists:
			m.fields["arguments"] = arguments
		}
	}
	return true, true
}
// queuePurgeMethod decodes a queue.purge method. The first two argument
// bytes are skipped, then come the queue name (short string) and one flags
// octet whose lowest bit is "no-wait".
//
// It returns (false, false) when the name or the flags cannot be read, and
// (true, true) otherwise.
func queuePurgeMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The name's length octet lives at index 2.
	if total < 3 {
		logp.Warn("Error getting name of queue in queue purge")
		return false, false
	}
	queue, nextOffset, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of queue in queue purge")
		return false, false
	}
	// The flags octet follows the name; make sure it is there.
	if nextOffset >= total {
		logp.Warn("Error getting flags in queue purge")
		return false, false
	}
	m.isRequest = true
	flags := getBitParams(args[nextOffset])
	m.method = "queue.purge"
	m.request = queue
	m.fields = common.MapStr{
		"queue":   queue,
		"no-wait": flags[0],
	}
	return true, true
}
// queuePurgeOkMethod decodes a queue.purge-ok method, whose only argument is
// a 4-byte big-endian message count.
//
// It returns (false, false) when fewer than 4 bytes are available and
// (true, true) otherwise.
func queuePurgeOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	// Slicing args[0:4] on a shorter buffer would panic.
	if len(args) < 4 {
		return false, false
	}
	m.method = "queue.purge-ok"
	m.fields = common.MapStr{
		"message-count": binary.BigEndian.Uint32(args[0:4]),
	}
	return true, true
}
// queueDeleteMethod decodes a queue.delete method. The first two argument
// bytes are skipped, then come the queue name (short string) and one flags
// octet carrying if-unused, if-empty and no-wait (lowest bit first).
//
// It returns (false, false) when the name or the flags cannot be read, and
// (true, true) otherwise.
func queueDeleteMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The name's length octet lives at index 2.
	if total < 3 {
		logp.Warn("Error getting name of queue in queue delete")
		return false, false
	}
	queue, nextOffset, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of queue in queue delete")
		return false, false
	}
	// The flags octet follows the name; make sure it is there.
	if nextOffset >= total {
		logp.Warn("Error getting flags in queue delete")
		return false, false
	}
	m.isRequest = true
	flags := getBitParams(args[nextOffset])
	m.method = "queue.delete"
	m.request = queue
	m.fields = common.MapStr{
		"queue":     queue,
		"if-unused": flags[0],
		"if-empty":  flags[1],
		"no-wait":   flags[2],
	}
	return true, true
}
// queueDeleteOkMethod decodes a queue.delete-ok method, whose only argument
// is a 4-byte big-endian message count.
//
// It returns (false, false) when fewer than 4 bytes are available and
// (true, true) otherwise.
func queueDeleteOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	// Slicing args[0:4] on a shorter buffer would panic.
	if len(args) < 4 {
		return false, false
	}
	m.method = "queue.delete-ok"
	m.fields = common.MapStr{
		"message-count": binary.BigEndian.Uint32(args[0:4]),
	}
	return true, true
}
// exchangeDeclareMethod decodes an exchange.declare method. The first two
// argument bytes are skipped, then come the exchange name and exchange type
// (short strings), one flags octet and an optional arguments table. An empty
// exchange type is reported as "direct".
//
// It returns (false, false) when a string or the flags cannot be read, and
// (true, true) otherwise. A failure to parse the optional arguments table is
// only recorded in m.notes.
func exchangeDeclareMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	exchange, offset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of exchange in exchange declare")
		return false, false
	}
	exchangeType, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting exchange type in exchange declare")
		return false, false
	}
	// The flags octet follows the exchange type; make sure it is there.
	if offset >= total {
		logp.Warn("Error getting flags in exchange declare")
		return false, false
	}
	flags := getBitParams(args[offset])

	m.method = "exchange.declare"
	m.isRequest = true
	m.request = exchange
	if exchangeType == "" {
		exchangeType = "direct"
	}
	// Bits 2 and 3 of the flags octet are not reported.
	m.fields = common.MapStr{
		"exchange":      exchange,
		"exchange-type": exchangeType,
		"passive":       flags[0],
		"durable":       flags[1],
		"no-wait":       flags[4],
	}
	// Only look for an arguments table when something other than the frame
	// end marker follows the flags and argument parsing is enabled.
	if offset+1 < total && args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, tableErr, exists := getTable(arguments, args, offset+1)
		switch {
		case tableErr:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case exists:
			m.fields["arguments"] = arguments
		}
	}
	return true, true
}
// exchangeDeleteMethod decodes an exchange.delete method. The first two
// argument bytes are skipped, then come the exchange name (short string) and
// one flags octet carrying if-unused and no-wait (lowest bit first).
//
// It returns (false, false) when the name or the flags cannot be read, and
// (true, true) otherwise.
func exchangeDeleteMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The name's length octet lives at index 2.
	if total < 3 {
		logp.Warn("Error getting name of exchange in exchange delete")
		return false, false
	}
	exchange, nextOffset, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Error getting name of exchange in exchange delete")
		return false, false
	}
	// The flags octet follows the name; make sure it is there.
	if nextOffset >= total {
		logp.Warn("Error getting flags in exchange delete")
		return false, false
	}
	m.method = "exchange.delete"
	m.isRequest = true
	flags := getBitParams(args[nextOffset])
	m.request = exchange
	m.fields = common.MapStr{
		"exchange":  exchange,
		"if-unused": flags[0],
		"no-wait":   flags[1],
	}
	return true, true
}
// exchangeBindMethod handles exchange.bind, a method exclusive to RabbitMQ.
// It names the method on m and delegates decoding to exchangeBindUnbindInfo,
// returning (false, false) if that reports an error and (true, true)
// otherwise.
func exchangeBindMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "exchange.bind"
	if failed := exchangeBindUnbindInfo(m, args); failed {
		return false, false
	}
	return true, true
}
// exchangeUnbindMethod handles exchange.unbind, a method exclusive to
// RabbitMQ. It names the method on m and delegates decoding to
// exchangeBindUnbindInfo, returning (false, false) if that reports an error
// and (true, true) otherwise.
func exchangeUnbindMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "exchange.unbind"
	if failed := exchangeBindUnbindInfo(m, args); failed {
		return false, false
	}
	return true, true
}
// exchangeBindUnbindInfo decodes the arguments shared by exchange.bind and
// exchange.unbind. The first two bytes are skipped, then come three short
// strings (destination, source, routing key), one flags octet and an
// optional arguments table.
//
// The return value is an error flag: true when a string or the flags cannot
// be read, false once m has been filled in. A failure to parse the optional
// arguments table is only recorded in m.notes.
func exchangeBindUnbindInfo(m *amqpMessage, args []byte) bool {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	destination, offset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of destination in exchange bind/unbind")
		return true
	}
	source, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting name of source in exchange bind/unbind")
		return true
	}
	routingKey, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting name of routing-key in exchange bind/unbind")
		return true
	}
	// The flags octet follows the routing key; make sure it is there.
	if offset >= total {
		logp.Warn("Error getting flags in exchange bind/unbind")
		return true
	}

	m.isRequest = true
	flags := getBitParams(args[offset])
	m.request = strings.Join([]string{source, destination}, " ")
	m.fields = common.MapStr{
		"destination": destination,
		"source":      source,
		"routing-key": routingKey,
		"no-wait":     flags[0],
	}
	// Only look for an arguments table when something other than the frame
	// end marker follows the flags and argument parsing is enabled.
	if offset+1 < total && args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, tableErr, exists := getTable(arguments, args, offset+1)
		switch {
		case tableErr:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case exists:
			m.fields["arguments"] = arguments
		}
	}
	return false
}
// basicQosMethod decodes a basic.qos method: prefetch-size (4 bytes),
// prefetch-count (2 bytes), both big-endian, and one flags octet whose lowest
// bit is "global".
//
// It returns (false, false) when fewer than 7 bytes are available and
// (true, true) otherwise.
func basicQosMethod(m *amqpMessage, args []byte) (bool, bool) {
	// 4 + 2 + 1 bytes are read below; a shorter buffer would panic.
	if len(args) < 7 {
		return false, false
	}
	flags := getBitParams(args[6])
	m.isRequest = true
	m.method = "basic.qos"
	m.fields = common.MapStr{
		"prefetch-size":  binary.BigEndian.Uint32(args[0:4]),
		"prefetch-count": binary.BigEndian.Uint16(args[4:6]),
		"global":         flags[0],
	}
	return true, true
}
// basicConsumeMethod decodes a basic.consume method. The first two argument
// bytes are skipped, then come the queue name and consumer tag (short
// strings), one flags octet and an optional arguments table.
//
// It returns (false, false) when a string or the flags cannot be read, and
// (true, true) otherwise. A failure to parse the optional arguments table is
// only recorded in m.notes.
func basicConsumeMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	queue, offset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of queue in basic consume")
		return false, false
	}
	consumerTag, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Error getting name of consumer tag in basic consume")
		return false, false
	}
	// The flags octet follows the consumer tag; make sure it is there.
	if offset >= total {
		logp.Warn("Error getting flags in basic consume")
		return false, false
	}
	flags := getBitParams(args[offset])

	m.method = "basic.consume"
	m.isRequest = true
	m.request = queue
	m.fields = common.MapStr{
		"queue":        queue,
		"consumer-tag": consumerTag,
		"no-local":     flags[0],
		"no-ack":       flags[1],
		"exclusive":    flags[2],
		"no-wait":      flags[3],
	}
	// Only look for an arguments table when something other than the frame
	// end marker follows the flags and argument parsing is enabled.
	if offset+1 < total && args[offset+1] != frameEndOctet && m.parseArguments {
		arguments := make(common.MapStr)
		_, tableErr, exists := getTable(arguments, args, offset+1)
		switch {
		case tableErr:
			m.notes = append(m.notes, "Failed to parse additional arguments")
		case exists:
			m.fields["arguments"] = arguments
		}
	}
	return true, true
}
// basicConsumeOkMethod decodes a basic.consume-ok method, whose only
// argument is the consumer tag as a short string.
//
// It returns (false, false) when the tag cannot be read and (true, true)
// otherwise.
func basicConsumeOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	// The tag's length octet must exist before it can be read.
	if len(args) < 1 {
		logp.Warn("Error getting consumer tag in basic consume ok")
		return false, false
	}
	consumerTag, _, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Error getting consumer tag in basic consume ok")
		return false, false
	}
	m.method = "basic.consume-ok"
	m.fields = common.MapStr{
		"consumer-tag": consumerTag,
	}
	return true, true
}
// basicCancelMethod decodes a basic.cancel method: the consumer tag (short
// string) followed by one flags octet whose lowest bit is "no-wait".
//
// It returns (false, false) when the tag or the flags cannot be read, and
// (true, true) otherwise.
func basicCancelMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The tag's length octet must exist before it can be read.
	if total < 1 {
		logp.Warn("Error getting consumer tag in basic cancel")
		return false, false
	}
	consumerTag, offset, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Error getting consumer tag in basic cancel")
		return false, false
	}
	// The flags octet follows the tag; make sure it is there.
	if offset >= total {
		logp.Warn("Error getting flags in basic cancel")
		return false, false
	}
	m.method = "basic.cancel"
	m.isRequest = true
	m.request = consumerTag
	flags := getBitParams(args[offset])
	m.fields = common.MapStr{
		"consumer-tag": consumerTag,
		"no-wait":      flags[0],
	}
	return true, true
}
// basicCancelOkMethod decodes a basic.cancel-ok method, whose only argument
// is the consumer tag as a short string.
//
// It returns (false, false) when the tag cannot be read and (true, true)
// otherwise.
func basicCancelOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	// The tag's length octet must exist before it can be read.
	if len(args) < 1 {
		logp.Warn("Error getting consumer tag in basic cancel ok")
		return false, false
	}
	consumerTag, _, failed := getShortString(args, 1, uint32(args[0]))
	if failed {
		logp.Warn("Error getting consumer tag in basic cancel ok")
		return false, false
	}
	m.method = "basic.cancel-ok"
	m.fields = common.MapStr{
		"consumer-tag": consumerTag,
	}
	return true, true
}
// basicPublishMethod decodes a basic.publish method. The first two argument
// bytes are skipped, then come the exchange and routing key (short strings)
// and one flags octet carrying mandatory and immediate (lowest bit first).
//
// It returns (false, false) when a string or the flags cannot be read, and
// (true, false) otherwise.
func basicPublishMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	exchange, nextOffset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting exchange in basic publish")
		return false, false
	}
	routingKey, nextOffset, failed := shortString(nextOffset)
	if failed {
		logp.Warn("Error getting routing key in basic publish")
		return false, false
	}
	// The flags octet follows the routing key; make sure it is there.
	if nextOffset >= total {
		logp.Warn("Error getting flags in basic publish")
		return false, false
	}
	flags := getBitParams(args[nextOffset])

	m.method = "basic.publish"
	m.fields = common.MapStr{
		"routing-key": routingKey,
		"mandatory":   flags[0],
		"immediate":   flags[1],
	}
	// is exchange not default exchange ?
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}
	return true, false
}
// basicReturnMethod decodes a basic.return method: reply-code (2 bytes,
// big-endian) followed by three short strings (reply text, exchange, routing
// key). Reply codes below 300 are not decoded any further.
//
// It returns (false, false) when the arguments are truncated or malformed,
// and (true, false) otherwise.
func basicReturnMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	// The reply code needs two bytes.
	if total < 2 {
		logp.Warn("Error getting reply code in basic return")
		return false, false
	}
	code := binary.BigEndian.Uint16(args[0:2])
	if code < 300 {
		// not an error or exception ? not interesting
		return true, false
	}
	replyText, nextOffset, failed := shortString(2)
	if failed {
		logp.Warn("Error getting name of reply text in basic return")
		return false, false
	}
	exchange, nextOffset, failed := shortString(nextOffset)
	if failed {
		logp.Warn("Error getting name of exchange in basic return")
		return false, false
	}
	routingKey, _, failed := shortString(nextOffset)
	if failed {
		logp.Warn("Error getting name of routing key in basic return")
		return false, false
	}
	m.method = "basic.return"
	m.fields = common.MapStr{
		"exchange":    exchange,
		"routing-key": routingKey,
		"reply-code":  code,
		"reply-text":  replyText,
	}
	return true, false
}
// basicDeliverMethod decodes a basic.deliver method: the consumer tag (short
// string), delivery-tag (8 bytes, big-endian), one flags octet whose lowest
// bit is "redelivered", then the exchange and routing key (short strings).
//
// It returns (false, false) when the arguments are truncated or malformed,
// and (true, false) otherwise.
func basicDeliverMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	consumerTag, offset, failed := shortString(0)
	if failed {
		logp.Warn("Failed to get consumer tag in basic deliver")
		return false, false
	}
	// The 8-byte delivery tag and the flags octet follow the consumer tag.
	if offset > total || total-offset < 9 {
		logp.Warn("Failed to get delivery tag in basic deliver")
		return false, false
	}
	deliveryTag := binary.BigEndian.Uint64(args[offset : offset+8])
	flags := getBitParams(args[offset+8])

	exchange, offset, failed := shortString(offset + 9)
	if failed {
		logp.Warn("Failed to get exchange in basic deliver")
		return false, false
	}
	routingKey, _, failed := shortString(offset)
	if failed {
		logp.Warn("Failed to get routing key in basic deliver")
		return false, false
	}
	m.method = "basic.deliver"
	m.fields = common.MapStr{
		"consumer-tag": consumerTag,
		"delivery-tag": deliveryTag,
		"redelivered":  flags[0],
		"routing-key":  routingKey,
	}
	// is exchange not default exchange ?
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}
	return true, false
}
// basicGetMethod decodes a basic.get method. The first two argument bytes
// are skipped, then come the queue name (short string) and one flags octet
// whose lowest bit is "no-ack".
//
// It returns (false, false) when the name or the flags cannot be read, and
// (true, true) otherwise.
func basicGetMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// The name's length octet lives at index 2.
	if total < 3 {
		logp.Warn("Failed to get queue in basic get method")
		return false, false
	}
	queue, offset, failed := getShortString(args, 3, uint32(args[2]))
	if failed {
		logp.Warn("Failed to get queue in basic get method")
		return false, false
	}
	// The flags octet follows the name; make sure it is there.
	if offset >= total {
		logp.Warn("Failed to get flags in basic get method")
		return false, false
	}
	m.method = "basic.get"
	flags := getBitParams(args[offset])
	m.isRequest = true
	m.request = queue
	m.fields = common.MapStr{
		"queue":  queue,
		"no-ack": flags[0],
	}
	return true, true
}
// basicGetOkMethod decodes a basic.get-ok method: delivery-tag (8 bytes,
// big-endian), one flags octet whose lowest bit is "redelivered", the
// exchange and routing key (short strings) and message-count (4 bytes,
// big-endian).
//
// It returns (false, false) when the arguments are truncated or malformed,
// and (true, false) otherwise.
func basicGetOkMethod(m *amqpMessage, args []byte) (bool, bool) {
	total := uint32(len(args))
	// shortString reads a 1-byte length at offset at followed by the string;
	// it fails when the length octet itself is missing.
	shortString := func(at uint32) (string, uint32, bool) {
		if at >= total {
			return "", 0, true
		}
		return getShortString(args, at+1, uint32(args[at]))
	}

	// Reading the exchange at offset 9 also proves that the delivery tag and
	// the flags octet (bytes 0-8) are present.
	exchange, offset, failed := shortString(9)
	if failed {
		logp.Warn("Failed to get exchange in basic get-ok")
		return false, false
	}
	routingKey, offset, failed := shortString(offset)
	if failed {
		logp.Warn("Failed to get routing key in basic get-ok")
		return false, false
	}
	// The message count occupies the four bytes after the routing key.
	if offset > total || total-offset < 4 {
		logp.Warn("Failed to get message count in basic get-ok")
		return false, false
	}
	flags := getBitParams(args[8])

	m.method = "basic.get-ok"
	m.fields = common.MapStr{
		"delivery-tag":  binary.BigEndian.Uint64(args[0:8]),
		"redelivered":   flags[0],
		"routing-key":   routingKey,
		"message-count": binary.BigEndian.Uint32(args[offset : offset+4]),
	}
	// An empty exchange name is left out of the fields.
	if len(exchange) > 0 {
		m.fields["exchange"] = exchange
	}
	return true, false
}
// basicGetEmptyMethod names the method on m as "basic.get-empty". The
// arguments are not inspected and the result is always (true, true).
func basicGetEmptyMethod(m *amqpMessage, args []byte) (bool, bool) {
	const name = "basic.get-empty"
	m.method = name
	return true, true
}
// basicAckMethod decodes a basic.ack method: delivery-tag (8 bytes,
// big-endian) followed by one flags octet whose lowest bit is "multiple".
//
// It returns (false, false) when fewer than 9 bytes are available and
// (true, true) otherwise.
func basicAckMethod(m *amqpMessage, args []byte) (bool, bool) {
	// 8 + 1 bytes are read below; a shorter buffer would panic.
	if len(args) < 9 {
		return false, false
	}
	flags := getBitParams(args[8])
	m.method = "basic.ack"
	m.isRequest = true
	m.fields = common.MapStr{
		"delivery-tag": binary.BigEndian.Uint64(args[0:8]),
		"multiple":     flags[0],
	}
	return true, true
}
// basicNackMethod decodes basic.nack, a RabbitMQ specific method:
// delivery-tag (8 bytes, big-endian) followed by one flags octet carrying
// multiple and requeue (lowest bit first).
//
// It returns (false, false) when fewer than 9 bytes are available and
// (true, true) otherwise.
func basicNackMethod(m *amqpMessage, args []byte) (bool, bool) {
	// 8 + 1 bytes are read below; a shorter buffer would panic.
	if len(args) < 9 {
		return false, false
	}
	flags := getBitParams(args[8])
	m.method = "basic.nack"
	m.isRequest = true
	m.fields = common.MapStr{
		"delivery-tag": binary.BigEndian.Uint64(args[0:8]),
		"multiple":     flags[0],
		"requeue":      flags[1],
	}
	return true, true
}
// basicRejectMethod decodes a basic.reject method: delivery-tag (8 bytes,
// big-endian) followed by one flags octet. The delivery tag, formatted in
// base 10, is also stored as the request string.
//
// It returns (false, false) when fewer than 9 bytes are available and
// (true, true) otherwise.
func basicRejectMethod(m *amqpMessage, args []byte) (bool, bool) {
	// 8 + 1 bytes are read below; a shorter buffer would panic.
	if len(args) < 9 {
		return false, false
	}
	flags := getBitParams(args[8])
	tag := binary.BigEndian.Uint64(args[0:8])
	m.isRequest = true
	m.method = "basic.reject"
	// NOTE(review): AMQP 0-9-1 names the single basic.reject bit "requeue";
	// the "multiple" key is kept as-is so emitted fields do not change -
	// confirm whether it should be renamed.
	m.fields = common.MapStr{
		"delivery-tag": tag,
		"multiple":     flags[0],
	}
	m.request = strconv.FormatUint(tag, 10)
	return true, true
}
// basicRecoverMethod decodes a basic.recover method, whose only argument is
// one flags octet with "requeue" in the lowest bit.
//
// It returns (false, false) when args is empty and (true, true) otherwise.
func basicRecoverMethod(m *amqpMessage, args []byte) (bool, bool) {
	// The flags octet is mandatory; reading it from an empty slice would panic.
	if len(args) < 1 {
		return false, false
	}
	flags := getBitParams(args[0])
	m.isRequest = true
	m.method = "basic.recover"
	m.fields = common.MapStr{
		"requeue": flags[0],
	}
	return true, true
}
// txSelectMethod flags m as a "tx.select" request. The arguments are not
// inspected and the result is always (true, true).
func txSelectMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "tx.select"
	m.isRequest = true
	return true, true
}
// txCommitMethod flags m as a "tx.commit" request. The arguments are not
// inspected and the result is always (true, true).
func txCommitMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "tx.commit"
	m.isRequest = true
	return true, true
}
// txRollbackMethod flags m as a "tx.rollback" request. The arguments are not
// inspected and the result is always (true, true).
func txRollbackMethod(m *amqpMessage, args []byte) (bool, bool) {
	m.method = "tx.rollback"
	m.isRequest = true
	return true, true
}
// okMethod is the handler used when the server or client answers a
// synchronous method without carrying any new information: it leaves m
// untouched, ignores args and always returns (true, true).
func okMethod(m *amqpMessage, args []byte) (bool, bool) {
	return true, true
}
// getShortString extracts length bytes of data starting at start and returns
// them as a string together with the offset of the first byte after it.
//
// A zero length yields the empty string, the unchanged start offset and no
// error, without looking at data. Otherwise err is true (with an empty
// string and offset 0) when data is too short to hold length bytes from
// start.
func getShortString(data []byte, start uint32, length uint32) (short string, nextOffset uint32, err bool) {
	switch size := uint32(len(data)); {
	case length == 0:
		return "", start, false
	case start > size, size-start < length:
		return "", 0, true
	}
	end := start + length
	return string(data[start:end]), end, false
}
// getBitParams unpacks the five lowest bits of bits, as used by various AMQP
// methods: ret[i] is true exactly when bit i (value 1<<i) is set. The three
// highest bits are ignored.
func getBitParams(bits byte) (ret [5]bool) {
	for i := range ret {
		mask := byte(1) << uint(i)
		ret[i] = bits&mask != 0
	}
	return ret
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。