代码拉取完成,页面将自动刷新
package http
import (
	"bytes"
	"expvar"
	"fmt"
	"net/url"
	"strings"
	"time"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/packetbeat/procs"
	"github.com/elastic/beats/packetbeat/protos"
	"github.com/elastic/beats/packetbeat/protos/tcp"
	"github.com/elastic/beats/packetbeat/publish"
)
var debugf = logp.MakeDebug("http")
var detailedf = logp.MakeDebug("httpdetailed")
type parserState uint8
const (
	stateStart parserState = iota
	stateFLine
	stateHeaders
	stateBody
	stateBodyChunkedStart
	stateBodyChunked
	stateBodyChunkedWaitFinalCRLF
)
var (
	unmatchedResponses = expvar.NewInt("http.unmatched_responses")
)
type stream struct {
	tcptuple *common.TCPTuple
	data []byte
	parseOffset  int
	parseState   parserState
	bodyReceived int
	message *message
}
type httpConnectionData struct {
	streams   [2]*stream
	requests  messageList
	responses messageList
}
type messageList struct {
	head, tail *message
}
// HTTP application level protocol analyser plugin.
type httpPlugin struct {
	// config
	ports               []int
	sendRequest         bool
	sendResponse        bool
	splitCookie         bool
	hideKeywords        []string
	redactAuthorization bool
	includeBodyFor      []string
	maxMessageSize      int
	parserConfig parserConfig
	transactionTimeout time.Duration
	results publish.Transactions
}
var (
	isDebug    = false
	isDetailed = false
)
func init() {
	protos.Register("http", New)
}
func New(
	testMode bool,
	results publish.Transactions,
	cfg *common.Config,
) (protos.Plugin, error) {
	p := &httpPlugin{}
	config := defaultConfig
	if !testMode {
		if err := cfg.Unpack(&config); err != nil {
			return nil, err
		}
	}
	if err := p.init(results, &config); err != nil {
		return nil, err
	}
	return p, nil
}
// Init initializes the HTTP protocol analyser.
func (http *httpPlugin) init(results publish.Transactions, config *httpConfig) error {
	http.setFromConfig(config)
	isDebug = logp.IsDebug("http")
	isDetailed = logp.IsDebug("httpdetailed")
	http.results = results
	return nil
}
func (http *httpPlugin) setFromConfig(config *httpConfig) {
	http.ports = config.Ports
	http.sendRequest = config.SendRequest
	http.sendResponse = config.SendResponse
	http.hideKeywords = config.HideKeywords
	http.redactAuthorization = config.RedactAuthorization
	http.splitCookie = config.SplitCookie
	http.parserConfig.realIPHeader = strings.ToLower(config.RealIPHeader)
	http.transactionTimeout = config.TransactionTimeout
	http.includeBodyFor = config.IncludeBodyFor
	http.maxMessageSize = config.MaxMessageSize
	if config.SendAllHeaders {
		http.parserConfig.sendHeaders = true
		http.parserConfig.sendAllHeaders = true
	} else {
		if len(config.SendHeaders) > 0 {
			http.parserConfig.sendHeaders = true
			http.parserConfig.headersWhitelist = map[string]bool{}
			for _, hdr := range config.SendHeaders {
				http.parserConfig.headersWhitelist[strings.ToLower(hdr)] = true
			}
		}
	}
}
// GetPorts lists the port numbers the HTTP protocol analyser will handle.
func (http *httpPlugin) GetPorts() []int {
	return http.ports
}
// messageGap is called when a gap of size `nbytes` is found in the
// tcp stream. Decides if we can ignore the gap or it's a parser error
// and we need to drop the stream.
func (http *httpPlugin) messageGap(s *stream, nbytes int) (ok bool, complete bool) {
	m := s.message
	switch s.parseState {
	case stateStart, stateHeaders:
		// we know we cannot recover from these
		return false, false
	case stateBody:
		if isDebug {
			debugf("gap in body: %d", nbytes)
		}
		if m.isRequest {
			m.notes = append(m.notes, "Packet loss while capturing the request")
		} else {
			m.notes = append(m.notes, "Packet loss while capturing the response")
		}
		if !m.hasContentLength && (bytes.Equal(m.connection, constClose) ||
			(isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) {
			s.bodyReceived += nbytes
			m.contentLength += nbytes
			return true, false
		} else if len(s.data[s.parseOffset:])+nbytes >= m.contentLength-s.bodyReceived {
			// we're done, but the last portion of the data is gone
			m.end = s.parseOffset
			return true, true
		} else {
			s.bodyReceived += nbytes
			return true, false
		}
	}
	// assume we cannot recover
	return false, false
}
func (st *stream) PrepareForNewMessage() {
	st.data = st.data[st.message.end:]
	st.parseState = stateStart
	st.parseOffset = 0
	st.bodyReceived = 0
	st.message = nil
}
// Called when the parser has identified the boundary
// of a message.
func (http *httpPlugin) messageComplete(
	conn *httpConnectionData,
	tcptuple *common.TCPTuple,
	dir uint8,
	st *stream,
) {
	st.message.raw = st.data[st.message.start:st.message.end]
	http.handleHTTP(conn, st.message, tcptuple, dir)
}
// ConnectionTimeout returns the configured HTTP transaction timeout.
func (http *httpPlugin) ConnectionTimeout() time.Duration {
	return http.transactionTimeout
}
// Parse function is used to process TCP payloads.
func (http *httpPlugin) Parse(
	pkt *protos.Packet,
	tcptuple *common.TCPTuple,
	dir uint8,
	private protos.ProtocolData,
) protos.ProtocolData {
	defer logp.Recover("ParseHttp exception")
	conn := ensureHTTPConnection(private)
	conn = http.doParse(conn, pkt, tcptuple, dir)
	if conn == nil {
		return nil
	}
	return conn
}
func ensureHTTPConnection(private protos.ProtocolData) *httpConnectionData {
	conn := getHTTPConnection(private)
	if conn == nil {
		conn = &httpConnectionData{}
	}
	return conn
}
func getHTTPConnection(private protos.ProtocolData) *httpConnectionData {
	if private == nil {
		return nil
	}
	priv, ok := private.(*httpConnectionData)
	if !ok {
		logp.Warn("http connection data type error")
		return nil
	}
	if priv == nil {
		logp.Warn("Unexpected: http connection data not set")
		return nil
	}
	return priv
}
// Parse function is used to process TCP payloads.
func (http *httpPlugin) doParse(
	conn *httpConnectionData,
	pkt *protos.Packet,
	tcptuple *common.TCPTuple,
	dir uint8,
) *httpConnectionData {
	if isDetailed {
		detailedf("Payload received: [%s]", pkt.Payload)
	}
	extraMsgSize := 0 // size of a "seen" packet for which we don't store the actual bytes
	st := conn.streams[dir]
	if st == nil {
		st = newStream(pkt, tcptuple)
		conn.streams[dir] = st
	} else {
		// concatenate bytes
		if len(st.data)+len(pkt.Payload) > http.maxMessageSize {
			if isDebug {
				debugf("Stream data too large, ignoring message")
			}
			extraMsgSize = len(pkt.Payload)
		} else {
			st.data = append(st.data, pkt.Payload...)
		}
	}
	for len(st.data) > 0 {
		if st.message == nil {
			st.message = &message{ts: pkt.Ts}
		}
		parser := newParser(&http.parserConfig)
		ok, complete := parser.parse(st, extraMsgSize)
		if !ok {
			// drop this tcp stream. Will retry parsing with the next
			// segment in it
			conn.streams[dir] = nil
			return conn
		}
		if !complete {
			// wait for more data
			break
		}
		// all ok, ship it
		http.messageComplete(conn, tcptuple, dir, st)
		// and reset stream for next message
		st.PrepareForNewMessage()
	}
	return conn
}
func newStream(pkt *protos.Packet, tcptuple *common.TCPTuple) *stream {
	return &stream{
		tcptuple: tcptuple,
		data:     pkt.Payload,
		message:  &message{ts: pkt.Ts},
	}
}
// ReceivedFin will be called when TCP transaction is terminating.
func (http *httpPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8,
	private protos.ProtocolData) protos.ProtocolData {
	debugf("Received FIN")
	conn := getHTTPConnection(private)
	if conn == nil {
		return private
	}
	stream := conn.streams[dir]
	if stream == nil {
		return conn
	}
	// send whatever data we got so far as complete. This
	// is needed for the HTTP/1.0 without Content-Length situation.
	if stream.message != nil && len(stream.data[stream.message.start:]) > 0 {
		stream.message.raw = stream.data[stream.message.start:]
		http.handleHTTP(conn, stream.message, tcptuple, dir)
		// and reset message. Probably not needed, just to be sure.
		stream.PrepareForNewMessage()
	}
	return conn
}
// GapInStream is called when a gap of nbytes bytes is found in the stream (due
// to packet loss).
func (http *httpPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
	nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
	defer logp.Recover("GapInStream(http) exception")
	conn := getHTTPConnection(private)
	if conn == nil {
		return private, false
	}
	stream := conn.streams[dir]
	if stream == nil || stream.message == nil {
		// nothing to do
		return private, false
	}
	ok, complete := http.messageGap(stream, nbytes)
	if isDetailed {
		detailedf("messageGap returned ok=%v complete=%v", ok, complete)
	}
	if !ok {
		// on errors, drop stream
		conn.streams[dir] = nil
		return conn, true
	}
	if complete {
		// Current message is complete, we need to publish from here
		http.messageComplete(conn, tcptuple, dir, stream)
	}
	// don't drop the stream, we can ignore the gap
	return private, false
}
func (http *httpPlugin) handleHTTP(
	conn *httpConnectionData,
	m *message,
	tcptuple *common.TCPTuple,
	dir uint8,
) {
	m.tcpTuple = *tcptuple
	m.direction = dir
	m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort())
	http.hideHeaders(m)
	if m.isRequest {
		if isDebug {
			debugf("Received request with tuple: %s", m.tcpTuple)
		}
		conn.requests.append(m)
	} else {
		if isDebug {
			debugf("Received response with tuple: %s", m.tcpTuple)
		}
		conn.responses.append(m)
		http.correlate(conn)
	}
}
func (http *httpPlugin) correlate(conn *httpConnectionData) {
	// drop responses with missing requests
	if conn.requests.empty() {
		for !conn.responses.empty() {
			debugf("Response from unknown transaction. Ingoring.")
			unmatchedResponses.Add(1)
			conn.responses.pop()
		}
		return
	}
	// merge requests with responses into transactions
	for !conn.responses.empty() && !conn.requests.empty() {
		requ := conn.requests.pop()
		resp := conn.responses.pop()
		trans := http.newTransaction(requ, resp)
		if isDebug {
			debugf("HTTP transaction completed")
		}
		http.publishTransaction(trans)
	}
}
func (http *httpPlugin) newTransaction(requ, resp *message) common.MapStr {
	status := common.OK_STATUS
	if resp.statusCode >= 400 {
		status = common.ERROR_STATUS
	}
	// resp_time in milliseconds
	responseTime := int32(resp.ts.Sub(requ.ts).Nanoseconds() / 1e6)
	path, params, err := http.extractParameters(requ, requ.raw)
	if err != nil {
		logp.Warn("Fail to parse HTTP parameters: %v", err)
	}
	src := common.Endpoint{
		IP:   requ.tcpTuple.SrcIP.String(),
		Port: requ.tcpTuple.SrcPort,
		Proc: string(requ.cmdlineTuple.Src),
	}
	dst := common.Endpoint{
		IP:   requ.tcpTuple.DstIP.String(),
		Port: requ.tcpTuple.DstPort,
		Proc: string(requ.cmdlineTuple.Dst),
	}
	if requ.direction == tcp.TCPDirectionReverse {
		src, dst = dst, src
	}
	httpDetails := common.MapStr{
		"request": common.MapStr{
			"params":  params,
			"headers": http.collectHeaders(requ),
		},
		"response": common.MapStr{
			"code":    resp.statusCode,
			"phrase":  resp.statusPhrase,
			"headers": http.collectHeaders(resp),
		},
	}
	http.setBody(httpDetails["request"].(common.MapStr), requ)
	http.setBody(httpDetails["response"].(common.MapStr), resp)
	event := common.MapStr{
		"@timestamp":   common.Time(requ.ts),
		"type":         "http",
		"status":       status,
		"responsetime": responseTime,
		"method":       requ.method,
		"path":         path,
		"query":        fmt.Sprintf("%s %s", requ.method, path),
		"http":         httpDetails,
		"bytes_out":    resp.size,
		"bytes_in":     requ.size,
		"src":          &src,
		"dst":          &dst,
	}
	if http.sendRequest {
		event["request"] = string(http.cutMessageBody(requ))
	}
	if http.sendResponse {
		event["response"] = string(http.cutMessageBody(resp))
	}
	if len(requ.notes)+len(resp.notes) > 0 {
		event["notes"] = append(requ.notes, resp.notes...)
	}
	if len(requ.realIP) > 0 {
		event["real_ip"] = requ.realIP
	}
	return event
}
func (http *httpPlugin) publishTransaction(event common.MapStr) {
	if http.results == nil {
		return
	}
	http.results.PublishTransaction(event)
}
func (http *httpPlugin) collectHeaders(m *message) interface{} {
	hdrs := map[string]interface{}{}
	hdrs["content-length"] = m.contentLength
	if len(m.contentType) > 0 {
		hdrs["content-type"] = m.contentType
	}
	if http.parserConfig.sendHeaders {
		cookie := "cookie"
		if !m.isRequest {
			cookie = "set-cookie"
		}
		for name, value := range m.headers {
			if strings.ToLower(name) == "content-type" {
				continue
			}
			if strings.ToLower(name) == "content-length" {
				continue
			}
			if http.splitCookie && name == cookie {
				hdrs[name] = splitCookiesHeader(string(value))
			} else {
				hdrs[name] = value
			}
		}
	}
	return hdrs
}
func (http *httpPlugin) setBody(result common.MapStr, m *message) {
	body := string(http.extractBody(m))
	if len(body) > 0 {
		result["body"] = body
	}
}
func splitCookiesHeader(headerVal string) map[string]string {
	cookies := map[string]string{}
	cstring := strings.Split(headerVal, ";")
	for _, cval := range cstring {
		cookie := strings.SplitN(cval, "=", 2)
		if len(cookie) == 2 {
			cookies[strings.ToLower(strings.TrimSpace(cookie[0]))] =
				parseCookieValue(strings.TrimSpace(cookie[1]))
		}
	}
	return cookies
}
func parseCookieValue(raw string) string {
	// Strip the quotes, if present.
	if len(raw) > 1 && raw[0] == '"' && raw[len(raw)-1] == '"' {
		raw = raw[1 : len(raw)-1]
	}
	return raw
}
func (http *httpPlugin) extractBody(m *message) []byte {
	body := []byte{}
	if len(m.contentType) > 0 && http.shouldIncludeInBody(m.contentType) {
		if len(m.chunkedBody) > 0 {
			body = append(body, m.chunkedBody...)
		} else {
			if isDebug {
				debugf("Body to include: [%s]", m.raw[m.bodyOffset:])
			}
			body = append(body, m.raw[m.bodyOffset:]...)
		}
	}
	return body
}
func (http *httpPlugin) cutMessageBody(m *message) []byte {
	cutMsg := []byte{}
	// add headers always
	cutMsg = m.raw[:m.bodyOffset]
	// add body
	return append(cutMsg, http.extractBody(m)...)
}
func (http *httpPlugin) shouldIncludeInBody(contenttype []byte) bool {
	includedBodies := http.includeBodyFor
	for _, include := range includedBodies {
		if bytes.Contains(contenttype, []byte(include)) {
			if isDebug {
				debugf("Should Include Body = true Content-Type %s include_body %s",
					contenttype, include)
			}
			return true
		}
		if isDebug {
			debugf("Should Include Body = false Content-Type %s include_body %s",
				contenttype, include)
		}
	}
	return false
}
func (http *httpPlugin) hideHeaders(m *message) {
	if !m.isRequest || !http.redactAuthorization {
		return
	}
	msg := m.raw
	// byte64 != encryption, so obscure it in headers in case of Basic Authentication
	redactHeaders := []string{"authorization", "proxy-authorization"}
	authText := []byte("uthorization:") // [aA] case insensitive, also catches Proxy-Authorization:
	authHeaderStartX := m.headerOffset
	authHeaderEndX := m.bodyOffset
	for authHeaderStartX < m.bodyOffset {
		if isDebug {
			debugf("looking for authorization from %d to %d",
				authHeaderStartX, authHeaderEndX)
		}
		startOfHeader := bytes.Index(msg[authHeaderStartX:m.bodyOffset], authText)
		if startOfHeader >= 0 {
			authHeaderStartX = authHeaderStartX + startOfHeader
			endOfHeader := bytes.Index(msg[authHeaderStartX:m.bodyOffset], []byte("\r\n"))
			if endOfHeader >= 0 {
				authHeaderEndX = authHeaderStartX + endOfHeader
				if authHeaderEndX > m.bodyOffset {
					authHeaderEndX = m.bodyOffset
				}
				if isDebug {
					debugf("Redact authorization from %d to %d", authHeaderStartX, authHeaderEndX)
				}
				for i := authHeaderStartX + len(authText); i < authHeaderEndX; i++ {
					msg[i] = byte('*')
				}
			}
		}
		authHeaderStartX = authHeaderEndX + len("\r\n")
		authHeaderEndX = m.bodyOffset
	}
	for _, header := range redactHeaders {
		if len(m.headers[header]) > 0 {
			m.headers[header] = []byte("*")
		}
	}
	m.raw = msg
}
func (http *httpPlugin) hideSecrets(values url.Values) url.Values {
	params := url.Values{}
	for key, array := range values {
		for _, value := range array {
			if http.isSecretParameter(key) {
				params.Add(key, "xxxxx")
			} else {
				params.Add(key, value)
			}
		}
	}
	return params
}
// extractParameters parses the URL and the form parameters and replaces the secrets
// with the string xxxxx. The parameters containing secrets are defined in http.Hide_secrets.
// Returns the Request URI path and the (adjusted) parameters.
func (http *httpPlugin) extractParameters(m *message, msg []byte) (path string, params string, err error) {
	var values url.Values
	u, err := url.Parse(string(m.requestURI))
	if err != nil {
		return
	}
	values = u.Query()
	path = u.Path
	paramsMap := http.hideSecrets(values)
	if m.contentLength > 0 && bytes.Contains(m.contentType, []byte("urlencoded")) {
		values, err = url.ParseQuery(string(msg[m.bodyOffset:]))
		if err != nil {
			return
		}
		for key, value := range http.hideSecrets(values) {
			paramsMap[key] = value
		}
	}
	params = paramsMap.Encode()
	if isDetailed {
		detailedf("Form parameters: %s", params)
	}
	return
}
func (http *httpPlugin) isSecretParameter(key string) bool {
	for _, keyword := range http.hideKeywords {
		if strings.ToLower(key) == keyword {
			return true
		}
	}
	return false
}
func (ml *messageList) append(msg *message) {
	if ml.tail == nil {
		ml.head = msg
	} else {
		ml.tail.next = msg
	}
	msg.next = nil
	ml.tail = msg
}
func (ml *messageList) empty() bool {
	return ml.head == nil
}
func (ml *messageList) pop() *message {
	if ml.head == nil {
		return nil
	}
	msg := ml.head
	ml.head = ml.head.next
	if ml.head == nil {
		ml.tail = nil
	}
	return msg
}
func (ml *messageList) last() *message {
	return ml.tail
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
 马建仓 AI 助手
马建仓 AI 助手