1 Star 0 Fork 0

sqos/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
redis.go 7.65 KB
一键复制 编辑 原始数据 按行查看 历史
Steffen Siering 提交于 2016-11-14 14:50 . More Packetbeat cleanups (#2972)
package redis
import (
"bytes"
"expvar"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/publish"
)
type stream struct {
applayer.Stream
parser parser
tcptuple *common.TCPTuple
}
type redisConnectionData struct {
streams [2]*stream
requests messageList
responses messageList
}
type messageList struct {
head, tail *redisMessage
}
// Redis protocol plugin
type redisPlugin struct {
// config
ports []int
sendRequest bool
sendResponse bool
transactionTimeout time.Duration
results publish.Transactions
}
var (
debugf = logp.MakeDebug("redis")
isDebug = false
)
var (
unmatchedResponses = expvar.NewInt("redis.unmatched_responses")
)
func init() {
protos.Register("redis", New)
}
func New(
testMode bool,
results publish.Transactions,
cfg *common.Config,
) (protos.Plugin, error) {
p := &redisPlugin{}
config := defaultConfig
if !testMode {
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
}
if err := p.init(results, &config); err != nil {
return nil, err
}
return p, nil
}
func (redis *redisPlugin) init(results publish.Transactions, config *redisConfig) error {
redis.setFromConfig(config)
redis.results = results
isDebug = logp.IsDebug("redis")
return nil
}
func (redis *redisPlugin) setFromConfig(config *redisConfig) {
redis.ports = config.Ports
redis.sendRequest = config.SendRequest
redis.sendResponse = config.SendResponse
redis.transactionTimeout = config.TransactionTimeout
}
func (redis *redisPlugin) GetPorts() []int {
return redis.ports
}
func (s *stream) PrepareForNewMessage() {
parser := &s.parser
s.Stream.Reset()
parser.reset()
}
func (redis *redisPlugin) ConnectionTimeout() time.Duration {
return redis.transactionTimeout
}
func (redis *redisPlugin) Parse(
pkt *protos.Packet,
tcptuple *common.TCPTuple,
dir uint8,
private protos.ProtocolData,
) protos.ProtocolData {
defer logp.Recover("ParseRedis exception")
conn := ensureRedisConnection(private)
conn = redis.doParse(conn, pkt, tcptuple, dir)
if conn == nil {
return nil
}
return conn
}
func ensureRedisConnection(private protos.ProtocolData) *redisConnectionData {
if private == nil {
return &redisConnectionData{}
}
priv, ok := private.(*redisConnectionData)
if !ok {
logp.Warn("redis connection data type error, create new one")
return &redisConnectionData{}
}
if priv == nil {
logp.Warn("Unexpected: redis connection data not set, create new one")
return &redisConnectionData{}
}
return priv
}
func (redis *redisPlugin) doParse(
conn *redisConnectionData,
pkt *protos.Packet,
tcptuple *common.TCPTuple,
dir uint8,
) *redisConnectionData {
st := conn.streams[dir]
if st == nil {
st = newStream(pkt.Ts, tcptuple)
conn.streams[dir] = st
if isDebug {
debugf("new stream: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload))
}
}
if err := st.Append(pkt.Payload); err != nil {
if isDebug {
debugf("%v, dropping TCP stream: ", err)
}
return nil
}
if isDebug {
debugf("stream add data: %p (dir=%v, len=%v)", st, dir, len(pkt.Payload))
}
for st.Buf.Len() > 0 {
if st.parser.message == nil {
st.parser.message = newMessage(pkt.Ts)
}
ok, complete := st.parser.parse(&st.Buf)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
conn.streams[dir] = nil
if isDebug {
debugf("Ignore Redis message. Drop tcp stream. Try parsing with the next segment")
}
return conn
}
if !complete {
// wait for more data
break
}
msg := st.parser.message
if isDebug {
if msg.isRequest {
debugf("REDIS (%p) request message: %s", conn, msg.message)
} else {
debugf("REDIS (%p) response message: %s", conn, msg.message)
}
}
// all ok, go to next level and reset stream for new message
redis.handleRedis(conn, msg, tcptuple, dir)
st.PrepareForNewMessage()
}
return conn
}
func newStream(ts time.Time, tcptuple *common.TCPTuple) *stream {
s := &stream{
tcptuple: tcptuple,
}
s.parser.message = newMessage(ts)
s.Stream.Init(tcp.TCPMaxDataInStream)
return s
}
func newMessage(ts time.Time) *redisMessage {
return &redisMessage{ts: ts}
}
func (redis *redisPlugin) handleRedis(
conn *redisConnectionData,
m *redisMessage,
tcptuple *common.TCPTuple,
dir uint8,
) {
m.tcpTuple = *tcptuple
m.direction = dir
m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort())
if m.isRequest {
conn.requests.append(m) // wait for response
} else {
conn.responses.append(m)
redis.correlate(conn)
}
}
func (redis *redisPlugin) correlate(conn *redisConnectionData) {
// drop responses with missing requests
if conn.requests.empty() {
for !conn.responses.empty() {
debugf("Response from unknown transaction. Ignoring")
unmatchedResponses.Add(1)
conn.responses.pop()
}
return
}
// merge requests with responses into transactions
for !conn.responses.empty() && !conn.requests.empty() {
requ := conn.requests.pop()
resp := conn.responses.pop()
if redis.results != nil {
event := redis.newTransaction(requ, resp)
redis.results.PublishTransaction(event)
}
}
}
func (redis *redisPlugin) newTransaction(requ, resp *redisMessage) common.MapStr {
error := common.OK_STATUS
if resp.isError {
error = common.ERROR_STATUS
}
var returnValue map[string]common.NetString
if resp.isError {
returnValue = map[string]common.NetString{
"error": resp.message,
}
} else {
returnValue = map[string]common.NetString{
"return_value": resp.message,
}
}
src := &common.Endpoint{
IP: requ.tcpTuple.SrcIP.String(),
Port: requ.tcpTuple.SrcPort,
Proc: string(requ.cmdlineTuple.Src),
}
dst := &common.Endpoint{
IP: requ.tcpTuple.DstIP.String(),
Port: requ.tcpTuple.DstPort,
Proc: string(requ.cmdlineTuple.Dst),
}
if requ.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
}
// resp_time in milliseconds
responseTime := int32(resp.ts.Sub(requ.ts).Nanoseconds() / 1e6)
event := common.MapStr{
"@timestamp": common.Time(requ.ts),
"type": "redis",
"status": error,
"responsetime": responseTime,
"redis": returnValue,
"method": common.NetString(bytes.ToUpper(requ.method)),
"resource": requ.path,
"query": requ.message,
"bytes_in": uint64(requ.size),
"bytes_out": uint64(resp.size),
"src": src,
"dst": dst,
}
if redis.sendRequest {
event["request"] = requ.message
}
if redis.sendResponse {
event["response"] = resp.message
}
return event
}
func (redis *redisPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
// tsg: being packet loss tolerant is probably not very useful for Redis,
// because most requests/response tend to fit in a single packet.
return private, true
}
func (redis *redisPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {
// TODO: check if we have pending data that we can send up the stack
return private
}
func (ml *messageList) append(msg *redisMessage) {
if ml.tail == nil {
ml.head = msg
} else {
ml.tail.next = msg
}
msg.next = nil
ml.tail = msg
}
func (ml *messageList) empty() bool {
return ml.head == nil
}
func (ml *messageList) pop() *redisMessage {
if ml.head == nil {
return nil
}
msg := ml.head
ml.head = ml.head.next
if ml.head == nil {
ml.tail = nil
}
return msg
}
func (ml *messageList) last() *redisMessage {
return ml.tail
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/sqos/beats.git
git@gitee.com:sqos/beats.git
sqos
beats
beats
v5.5.2

搜索帮助

A270a887 8829481 3d7a4017 8829481