1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
http.go 16.47 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687
package main
import (
"bytes"
"fmt"
"strconv"
"strings"
"time"
"labix.org/v2/mgo/bson"
)
// Parser states stored in HttpStream.parseState and dispatched on by
// httpMessageParser.
const (
// START: nothing of the current message has been parsed yet; the parser
// expects the request/status line.
START = iota
// FLINE: declared but not referenced by the parser code shown in this file.
FLINE
// HEADERS: the first line has been consumed; header lines are being parsed.
HEADERS
// BODY: headers are done; consuming a Content-Length or read-until-close body.
BODY
// BODY_CHUNKED_START: expecting the hexadecimal size line of the next chunk.
BODY_CHUNKED_START
// BODY_CHUNKED: consuming the payload of the current chunk.
BODY_CHUNKED
)
// HttpMessage carries the parsing state and the extracted fields of one HTTP
// request or response.
type HttpMessage struct {
	// Ts is the timestamp of the packet on which the message was created.
	Ts time.Time

	// hasContentLength is set once a Content-Length header has been parsed.
	hasContentLength bool
	// bodyOffset is the stream offset right after the blank line that ends
	// the headers.
	bodyOffset int

	// HTTP version taken from the first line.
	version_major, version_minor uint8

	// Values of the Connection and Transfer-Encoding headers.
	connection, transfer_encoding string

	// chunked_length is the size of the chunk currently being read and
	// chunked_body accumulates the decoded chunk payloads.
	chunked_length int
	chunked_body   []byte

	// IsRequest distinguishes requests from responses.
	IsRequest bool
	// Method is filled for requests, StatusCode for responses.
	Method     string
	StatusCode uint16

	// Host header value, request URI and (for requests) the whole first line.
	Host, RequestUri, FirstLine string

	// Connection metadata, filled in by handleHttp.
	TcpTuple     TcpTuple
	CmdlineTuple *CmdlineTuple
	Direction    uint8

	// ContentLength comes from the Content-Length header, or is accumulated
	// by the parser for chunked and read-until-close bodies.
	ContentLength int

	// Content-Type header, response reason phrase and X-Forwarded-For header.
	ContentType, ReasonPhrase, XForwardedFor string

	// Raw is the raw message bytes, assigned by handleHttp.
	Raw []byte

	// start and end delimit the message inside the stream buffer.
	start, end int
}
// HttpStream is the parsing state kept for one direction of a TCP stream.
type HttpStream struct {
	tcpStream *TcpStream

	// data holds the payload bytes received so far that have not been
	// dropped by PrepareForNewMessage.
	data []byte

	// parseOffset is the current read position in data, parseState is one of
	// the START..BODY_CHUNKED constants, and bodyReceived counts the body (or
	// current chunk) bytes already accounted for.
	parseOffset, parseState, bodyReceived int

	// message is the message being parsed; it is nil between messages.
	message *HttpMessage
}
// HttpTransaction pairs an HTTP request with its response on one TCP tuple.
type HttpTransaction struct {
	Type  string
	tuple TcpTuple

	// Src and Dst are the request's endpoints (swapped for reverse-direction
	// requests).
	Src, Dst Endpoint

	// ResponseTime is the request-to-response delay in milliseconds.
	ResponseTime int32
	// Ts is the request timestamp in microseconds since the Unix epoch.
	Ts int64
	// JsTs and ts both hold the request timestamp as a time.Time.
	JsTs, ts time.Time

	// cmdline is not assigned by the code shown in this file.
	cmdline *CmdlineTuple

	// Http collects the request and response fields that get published.
	Http bson.M

	// Raw request/response text, kept only when the matching Send_* option
	// is enabled.
	Request_raw, Response_raw string

	// timer expires a transaction for which no response arrives.
	timer *time.Timer
}
// Http is the HTTP protocol module: its configuration, the table of
// in-flight transactions and the publisher used for completed ones.
type Http struct {
	// config: whether raw requests / raw responses are attached to events.
	Send_request, Send_response bool

	// transactionsMap holds requests waiting for their response, keyed by
	// the hashable form of the TCP tuple.
	transactionsMap map[HashableTcpTuple]*HttpTransaction

	// Publisher receives completed transactions; nil disables publishing.
	Publisher *PublisherType
}
// HttpMod is the package-level instance of the Http protocol module.
var HttpMod Http
// InitDefaults resets the configuration to its defaults: both raw requests
// and raw responses are sent.
func (http *Http) InitDefaults() {
	http.Send_request, http.Send_response = true, true
}
// setFromConfig overrides the defaults with the values found under
// protocols.http in the loaded configuration. Options that are not defined
// in the configuration file keep their current value.
//
// It currently always returns nil; the error result is kept for callers.
func (http *Http) setFromConfig() (err error) {
	// Read the values from the "http" section, i.e. the same section whose
	// presence is tested. Previously the "thrift" section was read here.
	if _ConfigMeta.IsDefined("protocols", "http", "send_request") {
		http.Send_request = _Config.Protocols["http"].Send_request
	}
	if _ConfigMeta.IsDefined("protocols", "http", "send_response") {
		http.Send_response = _Config.Protocols["http"].Send_response
	}
	return nil
}
// Init prepares the module for use. It applies the defaults, then (outside of
// test mode) the configuration file settings, allocates the transaction table
// and (outside of test mode) hooks up the global Publisher.
//
// It returns the error reported by setFromConfig, if any.
func (http *Http) Init(test_mode bool) error {
	http.InitDefaults()

	if !test_mode {
		if err := http.setFromConfig(); err != nil {
			return err
		}
	}

	http.transactionsMap = make(map[HashableTcpTuple]*HttpTransaction, TransactionsHashSize)

	if test_mode {
		// Tests run without a publisher.
		return nil
	}
	http.Publisher = &Publisher
	return nil
}
// parseVersion extracts the major and minor number from an HTTP version of
// the form "<digit>.<digit>" found at the start of s (for example "1.1").
// Bytes after the third one are ignored.
//
// It returns an error when s is shorter than three bytes, when the separator
// is not a dot, or when either position is not a decimal digit. Previously
// non-digits were silently reported as version 0.0.
func parseVersion(s []byte) (uint8, uint8, error) {
	if len(s) < 3 || s[1] != '.' {
		return 0, 0, MsgError("Invalid version")
	}
	major, minor := s[0], s[2]
	if major < '0' || major > '9' || minor < '0' || minor > '9' {
		return 0, 0, MsgError("Invalid version")
	}
	return major - '0', minor - '0', nil
}
// parseResponseStatus parses the part of a status line that follows the HTTP
// version, i.e. "<status-code> <reason-phrase>", and returns both parts.
//
// The reason phrase is everything after the first space, so multi-word
// phrases such as "Not Found" are returned whole (previously only the last
// word was kept). An error is returned when there is no space separating the
// code from the phrase or when the code is not a decimal number that fits in
// a uint16.
func parseResponseStatus(s []byte) (uint16, string, error) {
	DEBUG("http", "parseResponseStatus: %s", s)

	p := bytes.IndexByte(s, ' ')
	if p == -1 {
		return 0, "", MsgError("Not beeing able to identify status code")
	}

	// bitSize 16 makes ParseUint reject values that would wrap in uint16.
	code, err := strconv.ParseUint(string(s[:p]), 10, 16)
	if err != nil {
		return 0, "", MsgError("Unable to parse status code")
	}

	// Tolerate extra spaces between the code and the phrase.
	reason_phrase := bytes.TrimLeft(s[p+1:], " ")
	return uint16(code), string(reason_phrase), nil
}
// httpParseHeader parses one header field from the start of data and stores
// the headers the module cares about (Host, Content-Length, Content-Type,
// Connection, Transfer-Encoding, X-Forwarded-For) in m.
//
// It returns (ok, complete, consumed):
//   - ok is always true in the current implementation;
//   - complete is false when more data is needed to finish the header;
//   - consumed is the number of bytes to skip (header line(s) plus the
//     terminating CRLF) and is only meaningful when complete is true.
//
// A header value may be folded over several lines: a CRLF followed by a space
// or a tab continues the value. The byte inspected for that is the one AFTER
// the CRLF; the previous code looked at the '\n' of the CRLF itself and thus
// never detected folding. The stored value of a folded header keeps the
// embedded line breaks; only surrounding spaces and tabs are trimmed.
func httpParseHeader(m *HttpMessage, data []byte) (bool, bool, int) {
	i := bytes.Index(data, []byte(":"))
	if i == -1 {
		// Expected ":" in headers. Assuming incomplete
		return true, false, 0
	}

	DEBUG("httpdetailed", "Data: %s", data)
	DEBUG("httpdetailed", "Header: %s", data[:i])

	// Header names are case-insensitive; lower-case once for the dispatch.
	name := string(bytes.ToLower(data[:i]))

	// skip folding line
	for p := i + 1; p < len(data); {
		q := bytes.Index(data[p:], []byte("\r\n"))
		if q == -1 {
			// Assuming incomplete
			return true, false, 0
		}
		p += q // p now points at the '\r' ending this line
		DEBUG("httpdetailed", "HV: %s\n", data[i+1:p])

		if p+2 >= len(data) {
			// The byte after CRLF decides whether the value continues. A
			// header section always ends with an extra CRLF, so more data
			// must follow; wait for it.
			return true, false, 0
		}
		if data[p+2] == ' ' || data[p+2] == '\t' {
			// Folded line: keep scanning for the real end of the value.
			p += 2
			continue
		}

		value := string(bytes.Trim(data[i+1:p], " \t"))
		switch name {
		case "host":
			m.Host = value
		case "content-length":
			// An unparsable length is deliberately treated as 0, as before.
			m.ContentLength, _ = strconv.Atoi(value)
			m.hasContentLength = true
		case "content-type":
			m.ContentType = value
		case "connection":
			m.connection = value
		case "transfer-encoding":
			m.transfer_encoding = value
		case "x-forwarded-for":
			m.XForwardedFor = value
		}
		return true, true, p + 2
	}

	return true, false, len(data)
}
// httpMessageParser advances the state machine of s over the bytes buffered
// in s.data, filling s.message as it goes.
//
// It returns (ok, complete):
//   - ok is false when the data does not look like HTTP; the caller is
//     expected to drop the stream;
//   - complete is true when a whole message has been parsed, in which case
//     s.message.start and s.message.end delimit it inside s.data.
//
// (true, false) means "so far so good, need more data".
//
// Compared to the previous version, malformed first lines no longer trigger
// an out-of-range slice panic: a status line that ends right after the
// version, and a request whose third token is shorter than "HTTP/", are now
// rejected with ok == false. The end of the first line is also searched from
// the current parse offset rather than from the start of the buffer.
func httpMessageParser(s *HttpStream) (bool, bool) {
	var cont, ok, complete bool
	m := s.message

	DEBUG("http", "Stream state=%d", s.parseState)

	for s.parseOffset < len(s.data) {
		switch s.parseState {
		case START:
			m.start = s.parseOffset
			i := bytes.Index(s.data[s.parseOffset:], []byte("\r\n"))
			if i == -1 {
				return true, false
			}
			i += s.parseOffset // make i an absolute offset into s.data

			// Very basic tests on the first line. Just to check that
			// we have what looks as an HTTP message
			var version []byte
			var err error

			fline := s.data[s.parseOffset:i]
			if len(fline) < 8 {
				DEBUG("http", "First line too small")
				return false, false
			}
			if bytes.Equal(fline[0:5], []byte("HTTP/")) {
				// RESPONSE
				if len(fline) < 9 {
					// "HTTP/x.y" with nothing after it: no status to parse.
					WARN("Failed to understand HTTP response status: %s", fline)
					return false, false
				}
				m.IsRequest = false
				version = fline[5:8]
				m.StatusCode, m.ReasonPhrase, err = parseResponseStatus(fline[9:])
				if err != nil {
					WARN("Failed to understand HTTP response status: %s", fline[9:])
					return false, false
				}
				DEBUG("http", "HTTP status_code=%d, reason_phrase=%s", m.StatusCode, m.ReasonPhrase)
			} else {
				// REQUEST
				slices := bytes.Fields(fline)
				if len(slices) != 3 {
					DEBUG("http", "Couldn't understand HTTP request: %s", fline)
					return false, false
				}

				m.Method = string(slices[0])
				m.RequestUri = string(slices[1])

				// HasPrefix is safe for tokens shorter than five bytes.
				if bytes.HasPrefix(slices[2], []byte("HTTP/")) {
					m.IsRequest = true
					version = slices[2][5:]
					m.FirstLine = string(fline)
				} else {
					DEBUG("http", "Couldn't understand HTTP version: %s", fline)
					return false, false
				}
				DEBUG("http", "HTTP Method=%s, RequestUri=%s", m.Method, m.RequestUri)
			}

			m.version_major, m.version_minor, err = parseVersion(version)
			if err != nil {
				// Fall back to HTTP/1.0 when the version is unreadable.
				DEBUG("http", "Failed to understand HTTP version: %s", version)
				m.version_major = 1
				m.version_minor = 0
			}
			DEBUG("http", "HTTP version %d.%d", m.version_major, m.version_minor)

			// ok so far
			s.parseOffset = i + 2
			s.parseState = HEADERS

		case HEADERS:
			if len(s.data)-s.parseOffset >= 2 &&
				bytes.Equal(s.data[s.parseOffset:s.parseOffset+2], []byte("\r\n")) {
				// EOH
				s.parseOffset += 2
				m.bodyOffset = s.parseOffset
				if m.ContentLength == 0 {
					if m.version_major == 1 && m.version_minor == 0 &&
						!m.hasContentLength {
						if m.IsRequest {
							// No Content-Length in a HTTP/1.0 request means
							// there is no body
							m.end = s.parseOffset
							return true, true
						} else {
							// Read until FIN
						}
					} else if m.connection == "close" {
						// Connection: close -> read until FIN
					} else if !m.hasContentLength && m.transfer_encoding == "chunked" {
						// support for HTTP/1.1 Chunked transfer
						s.parseState = BODY_CHUNKED_START
						continue
					} else {
						// No body expected: the message ends with its headers.
						m.end = s.parseOffset
						return true, true
					}
				}
				s.parseState = BODY
			} else {
				hok, hfcomplete, offset := httpParseHeader(m, s.data[s.parseOffset:])
				if !hok {
					return false, false
				}
				if !hfcomplete {
					return true, false
				}
				s.parseOffset += offset
			}

		case BODY:
			DEBUG("http", "eat body: %d", s.parseOffset)
			if !m.hasContentLength && m.connection == "close" {
				// HTTP/1.0 no content length. Add until the end of the connection
				DEBUG("http", "close connection, %d", len(s.data)-s.parseOffset)
				s.bodyReceived += (len(s.data) - s.parseOffset)
				m.ContentLength += (len(s.data) - s.parseOffset)
				s.parseOffset = len(s.data)
				return true, false
			} else if len(s.data[s.parseOffset:]) >= m.ContentLength-s.bodyReceived {
				// The rest of the body is available: the message is complete.
				s.parseOffset += (m.ContentLength - s.bodyReceived)
				m.end = s.parseOffset
				return true, true
			} else {
				// Partial body: account for it and wait for more data.
				s.bodyReceived += (len(s.data) - s.parseOffset)
				s.parseOffset = len(s.data)
				return true, false
			}

		case BODY_CHUNKED_START:
			cont, ok, complete = state_body_chunked_start(s, m)
			if !cont {
				return ok, complete
			}

		case BODY_CHUNKED:
			cont, ok, complete = state_body_chunked(s, m)
			if !cont {
				return ok, complete
			}
		}
	}

	return true, false
}
// state_body_chunked_start handles the BODY_CHUNKED_START state: it reads the
// hexadecimal chunk-size line at the current parse offset.
//
// It returns (cont, ok, complete). cont tells the caller to keep looping; when
// cont is false, ok and complete are the values httpMessageParser returns.
//
//   - Size line not fully received: (false, true, false), wait for more data.
//   - Unparsable or negative size: (false, false, false).
//   - Size 0 (last chunk): the message is complete once the final CRLF has
//     also arrived. Until then nothing is consumed and the function waits, so
//     m.end can never point past the buffered data (previously the final CRLF
//     was skipped without checking that it was there).
//   - Otherwise: switch to BODY_CHUNKED and continue.
func state_body_chunked_start(s *HttpStream, m *HttpMessage) (cont bool, ok bool, complete bool) {
	// read hexa length
	i := bytes.Index(s.data[s.parseOffset:], []byte("\r\n"))
	if i == -1 {
		return false, true, false
	}
	line := string(s.data[s.parseOffset : s.parseOffset+i])
	_, err := fmt.Sscanf(line, "%x", &m.chunked_length)
	if err != nil || m.chunked_length < 0 {
		// A negative size would later produce an invalid slice expression.
		WARN("Failed to understand chunked body start line")
		return false, false, false
	}

	next := s.parseOffset + i + 2 // first byte after the size line's CRLF

	if m.chunked_length == 0 {
		if len(s.data) < next+2 {
			// Final CRLF not received yet. Leave parseOffset untouched so
			// that the size line is parsed again on the next call.
			return false, true, false
		}
		s.parseOffset = next + 2 // final \r\n
		m.end = s.parseOffset
		return false, true, true
	}

	s.parseOffset = next
	s.bodyReceived = 0
	s.parseState = BODY_CHUNKED
	return true, false, false
}
// state_body_chunked handles the BODY_CHUNKED state: it consumes the payload
// of the current chunk.
//
// It returns (cont, ok, complete) with the same meaning as
// state_body_chunked_start. Three situations are distinguished:
//   - the rest of the chunk and its trailing CRLF are buffered: the payload is
//     appended to m.chunked_body, the chunk size is added to m.ContentLength
//     and parsing continues with the next chunk-size line;
//   - the rest of the chunk is buffered but not yet its CRLF: nothing is
//     consumed and the function waits for more data;
//   - only part of the chunk is buffered: that part is consumed and the
//     function waits for more data.
func state_body_chunked(s *HttpStream, m *HttpMessage) (cont bool, ok bool, complete bool) {
	buffered := len(s.data[s.parseOffset:])
	missing := m.chunked_length - s.bodyReceived

	switch {
	case buffered >= missing+2 /*\r\n*/ :
		// Whole remainder of the chunk is here, CRLF included.
		m.chunked_body = append(m.chunked_body, s.data[s.parseOffset:s.parseOffset+missing]...)
		s.parseOffset += missing + 2 /*\r\n*/
		m.ContentLength += m.chunked_length
		s.parseState = BODY_CHUNKED_START
		return true, false, false

	case buffered >= missing:
		// we need to wait for the +2, else we can crash on next call
		return false, true, false

	default:
		// Received less data than expected
		m.chunked_body = append(m.chunked_body, s.data[s.parseOffset:]...)
		s.bodyReceived += buffered
		s.parseOffset = len(s.data)
		return false, true, false
	}
}
// PrepareForNewMessage discards the bytes of the message that was just
// handled (everything before its end offset) and resets the parser so the
// next message starts from a clean state. stream.message must be non-nil.
func (stream *HttpStream) PrepareForNewMessage() {
	consumed := stream.message.end
	stream.data = stream.data[consumed:]

	stream.parseState = START
	stream.parseOffset, stream.bodyReceived = 0, 0
	stream.message = nil
}
// Parse feeds the payload of pkt into the HTTP parser of direction dir of the
// TCP stream and, when that completes a message, hands the message over to
// handleHttp.
//
// The per-direction stream state is created on the first packet. It is
// dropped when the buffered data exceeds TCP_MAX_DATA_IN_STREAM or when the
// parser rejects the data. Panics are handled by the deferred RECOVER call.
func (http *Http) Parse(pkt *Packet, tcp *TcpStream, dir uint8) {
	defer RECOVER("ParseHttp exception")

	DEBUG("http", "Payload received: [%s]", pkt.payload)

	stream := tcp.httpData[dir]
	if stream != nil {
		// concatenate bytes
		stream.data = append(stream.data, pkt.payload...)
		if len(stream.data) > TCP_MAX_DATA_IN_STREAM {
			DEBUG("http", "Stream data too large, dropping TCP stream")
			tcp.httpData[dir] = nil
			return
		}
	} else {
		stream = &HttpStream{
			tcpStream: tcp,
			data:      pkt.payload,
			message:   &HttpMessage{Ts: pkt.ts},
		}
		tcp.httpData[dir] = stream
	}

	// After a completed message the stream has no current message.
	if stream.message == nil {
		stream.message = &HttpMessage{Ts: pkt.ts}
	}

	ok, complete := httpMessageParser(stream)
	if !ok {
		// drop this tcp stream. Will retry parsing with the next
		// segment in it
		tcp.httpData[dir] = nil
		return
	}
	if !complete {
		return
	}

	// all ok, ship it
	msg := stream.data[stream.message.start:stream.message.end]
	censorPasswords(stream.message, msg)
	http.handleHttp(stream.message, tcp, dir, msg)

	// and reset message
	stream.PrepareForNewMessage()
}
// ReceivedFin is called when direction dir of the TCP stream is closed. Any
// data buffered for a message in progress is published as if the message were
// complete, which is what the HTTP/1.0 "no Content-Length, read until close"
// case needs.
func (http *Http) ReceivedFin(tcp *TcpStream, dir uint8) {
	stream := tcp.httpData[dir]
	if stream == nil || stream.message == nil {
		return
	}

	// send whatever data we got so far as complete.
	pending := stream.data[stream.message.start:]
	if len(pending) == 0 {
		return
	}

	DEBUG("httpdetailed", "Publish something on connection FIN")
	censorPasswords(stream.message, pending)
	http.handleHttp(stream.message, tcp, dir, pending)

	// and reset message. Probably not needed, just to be sure.
	stream.PrepareForNewMessage()
}
// handleHttp attaches the connection metadata (TCP tuple, direction, process
// tuple) and the raw bytes to a parsed message, then dispatches it to the
// request or the response handler.
func (http *Http) handleHttp(m *HttpMessage, tcp *TcpStream, dir uint8, raw_msg []byte) {
	m.TcpTuple = TcpTupleFromIpPort(tcp.tuple, tcp.id)
	m.Direction = dir
	m.CmdlineTuple = procWatcher.FindProcessesTuple(tcp.tuple)
	m.Raw = raw_msg

	if !m.IsRequest {
		http.receivedHttpResponse(m)
		return
	}
	http.receivedHttpRequest(m)
}
// receivedHttpRequest records a parsed request in the transaction table,
// keyed by its TCP tuple, and (re)arms the expiry timer of the transaction.
//
// A request arriving while the tuple still has an unanswered request replaces
// the old one after logging a warning.
func (http *Http) receivedHttpRequest(msg *HttpMessage) {
	key := msg.TcpTuple.raw

	trans := http.transactionsMap[key]
	switch {
	case trans == nil:
		trans = &HttpTransaction{Type: "http", tuple: msg.TcpTuple}
		http.transactionsMap[key] = trans
	case len(trans.Http) != 0:
		WARN("Two requests without a response. Dropping old request")
	}

	DEBUG("http", "Received request with tuple: %s", msg.TcpTuple)

	// The same timestamp is stored in three representations; Ts is in
	// microseconds since the epoch.
	trans.ts = msg.Ts
	trans.Ts = int64(trans.ts.UnixNano() / 1000)
	trans.JsTs = msg.Ts

	src := Endpoint{
		Ip:   msg.TcpTuple.Src_ip.String(),
		Port: msg.TcpTuple.Src_port,
		Proc: string(msg.CmdlineTuple.Src),
	}
	dst := Endpoint{
		Ip:   msg.TcpTuple.Dst_ip.String(),
		Port: msg.TcpTuple.Dst_port,
		Proc: string(msg.CmdlineTuple.Dst),
	}
	// For a request seen in the reverse direction the tuple's source is the
	// server, so the endpoints are exchanged.
	if msg.Direction == TcpDirectionReverse {
		src, dst = dst, src
	}
	trans.Src, trans.Dst = src, dst

	// save Raw message
	if http.Send_request {
		trans.Request_raw = string(cutMessageBody(msg))
	}

	requestFields := bson.M{
		"method":          msg.Method,
		"uri":             msg.RequestUri,
		"uri.raw":         msg.RequestUri,
		"line":            msg.FirstLine,
		"line.raw":        msg.FirstLine,
		"x-forwarded-for": msg.XForwardedFor,
	}
	trans.Http = bson.M{
		"host":    msg.Host,
		"request": requestFields,
	}

	// Restart the expiry timer for this transaction.
	if trans.timer != nil {
		trans.timer.Stop()
	}
	trans.timer = time.AfterFunc(TransactionTimeout, func() { http.expireTransaction(trans) })
}
// expireTransaction forgets a transaction by removing its tuple from the
// transaction table. It is the callback of the timer armed in
// receivedHttpRequest.
func (http *Http) expireTransaction(trans *HttpTransaction) {
	key := trans.tuple.raw
	delete(http.transactionsMap, key)
}
// receivedHttpResponse matches a parsed response with the pending request on
// the same TCP tuple, completes the transaction (response fields, response
// time in milliseconds, optional raw response), publishes it and removes it
// from the transaction table.
//
// Responses for which no request is known are logged and ignored.
func (http *Http) receivedHttpResponse(msg *HttpMessage) {
	// we need to search the request first.
	tuple := msg.TcpTuple

	DEBUG("http", "Received response with tuple: %s", tuple)

	trans := http.transactionsMap[tuple.raw]
	switch {
	case trans == nil:
		WARN("Response from unknown transaction. Ignoring: %v", tuple)
		return
	case len(trans.Http) == 0:
		WARN("Response without a known request. Ignoring.")
		return
	}

	responseFields := bson.M{
		"content_length": msg.ContentLength,
		"content_type":   msg.ContentType,
		"response": bson.M{
			"code":   msg.StatusCode,
			"phrase": msg.ReasonPhrase,
		},
	}
	trans.Http = bson_concat(trans.Http, responseFields)

	elapsed := msg.Ts.Sub(trans.ts)
	trans.ResponseTime = int32(elapsed.Nanoseconds() / 1e6) // resp_time in milliseconds

	// save Raw message
	if http.Send_response {
		trans.Response_raw = string(cutMessageBody(msg))
	}

	if err := http.PublishTransaction(trans); err != nil {
		WARN("Publish failure: %s", err)
	}

	DEBUG("http", "HTTP transaction completed: %s -> %s\n", trans.Http["request"],
		trans.Http["response"])

	// remove from map
	delete(http.transactionsMap, trans.tuple.raw)
	if trans.timer != nil {
		trans.timer.Stop()
	}
}
// PublishTransaction turns a completed transaction into an event and sends it
// through the configured Publisher. Without a Publisher it does nothing and
// returns nil.
//
// The event status is OK_STATUS for response codes below 400 and ERROR_STATUS
// otherwise. t.Http must contain a "response" map with a uint16 "code", as
// stored by receivedHttpResponse; the type assertions panic otherwise.
func (http *Http) PublishTransaction(t *HttpTransaction) error {
	if http.Publisher == nil {
		return nil
	}

	event := Event{}
	event.Type = "http"

	code := t.Http["response"].(bson.M)["code"].(uint16)
	event.Status = ERROR_STATUS
	if code < 400 {
		event.Status = OK_STATUS
	}

	event.ResponseTime = t.ResponseTime
	event.Http = t.Http

	// Raw payloads are attached only when enabled in the configuration.
	if http.Send_request {
		event.RequestRaw = t.Request_raw
	}
	if http.Send_response {
		event.ResponseRaw = t.Response_raw
	}

	return http.Publisher.PublishEvent(t.ts, &t.Src, &t.Dst, &event)
}
// cutMessageBody returns the part of a message that is kept as its raw form:
// always the headers (m.Raw up to m.bodyOffset), followed by the body only
// when the message has no Content-Type or one accepted by
// shouldIncludeInBody. For chunked messages the decoded chunk payloads are
// used as the body.
//
// The result is built in a freshly allocated slice. The previous version
// appended to m.Raw[:m.bodyOffset], which shares its backing array with
// m.Raw, so the decoded chunked body was written over the raw message bytes.
func cutMessageBody(m *HttpMessage) []byte {
	// add headers always
	headers := m.Raw[:m.bodyOffset]

	// add body
	var body []byte
	if len(m.ContentType) == 0 || shouldIncludeInBody(m.ContentType) {
		if len(m.chunked_body) > 0 {
			body = m.chunked_body
		} else {
			body = m.Raw[m.bodyOffset:]
		}
	}

	raw_msg_cut := make([]byte, 0, len(headers)+len(body))
	raw_msg_cut = append(raw_msg_cut, headers...)
	raw_msg_cut = append(raw_msg_cut, body...)
	return raw_msg_cut
}
// shouldIncludeInBody reports whether the body of a message with the given
// Content-Type value is kept in the raw message: true when the value contains
// "form-urlencoded" or "json" (case-sensitive substring match).
func shouldIncludeInBody(contenttype string) bool {
	for _, marker := range [...]string{"form-urlencoded", "json"} {
		if strings.Contains(contenttype, marker) {
			return true
		}
	}
	return false
}
// censorPasswords overwrites, in place, the values that follow the configured
// password keywords (_Config.Passwords.Hide_keywords) in the body of a
// URL-encoded request with '*' characters.
//
// msg is the raw message; its body starts at m.bodyOffset. Only requests with
// a positive ContentLength and a Content-Type containing "urlencoded" are
// touched. For each keyword the first occurrence in the body is handled: the
// value runs from the end of the keyword up to the next '&', space, CR or LF,
// or to the end of the message. Values of 120 bytes or more are left alone.
//
// Fixes over the previous version:
//   - the end of the value was computed without the keyword length, which
//     left the last len(keyword) bytes of the secret in clear text;
//   - a keyword located at the very start of the body was skipped;
//   - an empty value (delimiter right after the keyword) was handled as "no
//     delimiter found" and masked the rest of the body instead;
//   - the value is bounded by len(msg) rather than m.end, which is not set
//     for messages flushed on connection close.
func censorPasswords(m *HttpMessage, msg []byte) {
	if !m.IsRequest || m.ContentLength <= 0 ||
		!strings.Contains(m.ContentType, "urlencoded") {
		return
	}
	if m.bodyOffset < 0 || m.bodyOffset > len(msg) {
		return
	}

	// body aliases msg, so writes below modify the message in place.
	body := msg[m.bodyOffset:]

	for _, keyword := range _Config.Passwords.Hide_keywords {
		if len(keyword) == 0 {
			// An empty keyword would match everywhere.
			continue
		}
		index := bytes.Index(body, []byte(keyword))
		if index < 0 {
			continue
		}

		start_index := index + len(keyword)
		end_index := len(body)
		if rel := bytes.IndexAny(body[start_index:], "& \r\n"); rel >= 0 {
			end_index = start_index + rel
		}

		if end_index-start_index < 120 {
			for i := start_index; i < end_index; i++ {
				body[i] = byte('*')
			}
		}
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v0.4.1

搜索帮助