2 Star 1 Fork 0

李玮 / trireme-lib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
datapath_tcp.go 48.65 KB
一键复制 编辑 原始数据 按行查看 历史
李玮 提交于 2020-01-29 13:23 . v1
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189
package nfqdatapath
// Go libraries
import (
"fmt"
"strconv"
"git.cloud.top/DSec/trireme-lib/collector"
"git.cloud.top/DSec/trireme-lib/common"
"git.cloud.top/DSec/trireme-lib/controller/constants"
enforcerconstants "git.cloud.top/DSec/trireme-lib/controller/internal/enforcer/constants"
"git.cloud.top/DSec/trireme-lib/controller/pkg/claimsheader"
"git.cloud.top/DSec/trireme-lib/controller/pkg/connection"
"git.cloud.top/DSec/trireme-lib/controller/pkg/packet"
"git.cloud.top/DSec/trireme-lib/controller/pkg/pucontext"
"git.cloud.top/DSec/trireme-lib/controller/pkg/tokens"
"git.cloud.top/DSec/trireme-lib/policy"
"git.cloud.top/DSec/trireme-lib/utils/cache"
"github.com/mdlayher/netlink"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// processNetworkPackets processes packets arriving from network and are destined to the application
func (d *Datapath) processNetworkTCPPackets(p *packet.Packet) (conn *connection.TCPConnection, err error) {
if d.PacketLogsEnabled() {
zap.L().Debug("Processing network packet ",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
)
defer zap.L().Debug("Finished Processing network packet ",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
// Retrieve connection state of SynAck packets and
// skip processing for SynAck packets that we don't have state
switch p.GetTCPFlags() & packet.TCPSynAckMask {
case packet.TCPSynMask:
conn, err = d.netSynRetrieveState(p)
if err != nil {
switch err {
// Non PU Traffic let it through
case pucontext.ToError(pucontext.ErrNonPUTraffic):
return conn, nil
default:
if d.PacketLogsEnabled() {
zap.L().Debug("Packet rejected",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
return conn, err
}
}
case packet.TCPSynAckMask:
conn, err = d.netSynAckRetrieveState(p)
//add zhaolanbao
switch err {
case pucontext.ToError(pucontext.ErrOutOfOrderSynAck):
// Drop this synack it is for a flow we know which is marked for deletion.
// We saw a FINACK and this synack has come without we seeing an appsyn for this flow again
//return conn, fmt.Errorf("%s %s:%d", err, p.SourceAddress().String(), int(p.SourcePort()))
return conn, pucontext.PuContextError(pucontext.ErrOutOfOrderSynAck, fmt.Sprintf("%s %s:%d", err, p.SourceAddress().String(), int(p.SourcePort())))
case pucontext.ToError(pucontext.ErrNonPUTraffic):
d.releaseUnmonitoredFlow(p)
return conn, nil
}
default:
conn, err = d.netRetrieveState(p)
if err != nil {
if d.PacketLogsEnabled() {
zap.L().Debug("Packet rejected",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
return conn, err
}
}
conn.Lock()
defer conn.Unlock()
p.Print(packet.PacketStageIncoming, d.PacketLogsEnabled())
if d.service != nil {
if !d.service.PreProcessTCPNetPacket(p, conn.Context, conn) {
p.Print(packet.PacketFailureService, d.PacketLogsEnabled())
//return conn, errors.New("pre service processing failed for network packet")
return conn, pucontext.PuContextError(pucontext.ErrServicePreprocessorFailed, "pre service processing failed for network packet")
}
}
p.Print(packet.PacketStageAuth, d.PacketLogsEnabled())
// Match the tags of the packet against the policy rules - drop if the lookup fails
action, claims, err := d.processNetworkTCPPacket(p, conn.Context, conn)
if err != nil {
p.Print(packet.PacketFailureAuth, d.PacketLogsEnabled())
if d.PacketLogsEnabled() {
zap.L().Debug("Rejecting packet ",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
//return conn, fmt.Errorf("packet processing failed for network packet: %s", err)
return conn, err
}
p.Print(packet.PacketStageService, d.PacketLogsEnabled())
if d.service != nil {
// PostProcessServiceInterface
if !d.service.PostProcessTCPNetPacket(p, action, claims, conn.Context, conn) {
p.Print(packet.PacketFailureService, d.PacketLogsEnabled())
//return conn, errors.New("post service processing failed for network packet")
return conn, pucontext.PuContextError(pucontext.ErrServicePostprocessorFailed, "post service processing failed for network packet")
}
// If we received a FIN packet here means the client sent a FIN packet and we can start clearing our cache
if conn.ServiceConnection && conn.TimeOut > 0 {
d.netReplyConnectionTracker.SetTimeOut(p.L4FlowHash(), conn.TimeOut) // nolint
}
}
if p.GetTCPFlags()&packet.TCPRstMask != 0 {
// Seen a RST packet. Remove cache entries related to this connection
zap.L().Debug("Received early reset from network and cleaning up state", zap.String("Flow", p.L4FlowHash()))
if err := d.netOrigConnectionTracker.Remove(p.L4FlowHash()); err != nil {
zap.L().Debug("Received early reset from network failed to clean net origin tracker", zap.String("Flow", p.L4FlowHash()))
}
if err := d.appReplyConnectionTracker.Remove(p.L4ReverseFlowHash()); err != nil {
zap.L().Debug("Received early reset from network failed to clean net pp reply tracker", zap.String("Flow", p.L4FlowHash()))
}
}
// Accept the packet
p.UpdateTCPChecksum()
p.Print(packet.PacketStageOutgoing, d.PacketLogsEnabled())
return conn, nil
}
// processApplicationPackets processes packets arriving from an application and are destined to the network
func (d *Datapath) processApplicationTCPPackets(p *packet.Packet) (conn *connection.TCPConnection, err error) {
if d.PacketLogsEnabled() {
zap.L().Debug("Processing application packet ",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
)
defer zap.L().Debug("Finished Processing application packet ",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
switch p.GetTCPFlags() & packet.TCPSynAckMask {
case packet.TCPSynMask:
conn, err = d.appSynRetrieveState(p)
if err != nil {
if d.PacketLogsEnabled() {
zap.L().Debug("Packet rejected",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
return conn, err
}
case packet.TCPSynAckMask:
conn, err = d.appSynAckRetrieveState(p)
if err != nil {
if d.PacketLogsEnabled() {
zap.L().Debug("SynAckPacket Ignored",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
)
}
cid, err := d.contextIDFromTCPPort.GetSpecValueFromPort(p.SourcePort())
if err == nil {
item, err := d.puFromContextID.Get(cid.(string))
if err != nil {
// Let the packet through if the context is not found
return conn, nil
}
ctx := item.(*pucontext.PUContext)
// Syn was not seen and this synack packet is coming from a PU
// we monitor. This is possible only if IP is in the external
// networks or excluded networks. Let this packet go through
// for any of these cases. Drop for everything else.
_, policy, perr := ctx.NetworkACLPolicyFromAddr(p.DestinationAddress(), p.SourcePort())
if perr == nil && policy.Action.Accepted() {
return conn, nil
}
// Drop this synack as it belongs to PU
// for which we didn't see syn
zap.L().Error("Network Syn was not seen, and we are monitoring this PU. Dropping the syn ack packet", zap.String("contextID", cid.(string)), zap.Uint16("port", p.SourcePort()))
return conn, pucontext.PuContextError(pucontext.ErrNetSynNotSeen, fmt.Sprintf("Network Syn was not seen %s,%d", cid.(string), int(p.SourcePort())))
}
// syn ack for non aporeto traffic can be let through
return conn, nil
}
default:
conn, err = d.appRetrieveState(p)
if err != nil {
if d.PacketLogsEnabled() {
zap.L().Debug("Packet rejected",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
return conn, err
}
}
conn.Lock()
defer conn.Unlock()
p.Print(packet.PacketStageIncoming, d.PacketLogsEnabled())
if d.service != nil {
// PreProcessServiceInterface
if !d.service.PreProcessTCPAppPacket(p, conn.Context, conn) {
p.Print(packet.PacketFailureService, d.PacketLogsEnabled())
//return conn, errors.New("pre service processing failed for application packet")
return conn, conn.Context.PuContextError(pucontext.ErrServicePreprocessorFailed, fmt.Sprintf("%s:%s:%s", conn.Context.ID(), p.SourceAddress().String(), p.DestinationAddress().String()))
}
}
p.Print(packet.PacketStageAuth, d.PacketLogsEnabled())
// Match the tags of the packet against the policy rules - drop if the lookup fails
action, err := d.processApplicationTCPPacket(p, conn.Context, conn)
if err != nil {
if d.PacketLogsEnabled() {
zap.L().Debug("Dropping packet ",
zap.String("flow", p.L4FlowHash()),
zap.String("Flags", packet.TCPFlagsToStr(p.GetTCPFlags())),
zap.Error(err),
)
}
p.Print(packet.PacketFailureAuth, d.PacketLogsEnabled())
//return conn, fmt.Errorf("processing failed for application packet: %s", err)
return conn, err
}
p.Print(packet.PacketStageService, d.PacketLogsEnabled())
if d.service != nil {
// PostProcessServiceInterface
if !d.service.PostProcessTCPAppPacket(p, action, conn.Context, conn) {
p.Print(packet.PacketFailureService, d.PacketLogsEnabled())
//return conn, errors.New("post service processing failed for application packet")
return conn, conn.Context.PuContextError(pucontext.ErrServicePostprocessorFailed, fmt.Sprintf("%s:%s:%s", conn.Context.ID(), p.SourceAddress().String(), p.DestinationAddress().String()))
}
}
if p.GetTCPFlags()&packet.TCPRstMask != 0 {
// Seen a RST packet. Remove cache entries related to this connection
zap.L().Debug("Received early reset by application and cleaning up state", zap.String("Flow", p.L4FlowHash()))
if err := d.sourcePortConnectionCache.Remove(p.SourcePortHash(packet.PacketTypeApplication)); err != nil {
zap.L().Debug("Received early reset by application and failed in source port cache", zap.String("Flow", p.L4FlowHash()))
}
if err := d.appOrigConnectionTracker.Remove(p.L4FlowHash()); err != nil {
zap.L().Debug("Received early reset by application and failed in app origin tracker", zap.String("Flow", p.L4FlowHash()))
}
}
// Accept the packet
p.UpdateTCPChecksum()
p.Print(packet.PacketStageOutgoing, d.PacketLogsEnabled())
return conn, nil
}
// processApplicationTCPPacket processes a TCP packet and dispatches it to other methods based on the flags
func (d *Datapath) processApplicationTCPPacket(tcpPacket *packet.Packet, context *pucontext.PUContext, conn *connection.TCPConnection) (interface{}, error) {
if conn == nil {
return nil, nil
}
// State machine based on the flags
switch tcpPacket.GetTCPFlags() & packet.TCPSynAckMask {
case packet.TCPSynMask: //Processing SYN packet from Application
return d.processApplicationSynPacket(tcpPacket, context, conn)
case packet.TCPAckMask:
return nil, d.processApplicationAckPacket(tcpPacket, context, conn)
case packet.TCPSynAckMask:
return nil, d.processApplicationSynAckPacket(tcpPacket, context, conn)
default:
return nil, nil
}
}
// processApplicationSynPacket processes a single Syn Packet
func (d *Datapath) processApplicationSynPacket(tcpPacket *packet.Packet, context *pucontext.PUContext, conn *connection.TCPConnection) (interface{}, error) {
// If the packet is not in target networks then look into the external services application cache to
// make a decision whether the packet should be forwarded. For target networks with external services
// network syn/ack accepts the packet if it belongs to external services.
dstAddr := tcpPacket.DestinationAddress()
dstPort := tcpPacket.DestPort()
_, pkt, perr := d.targetNetworks.GetMatchingAction(dstAddr, dstPort)
if perr != nil {
report, policy, perr := context.ApplicationACLPolicyFromAddr(dstAddr, dstPort)
if perr == nil && policy.Action.Accepted() {
return nil, nil
}
d.reportExternalServiceFlow(context, report, pkt, true, tcpPacket)
//return nil, fmt.Errorf("No acls found for external services. Dropping application syn packet")
return nil, conn.Context.PuContextError(pucontext.ErrDroppedExternalService, fmt.Sprintf("Dropping external service application syn packet %s:%d", tcpPacket.DestinationAddress().String(), int(tcpPacket.DestPort())))
}
if policy, err := context.RetrieveCachedExternalFlowPolicy(tcpPacket.DestinationAddress().String() + ":" + strconv.Itoa(int(tcpPacket.DestPort()))); err == nil {
d.appOrigConnectionTracker.AddOrUpdate(tcpPacket.L4FlowHash(), conn)
d.sourcePortConnectionCache.AddOrUpdate(tcpPacket.SourcePortHash(packet.PacketTypeApplication), conn)
return policy, nil
}
// We are now processing as a Trireme packet that needs authorization headers
// Create TCP Option
tcpOptions := d.createTCPAuthenticationOption([]byte{})
// Create a token
tcpData, err := d.tokenAccessor.CreateSynPacketToken(context, &conn.Auth)
if err != nil {
return nil, err
}
// Set the state indicating that we send out a Syn packet
conn.SetState(connection.TCPSynSend)
// Poplate the caches to track the connection
d.appOrigConnectionTracker.AddOrUpdate(tcpPacket.L4FlowHash(), conn)
d.sourcePortConnectionCache.AddOrUpdate(tcpPacket.SourcePortHash(packet.PacketTypeApplication), conn)
// Attach the tags to the packet and accept the packet
return nil, tcpPacket.TCPDataAttach(tcpOptions, tcpData)
}
// processApplicationSynAckPacket processes an application SynAck packet
func (d *Datapath) processApplicationSynAckPacket(tcpPacket *packet.Packet, context *pucontext.PUContext, conn *connection.TCPConnection) error {
// if the traffic belongs to the same pu, let it go
if conn.GetState() == connection.TCPData && conn.IsLoopbackConnection() {
return nil
}
// If we are already in the connection.TCPData, it means that this is an external flow
// At this point we can release the flow to the kernel by updating conntrack
// We can also clean up the state since we are not going to see any more
// packets from this connection.
if conn.GetState() == connection.TCPData && !conn.ServiceConnection {
err1 := d.netOrigConnectionTracker.Remove(tcpPacket.L4ReverseFlowHash())
err2 := d.appReplyConnectionTracker.Remove(tcpPacket.L4FlowHash())
if err1 != nil || err2 != nil {
zap.L().Debug("Failed to remove cache entries")
}
if err := d.conntrack.UpdateApplicationFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultExternalConnMark,
); err != nil {
if !netlink.IsNotExist(errors.Cause(err)) { // nolint
zap.L().Error("Failed to update conntrack entry for flow at SynAck packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err))
} else {
zap.L().Debug("Failed to update conntrack entry for flow at SynAck packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err))
}
}
return nil
}
// We now process packets that need authorization options
// Create TCP Option
tcpOptions := d.createTCPAuthenticationOption([]byte{})
claimsHeader := claimsheader.NewClaimsHeader(
claimsheader.OptionEncrypt(conn.PacketFlowPolicy.Action.Encrypted()),
)
// never returns error
tcpData, err := d.tokenAccessor.CreateSynAckPacketToken(context, &conn.Auth, claimsHeader)
if err != nil {
return err
}
// Set the state for future reference
conn.SetState(connection.TCPSynAckSend)
// Attach the tags to the packet
return tcpPacket.TCPDataAttach(tcpOptions, tcpData)
}
// processApplicationAckPacket processes an application ack packet
func (d *Datapath) processApplicationAckPacket(tcpPacket *packet.Packet, context *pucontext.PUContext, conn *connection.TCPConnection) error {
// Only process the first Ack of a connection. This means that we have received
// as SynAck packet and we can now process the ACK.
if conn.GetState() == connection.TCPSynAckReceived {
// Special case. We are handling an AP packet with data, but the ACK has been lost
// somewhere. In this case, we drop the payload and send our authorization data.
// The TCP stack will try again.
if !tcpPacket.IsEmptyTCPPayload() {
if err := tcpPacket.TCPDataDetach(0); err != nil {
return fmt.Errorf("unable to detach data from packet: %s", err)
}
}
// Create a new token that includes the source and destinatio nonse
// These are both challenges signed by the secret key and random for every
// connection minimizing the chances of a replay attack
token, err := d.tokenAccessor.CreateAckPacketToken(context, &conn.Auth)
if err != nil {
return err
}
tcpOptions := d.createTCPAuthenticationOption([]byte{})
// Attach the tags to the packet
if err := tcpPacket.TCPDataAttach(tcpOptions, token); err != nil {
return err
}
conn.SetState(connection.TCPAckSend)
return nil
}
// If we are already in the connection.TCPData connection just forward the packet
if conn.GetState() == connection.TCPData {
return nil
}
if conn.GetState() == connection.UnknownState {
// Check if the destination is in the external services approved cache
// and if yes, allow the packet to go and release the flow.
_, policy, perr := context.ApplicationACLPolicyFromAddr(tcpPacket.DestinationAddress(), tcpPacket.DestPort())
if perr != nil {
// If it is an SSH PU, we let the connection go through
// It means it is a new SSH Session connection, we mark
// the packet and let it go through.
if context.Type() == common.SSHSessionPU {
if err := d.conntrack.UpdateApplicationFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultConnMark,
); err != nil {
if !netlink.IsNotExist(errors.Cause(err)) {
zap.L().Error("Failed to update conntrack entry for flow at Ack packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err))
} else {
zap.L().Debug("Failed to update conntrack entry for flow at Ack packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err))
}
}
return nil
}
conn.Context.PuContextError(pucontext.ErrAckInUnknownState, fmt.Sprintf("Ack Received %s:%d", tcpPacket.DestinationAddress().String(), int(tcpPacket.DestPort()))) // nolint
// Convert Ack to FinAck for all other pus
return tcpPacket.ConvertAcktoFinAck()
}
if policy.Action.Rejected() {
return conn.Context.PuContextError(pucontext.ErrRejectPacket, fmt.Sprintf("Rejected due to policy %s", policy.PolicyID))
}
if err := d.conntrack.UpdateApplicationFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultExternalConnMark,
); err != nil {
zap.L().Error("Failed to update conntrack entry for flow at Ack packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err),
)
}
return nil
}
// Here we capture the first data packet after an ACK packet by modyfing the
// state. We will not release the caches though to deal with re-transmissions.
// We will let the caches expire.
if conn.GetState() == connection.TCPAckSend {
if !conn.ServiceConnection && tcpPacket.SourceAddress().String() != tcpPacket.DestinationAddress().String() &&
!(tcpPacket.SourceAddress().IsLoopback() && tcpPacket.DestinationAddress().IsLoopback()) {
go func() {
if err := d.conntrack.UpdateApplicationFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultConnMark,
); err != nil {
zap.L().Error("Failed to update conntrack table for flow after ack packet",
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.Error(err),
)
}
context.PuContextError(pucontext.ErrConnectionsProcessed, "") // nolint
}()
}
conn.SetState(connection.TCPData)
return nil
}
return fmt.Errorf("received application ack packet in the wrong state: %d", conn.GetState())
}
// processNetworkTCPPacket processes a network TCP packet and dispatches it to different methods based on the flags
func (d *Datapath) processNetworkTCPPacket(tcpPacket *packet.Packet, context *pucontext.PUContext, conn *connection.TCPConnection) (action interface{}, claims *tokens.ConnectionClaims, err error) {
if conn == nil {
return nil, nil, nil
}
// Update connection state in the internal state machine tracker
switch tcpPacket.GetTCPFlags() & packet.TCPSynAckMask {
case packet.TCPSynMask:
return d.processNetworkSynPacket(context, conn, tcpPacket)
case packet.TCPAckMask:
return d.processNetworkAckPacket(context, conn, tcpPacket)
case packet.TCPSynAckMask:
return d.processNetworkSynAckPacket(context, conn, tcpPacket)
default: // Ignore any other packet
return nil, nil, nil
}
}
// processNetworkSynPacket processes a syn packet arriving from the network
func (d *Datapath) processNetworkSynPacket(context *pucontext.PUContext, conn *connection.TCPConnection, tcpPacket *packet.Packet) (action interface{}, claims *tokens.ConnectionClaims, err error) {
// Incoming packets that don't have our options are candidates to be processed
// as external services.
if err = tcpPacket.CheckTCPAuthenticationOption(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
// If there is no auth option, attempt the ACLs
report, pkt, perr := context.NetworkACLPolicy(tcpPacket)
d.reportExternalServiceFlow(context, report, pkt, false, tcpPacket)
if perr != nil || pkt.Action.Rejected() {
return nil, nil, fmt.Errorf("no auth or acls: outgoing connection dropped: %s", perr)
}
conn.SetState(connection.TCPData)
d.netOrigConnectionTracker.AddOrUpdate(tcpPacket.L4FlowHash(), conn)
d.appReplyConnectionTracker.AddOrUpdate(tcpPacket.L4ReverseFlowHash(), conn)
return pkt, nil, nil
}
// Packets that have authorization information go through the auth path
// Decode the JWT token using the context key
claims, err = d.tokenAccessor.ParsePacketToken(&conn.Auth, tcpPacket.ReadTCPData())
// If the token signature is not valid, we must drop the connection and we drop the Syn packet.
// The source will retry but we have no state to maintain here.
if err != nil {
d.reportRejectedFlow(tcpPacket, conn, collector.DefaultEndPoint, context.ManagementID(), context, tokens.CodeFromErr(err), nil, nil, false)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynDroppedInvalidToken, fmt.Sprintf("contextID %s SourceAddress %s DestPort %d", context.ManagementID(), tcpPacket.SourceAddress().String(), int(tcpPacket.DestPort())))
}
// if there are no claims we must drop the connection and we drop the Syn
// packet. The source will retry but we have no state to maintain here.
if claims == nil {
d.reportRejectedFlow(tcpPacket, conn, collector.DefaultEndPoint, context.ManagementID(), context, collector.InvalidToken, nil, nil, false)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynDroppedNoClaims, fmt.Sprintf("contextID %s SourceAddress %s DestPort %d", context.ManagementID(), tcpPacket.SourceAddress().String(), int(tcpPacket.DestPort())))
}
txLabel, ok := claims.T.Get(enforcerconstants.TransmitterLabel)
if err := tcpPacket.CheckTCPAuthenticationOption(enforcerconstants.TCPAuthenticationOptionBaseLen); !ok || err != nil {
d.reportRejectedFlow(tcpPacket, conn, txLabel, context.ManagementID(), context, collector.InvalidFormat, nil, nil, false)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynDroppedTCPOption, fmt.Sprintf("contextID %s SourceAddress %s DestPort %d", context.ManagementID(), tcpPacket.SourceAddress().String(), int(tcpPacket.DestPort())))
}
// Remove any of our data from the packet. No matter what we don't need the
// metadata any more.
if err := tcpPacket.TCPDataDetach(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
d.reportRejectedFlow(tcpPacket, conn, txLabel, context.ManagementID(), context, collector.InvalidHeader, nil, nil, false)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynDroppedInvalidFormat, fmt.Sprintf("contextID %s SourceAddress %s DestPort %d", context.ManagementID(), tcpPacket.SourceAddress().String(), int(tcpPacket.DestPort())))
}
tcpPacket.DropTCPDetachedBytes()
// Add the port as a label with an @ prefix. These labels are invalid otherwise
// If all policies are restricted by port numbers this will allow port-specific policies
tags := claims.T.Copy()
tags.AppendKeyValue(constants.PortNumberLabelString, fmt.Sprintf("%s/%s", constants.TCPProtoString, strconv.Itoa(int(tcpPacket.DestPort()))))
report, pkt := context.SearchRcvRules(tags)
if pkt.Action.Rejected() && (txLabel != context.ManagementID()) {
d.reportRejectedFlow(tcpPacket, conn, txLabel, context.ManagementID(), context, collector.PolicyDrop, report, pkt, false)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynRejectPacket, fmt.Sprintf("contextID %s SourceAddress %s DestPort %d PolicyID %s", context.ManagementID(), tcpPacket.SourceAddress().String(), int(tcpPacket.DestPort()), pkt.PolicyID))
}
hash := tcpPacket.L4FlowHash()
// Update the connection state and store the Nonse send to us by the host.
// We use the nonse in the subsequent packets to achieve randomization.
conn.SetState(connection.TCPSynReceived)
// conntrack
d.netOrigConnectionTracker.AddOrUpdate(hash, conn)
d.appReplyConnectionTracker.AddOrUpdate(tcpPacket.L4ReverseFlowHash(), conn)
// Cache the action
conn.ReportFlowPolicy = report
conn.PacketFlowPolicy = pkt
if txLabel == context.ManagementID() {
zap.L().Debug("Traffic to the same pu", zap.String("flow", tcpPacket.L4FlowHash()))
conn.SetLoopbackConnection(true)
}
// Accept the connection
return pkt, claims, nil
}
// policyPair stores both reporting and actual action taken on packet.
type policyPair struct {
report *policy.FlowPolicy
packet *policy.FlowPolicy
}
// processNetworkSynAckPacket processes a SynAck packet arriving from the network
func (d *Datapath) processNetworkSynAckPacket(context *pucontext.PUContext, conn *connection.TCPConnection, tcpPacket *packet.Packet) (action interface{}, claims *tokens.ConnectionClaims, err error) {
// Packets with no authorization are processed as external services based on the ACLS
if err = tcpPacket.CheckTCPAuthenticationOption(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
if _, err := d.puFromContextID.Get(conn.Context.ID()); err != nil {
// PU has been deleted. Ignore these packets
return nil, nil, conn.Context.PuContextError(pucontext.ErrInvalidSynAck, fmt.Sprintf("Pu with ID delete %s", conn.Context.ID()))
}
flowHash := tcpPacket.SourceAddress().String() + ":" + strconv.Itoa(int(tcpPacket.SourcePort()))
if plci, plerr := context.RetrieveCachedExternalFlowPolicy(flowHash); plerr == nil {
plc := plci.(*policyPair)
d.releaseFlow(context, plc.report, plc.packet, tcpPacket)
return plc.packet, nil, nil
}
// Never seen this IP before, let's parse them.
report, pkt, perr := context.ApplicationACLPolicyFromAddr(tcpPacket.SourceAddress(), tcpPacket.SourcePort())
if perr != nil || pkt.Action.Rejected() {
d.reportReverseExternalServiceFlow(context, report, pkt, true, tcpPacket)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckDroppedExternalService, fmt.Sprintf("drop external service synack Source %s:%d:%s", tcpPacket.SourceAddress().String(), int(tcpPacket.SourcePort()), pkt.Action.ActionString()))
}
// Added to the cache if we can accept it
context.CacheExternalFlowPolicy(
tcpPacket,
&policyPair{
report: report,
packet: pkt,
},
)
// Set the state to Data so the other state machines ignore subsequent packets
conn.SetState(connection.TCPData)
d.releaseFlow(context, report, pkt, tcpPacket)
return pkt, nil, nil
}
// This is a corner condition. We are receiving a SynAck packet and we are in
// a state that indicates that we have already processed one. This means that
// our ack packet was lost. We need to revert conntrack in this case and get
// back into the picture.
if conn.GetState() != connection.TCPSynSend {
// Revert the connmarks - dealing with retransmissions
if cerr := d.conntrack.UpdateApplicationFlowMark(
tcpPacket.DestinationAddress(),
tcpPacket.SourceAddress(),
tcpPacket.IPProto(),
tcpPacket.DestPort(),
tcpPacket.SourcePort(),
uint32(1), // We cannot put it back to zero. We need something other value.
); cerr != nil {
zap.L().Error("Failed to update conntrack table for flow after synack packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err),
)
}
}
// Now we can process the SynAck packet with its options
tcpData := tcpPacket.ReadTCPData()
if len(tcpData) == 0 {
d.reportRejectedFlow(tcpPacket, nil, context.ManagementID(), collector.DefaultEndPoint, context, collector.MissingToken, nil, nil, true)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckMissingToken, fmt.Sprintf("contextID %s SourceAddress %s", context.ManagementID(), tcpPacket.SourceAddress().String()))
}
claims, err = d.tokenAccessor.ParsePacketToken(&conn.Auth, tcpPacket.ReadTCPData())
if err != nil {
d.reportRejectedFlow(tcpPacket, nil, context.ManagementID(), collector.DefaultEndPoint, context, collector.MissingToken, nil, nil, true)
//return nil, nil, fmt.Errorf("SynAck packet dropped because of bad claims: %s", err)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckBadClaims, fmt.Sprintf("contextID %s SourceAddress %s", context.ManagementID(), tcpPacket.SourceAddress().String()))
}
if claims == nil {
d.reportRejectedFlow(tcpPacket, nil, context.ManagementID(), collector.DefaultEndPoint, context, collector.MissingToken, nil, nil, true)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckMissingClaims, fmt.Sprintf("contextID %s SourceAddress %s", context.ManagementID(), tcpPacket.SourceAddress().String()))
}
tcpPacket.ConnectionMetadata = &conn.Auth
if err := tcpPacket.CheckTCPAuthenticationOption(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
d.reportRejectedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, collector.InvalidHeader, nil, nil, true)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckNoTCPAuthOption, fmt.Sprintf("contextID %s SourceAddress %s", context.ManagementID(), tcpPacket.SourceAddress().String()))
}
// Remove any of our data
if err := tcpPacket.TCPDataDetach(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
d.reportRejectedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, collector.InvalidPayload, nil, nil, true)
//return nil, nil, fmt.Errorf("SynAck packet dropped because of invalid format: %s", err)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckInvalidFormat, fmt.Sprintf("contextID %s SourceAddress %s", context.ManagementID(), tcpPacket.SourceAddress().String()))
}
tcpPacket.DropTCPDetachedBytes()
if !d.mutualAuthorization {
// If we dont do mutual authorization, dont lookup txt rules.
conn.SetState(connection.TCPSynAckReceived)
// conntrack
d.netReplyConnectionTracker.AddOrUpdate(tcpPacket.L4FlowHash(), conn)
return nil, claims, nil
}
report, pkt := context.SearchTxtRules(claims.T, !d.mutualAuthorization)
// Report and release traffic belonging to the same pu
if conn.Auth.RemoteContextID == context.ManagementID() {
conn.SetState(connection.TCPData)
conn.SetLoopbackConnection(true)
d.reportAcceptedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, nil, nil, true)
d.releaseUnmonitoredFlow(tcpPacket)
return nil, nil, nil
}
// NOTE: For backward compatibility, remove this check later
if claims.H != nil {
if claims.H.ToClaimsHeader().Encrypt() != pkt.Action.Encrypted() {
d.reportRejectedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, collector.EncryptionMismatch, nil, nil, true)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckClaimsMisMatch, fmt.Sprintf("contextID %s SourceAddress %s", context.ManagementID(), tcpPacket.SourceAddress().String()))
}
}
if pkt.Action.Rejected() {
d.reportRejectedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, collector.PolicyDrop, report, pkt, true)
return nil, nil, conn.Context.PuContextError(pucontext.ErrSynAckRejected, fmt.Sprintf("contextID %s Claims %s", context.ManagementID(), claims.T.String()))
}
conn.SetState(connection.TCPSynAckReceived)
// conntrack
d.netReplyConnectionTracker.AddOrUpdate(tcpPacket.L4FlowHash(), conn)
return pkt, claims, nil
}
// processNetworkAckPacket processes an Ack packet arriving from the network
func (d *Datapath) processNetworkAckPacket(context *pucontext.PUContext, conn *connection.TCPConnection, tcpPacket *packet.Packet) (action interface{}, claims *tokens.ConnectionClaims, err error) {
if conn.GetState() == connection.TCPData || conn.GetState() == connection.TCPAckSend {
return nil, nil, nil
}
if conn.IsLoopbackConnection() {
conn.SetState(connection.TCPData)
d.releaseUnmonitoredFlow(tcpPacket)
return nil, nil, nil
}
if conn.GetState() == connection.UnknownState {
// Check if the destination is in the external servicess approved cache
// and if yes, allow the packet to go and release the flow.
_, plcy, perr := context.NetworkACLPolicy(tcpPacket)
// Ignore FIN packets. Let them go through.
if tcpPacket.GetTCPFlags()&packet.TCPFinMask != 0 {
return nil, nil, nil
}
if perr != nil {
err := tcpPacket.ConvertAcktoFinAck()
return nil, nil, err
}
if plcy.Action.Rejected() {
return nil, nil, conn.Context.PuContextError(pucontext.ErrAckRejected, fmt.Sprintf("contextID %s", conn.Context.ID()))
}
if err := d.conntrack.UpdateNetworkFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultExternalConnMark,
); err != nil {
zap.L().Error("Failed to update conntrack entry for flow at network Ack packet",
zap.String("context", string(conn.Auth.LocalContext)),
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.Error(err),
)
}
return nil, nil, nil
}
hash := tcpPacket.L4FlowHash()
// Validate that the source/destination nonse matches. The signature has validated both directions
if conn.GetState() == connection.TCPSynAckSend || conn.GetState() == connection.TCPSynReceived {
if err := tcpPacket.CheckTCPAuthenticationOption(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
return nil, nil, conn.Context.PuContextError(pucontext.ErrAckTCPNoTCPAuthOption, fmt.Sprintf("contextID %s destPort %d", context.ManagementID(), int(tcpPacket.DestPort())))
}
if _, err := d.tokenAccessor.ParseAckToken(&conn.Auth, tcpPacket.ReadTCPData()); err != nil {
d.reportRejectedFlow(tcpPacket, conn, collector.DefaultEndPoint, context.ManagementID(), context, collector.InvalidToken, nil, nil, false)
zap.L().Error("Ack Packet dropped because signature validation failed", zap.Error(err))
return nil, nil, conn.Context.PuContextError(pucontext.ErrAckSigValidationFailed, fmt.Sprintf("contextID %s destPort %d", context.ManagementID(), int(tcpPacket.DestPort())))
}
// Remove any of our data - adjust the sequence numbers
if err := tcpPacket.TCPDataDetach(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
d.reportRejectedFlow(tcpPacket, conn, collector.DefaultEndPoint, context.ManagementID(), context, collector.InvalidPayload, nil, nil, false)
zap.L().Error("Error: Ack packet dropped because of invalid format", zap.Error(err))
return nil, nil, conn.Context.PuContextError(pucontext.ErrAckInvalidFormat, fmt.Sprintf("contextID %s destPort %d", context.ManagementID(), int(tcpPacket.DestPort())))
}
tcpPacket.DropTCPDetachedBytes()
if conn.PacketFlowPolicy != nil && conn.PacketFlowPolicy.Action.Rejected() {
if !conn.PacketFlowPolicy.ObserveAction.Observed() {
zap.L().Error("Flow rejected but not observed", zap.String("conn", context.ManagementID()))
}
// Flow has been allowed because we are observing a deny rule's impact on the system. Packets are forwarded, reported as dropped + observed.
d.reportRejectedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, collector.PolicyDrop, conn.ReportFlowPolicy, conn.PacketFlowPolicy, false)
} else {
// We accept the packet as a new flow
d.reportAcceptedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, conn.ReportFlowPolicy, conn.PacketFlowPolicy, false)
}
conn.SetState(connection.TCPData)
if !conn.ServiceConnection {
go func() {
if err := d.conntrack.UpdateNetworkFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultConnMark,
); err != nil {
zap.L().Error("Failed to update conntrack table after ack packet",
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()))
}
}()
}
// Accept the packet
return nil, nil, nil
}
if conn.ServiceConnection {
conn.Context.PuContextError(pucontext.ErrEncrConnectionsProcessed, "") // nolint
return nil, nil, nil
}
// Everything else is dropped - ACK received in the Syn state without a SynAck
d.reportRejectedFlow(tcpPacket, conn, conn.Auth.RemoteContextID, context.ManagementID(), context, collector.InvalidState, nil, nil, false)
zap.L().Error("Invalid state reached",
zap.String("state", fmt.Sprintf("%d", conn.GetState())),
zap.String("context", context.ManagementID()),
zap.String("net-conn", hash),
)
return nil, nil, conn.Context.PuContextError(pucontext.ErrInvalidConnState, fmt.Sprintf("contextID %s destPort %d", context.ManagementID(), int(tcpPacket.DestPort())))
}
// createTCPAuthenticationOption creates the TCP authentication option -
func (d *Datapath) createTCPAuthenticationOption(token []byte) []byte {
tokenLen := uint8(len(token))
options := []byte{packet.TCPAuthenticationOption, enforcerconstants.TCPAuthenticationOptionBaseLen + tokenLen, 0, 0}
if tokenLen != 0 {
options = append(options, token...)
}
return options
}
// appSynRetrieveState retrieves state for the the application Syn packet.
// It creates a new connection by default
func (d *Datapath) appSynRetrieveState(p *packet.Packet) (*connection.TCPConnection, error) {
context, err := d.contextFromIP(true, p.Mark, p.SourcePort(), packet.IPProtocolTCP)
if err != nil {
return nil, pucontext.PuContextError(pucontext.ErrSynUnexpectedPacket, fmt.Sprintf("Received unexpected syn %s sourceport %d", p.Mark, int(p.SourcePort())))
}
if conn, err := d.appOrigConnectionTracker.GetReset(p.L4FlowHash(), 0); err == nil && !conn.(*connection.TCPConnection).MarkForDeletion {
// return this connection only if we are not deleting this
// this is marked only when we see a FINACK for this l4flowhash
// this should not have happened for a connection while we are processing a appSyn for this connection
// The addorupdate for this cache will happen outside in processtcppacket
return conn.(*connection.TCPConnection), nil
}
return connection.NewTCPConnection(context, p), nil
}
// appSynAckRetrieveState retrieves the state for application syn/ack packet.
func (d *Datapath) appSynAckRetrieveState(p *packet.Packet) (*connection.TCPConnection, error) {
hash := p.L4FlowHash()
// Did we see a network syn for this server PU?
conn, err := d.appReplyConnectionTracker.GetReset(hash, 0)
if err != nil {
return nil, pucontext.PuContextError(pucontext.ErrNetSynNotSeen, err.Error())
}
if uerr := updateTimer(d.appReplyConnectionTracker, hash, conn.(*connection.TCPConnection)); uerr != nil {
zap.L().Error("entry expired just before updating the timer", zap.String("flow", hash))
return nil, uerr
}
return conn.(*connection.TCPConnection), nil
}
// appRetrieveState retrieves the state for the rest of the application packets. It
// returns an error if it cannot find the state
func (d *Datapath) appRetrieveState(p *packet.Packet) (*connection.TCPConnection, error) {
hash := p.L4FlowHash()
// If this ack packet is from Server, Did we see a network Syn for this server PU?
conn, err := d.appReplyConnectionTracker.GetReset(hash, 0)
if err == nil {
if uerr := updateTimer(d.appReplyConnectionTracker, hash, conn.(*connection.TCPConnection)); uerr != nil {
zap.L().Error("entry expired just before updating the timer", zap.String("flow", hash))
return nil, uerr
}
return conn.(*connection.TCPConnection), nil
}
// If this ack packet is from client, Did we see an Application Syn packet before?
conn, err = d.appOrigConnectionTracker.GetReset(hash, 0)
if err == nil {
if uerr := updateTimer(d.appOrigConnectionTracker, hash, conn.(*connection.TCPConnection)); uerr != nil {
return nil, uerr
}
return conn.(*connection.TCPConnection), nil
}
if p.GetTCPFlags()&packet.TCPSynAckMask == packet.TCPAckMask {
// Let's try if its an existing connection
context, err := d.contextFromIP(true, p.Mark, p.SourcePort(), packet.IPProtocolTCP)
if err != nil {
return nil, errors.New("No context in app processing")
}
conn = connection.NewTCPConnection(context, p)
conn.(*connection.TCPConnection).SetState(connection.UnknownState)
return conn.(*connection.TCPConnection), nil
}
return nil, pucontext.PuContextError(pucontext.ErrNoConnFound, fmt.Sprintf("SourcePort %d %s", int(p.SourcePort()), p.Mark))
}
// netSynRetrieveState retrieves the state for the Syn packets on the network.
// Obviously if no state is found, it generates a new connection record.
func (d *Datapath) netSynRetrieveState(p *packet.Packet) (*connection.TCPConnection, error) {
context, err := d.contextFromIP(false, p.Mark, p.DestPort(), packet.IPProtocolTCP)
if err == nil {
if conn, err := d.netOrigConnectionTracker.GetReset(p.L4FlowHash(), 0); err == nil && !conn.(*connection.TCPConnection).MarkForDeletion {
// Only if we havent seen FINACK on this connection
return conn.(*connection.TCPConnection), nil
}
return connection.NewTCPConnection(context, p), nil
}
//This needs to hit only for local processes never for containers
//Don't return an error create a dummy context and return it so we truncate the packet before we send it up
if d.mode != constants.RemoteContainer {
//we will create the bare minimum needed to exercise our stack
//We need this syn to look similar to what we will pass on the retry
//so we setup enough for us to identify this request in the later stages
// Remove any of our data from the packet.
if err = p.CheckTCPAuthenticationOption(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
zap.L().Error("Syn received with tcp option not set", zap.Error(err))
return nil, pucontext.PuContextError(pucontext.ErrNonPUTraffic, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
if err = p.TCPDataDetach(enforcerconstants.TCPAuthenticationOptionBaseLen); err != nil {
zap.L().Error("Error removing TCP Data", zap.Error(err))
return nil, pucontext.PuContextError(pucontext.ErrNonPUTraffic, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
p.DropTCPDetachedBytes()
p.UpdateTCPChecksum()
return nil, pucontext.PuContextError(pucontext.ErrNonPUTraffic, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
return nil, pucontext.PuContextError(pucontext.ErrInvalidNetState, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
// netSynAckRetrieveState retrieves the state for SynAck packets at the network
// It relies on the source port cache for that
func (d *Datapath) netSynAckRetrieveState(p *packet.Packet) (*connection.TCPConnection, error) {
conn, err := d.sourcePortConnectionCache.GetReset(p.SourcePortHash(packet.PacketTypeNetwork), 0)
if err != nil {
return nil, pucontext.PuContextError(pucontext.ErrNonPUTraffic, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
if conn.(*connection.TCPConnection).MarkForDeletion {
return nil, pucontext.PuContextError(pucontext.ErrOutOfOrderSynAck, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
return conn.(*connection.TCPConnection), nil
}
// netRetrieveState retrieves the state of a network connection. Use the flow caches for that
func (d *Datapath) netRetrieveState(p *packet.Packet) (*connection.TCPConnection, error) {
hash := p.L4FlowHash()
// ignore conn.MarkFordeletion here since these could be ack packets arriving out of order
// Did we see a network syn/ack packet? (PU is a client)
conn, err := d.netReplyConnectionTracker.GetReset(hash, 0)
if err == nil {
if err = updateTimer(d.netReplyConnectionTracker, hash, conn.(*connection.TCPConnection)); err != nil {
return nil, err
}
return conn.(*connection.TCPConnection), nil
}
// Did we see a network Syn packet before? (PU is a server)
conn, err = d.netOrigConnectionTracker.GetReset(hash, 0)
if err == nil {
if err = updateTimer(d.netOrigConnectionTracker, hash, conn.(*connection.TCPConnection)); err != nil {
return nil, err
}
return conn.(*connection.TCPConnection), nil
}
// We reach in this state for a client PU only when the service connection(encrypt) sends data sparsely.
// Packets are dropped when this happens, and that is a BUG!!!
// For the server PU we mark the connection in the unknown state.
if p.GetTCPFlags()&packet.TCPSynAckMask == packet.TCPAckMask {
// Let's try if its an existing connection
context, cerr := d.contextFromIP(false, p.Mark, p.DestPort(), packet.IPProtocolTCP)
if cerr != nil {
return nil, err
}
conn = connection.NewTCPConnection(context, p)
conn.(*connection.TCPConnection).SetState(connection.UnknownState)
return conn.(*connection.TCPConnection), nil
}
return nil, pucontext.PuContextError(pucontext.ErrInvalidNetState, fmt.Sprintf("DestPort %d %s", int(p.DestPort()), p.SourceAddress().String()))
}
// updateTimer updates the timers for the service connections
func updateTimer(c cache.DataStore, hash string, conn *connection.TCPConnection) error {
conn.RLock()
defer conn.RUnlock()
if conn.ServiceConnection && conn.TimeOut > 0 {
return c.SetTimeOut(hash, conn.TimeOut)
}
return nil
}
// releaseFlow releases the flow and updates the conntrack table
func (d *Datapath) releaseFlow(context *pucontext.PUContext, report *policy.FlowPolicy, action *policy.FlowPolicy, tcpPacket *packet.Packet) {
if err := d.appOrigConnectionTracker.Remove(tcpPacket.L4ReverseFlowHash()); err != nil {
zap.L().Debug("Failed to clean cache appOrigConnectionTracker", zap.Error(err))
}
if err := d.sourcePortConnectionCache.Remove(tcpPacket.SourcePortHash(packet.PacketTypeNetwork)); err != nil {
zap.L().Debug("Failed to clean cache sourcePortConnectionCache", zap.Error(err))
}
if err := d.conntrack.UpdateNetworkFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultExternalConnMark,
); err != nil {
zap.L().Error("Failed to update conntrack table",
zap.String("app-conn", tcpPacket.L4ReverseFlowHash()),
zap.Error(err))
}
d.reportReverseExternalServiceFlow(context, report, action, true, tcpPacket)
}
// releaseUnmonitoredFlow releases the flow and updates the conntrack table
func (d *Datapath) releaseUnmonitoredFlow(tcpPacket *packet.Packet) {
zap.L().Debug("Releasing flow", zap.String("flow", tcpPacket.L4FlowHash()))
if err := d.conntrack.UpdateNetworkFlowMark(
tcpPacket.SourceAddress(),
tcpPacket.DestinationAddress(),
tcpPacket.IPProto(),
tcpPacket.SourcePort(),
tcpPacket.DestPort(),
constants.DefaultExternalConnMark,
); err != nil {
if !netlink.IsNotExist(errors.Cause(err)) {
zap.L().Error("Failed to update conntrack table", zap.Error(err))
} else {
zap.L().Debug("Failed to update conntrack table", zap.Error(err))
}
}
}
1
https://gitee.com/emmoblin/trireme-lib.git
git@gitee.com:emmoblin/trireme-lib.git
emmoblin
trireme-lib
trireme-lib
7726874a2b9a

搜索帮助