diff --git a/.gitignore b/.gitignore index e7a79a4b10abb0be18fda3e6f34cee018071f536..18a081ea5b37cbb24c7c42569dd5deecf4cda7c7 100644 --- a/.gitignore +++ b/.gitignore @@ -66,3 +66,12 @@ submodule_test # idea files *.idea + +# BPF skeletons +*.skel.* + +# Build artifacts +/bpftool +/gala-gopher +/gopher-ctl +/src/daemon/build diff --git a/src/common/ipc.h b/src/common/ipc.h index 9a500251db5e8f35672826df9102992f2dd1aa86..fc0c3464ec138ba9a547865342372a57a4085d41 100644 --- a/src/common/ipc.h +++ b/src/common/ipc.h @@ -127,6 +127,7 @@ enum probe_type_e { PROBE_SCHED, PROBE_CONTAINER, PROBE_SERMANT, + PROBE_FLOWTRACER, // If you want to add a probe, add the probe type. diff --git a/src/lib/probe/probe_mng.c b/src/lib/probe/probe_mng.c index 58931193df490dafcd415f71c51ea69e178990aa..cddb548ffe5ab4bca98693c0117e892a58a888e0 100644 --- a/src/lib/probe/probe_mng.c +++ b/src/lib/probe/probe_mng.c @@ -60,7 +60,8 @@ struct probe_define_s probe_define[] = { {"ksli", "$gala-gopher-dir/ksliprobe", PROBE_KSLI}, {"sched", "$gala-gopher-dir/schedprobe", PROBE_SCHED}, {"container", "$gala-gopher-dir/cadvisor_probe.py", PROBE_CONTAINER}, - {"sermant", "$gala-gopher-dir/sermant_probe.py", PROBE_SERMANT} + {"sermant", "$gala-gopher-dir/sermant_probe.py", PROBE_SERMANT}, + {"flowtracer", "$gala-gopher-dir/flowtracer", PROBE_FLOWTRACER} // If you want to add a probe, add the probe define. 
}; diff --git a/src/lib/probe/probe_params_parser.c b/src/lib/probe/probe_params_parser.c index e6bb7ea634aacc19548aab3ed272f969e4b6261d..22f692e21d52a874418c6c047c4983f9d96a52e3 100644 --- a/src/lib/probe/probe_params_parser.c +++ b/src/lib/probe/probe_params_parser.c @@ -604,7 +604,7 @@ struct param_key_s param_keys[] = { {PERF_SAMPLE_PERIOD, {10, 10, 1000, ""}, parser_perf_sample_period, set_default_params_inter_perf_sample_period, JSON_NUMBER}, {MULTI_INSTANCE, {0, 0, 1, ""}, parser_multi_instance, set_default_params_char_multi_instance_flag, JSON_NUMBER}, {NATIVE_STACK, {0, 0, 1, ""}, parser_native_stack, set_default_params_char_native_stack_flag, JSON_NUMBER}, - {CLUSTER_IP_BACKEND, {0, 0, 1, ""}, parser_cluster_ip_backend_flag, set_default_params_char_cluster_ip_backend, JSON_NUMBER}, + {CLUSTER_IP_BACKEND, {0, 0, 2, ""}, parser_cluster_ip_backend_flag, set_default_params_char_cluster_ip_backend, JSON_NUMBER}, {SVG_DIR, {0, 0, 0, "/var/log/gala-gopher/stacktrace"}, parser_svg_dir, set_default_params_str_svg_dir, JSON_STRING}, {FLAME_DIR, {0, 0, 0, "/var/log/gala-gopher/flamegraph"}, parser_flame_dir, set_default_params_str_flame_dir, JSON_STRING}, #if 0 diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index c98a3cd5d934b143835179b47143957c5273ab3c..e1fae633b7f7b1b26591c1ec948a9a75828e04ce 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -41,7 +41,10 @@ #include "event.h" #include "ipc.h" #include "hash.h" +#include "conntrack.h" #include "container.h" +#include "delaying_ring_buffer.h" +#include "flowtracer_reader.h" #include "histogram.h" #define EP_ENTITY_ID_LEN 64 @@ -139,6 +142,7 @@ struct endpoint_probe_s { int tcp_socks_num; int udp_socks_num; time_t last_report; + struct delaying_ring_buffer *drb; }; static volatile sig_atomic_t g_stop; @@ -396,6 +400,76 @@ static int 
process_tcp_conn_close(struct tcp_socket_s *tcp, struct tcp_socket_ev return 0; } +static void transform_cluster_ip(struct endpoint_probe_s *probe_mng, struct tcp_socket_id_s *tracker) +{ + int transform = ADDR_TRANSFORM_NONE; + + char cluster_ip_backend = probe_mng->ipc_body.probe_param.cluster_ip_backend; + if (cluster_ip_backend == 0) { + return; + } + + struct tcp_connect_s connect; + + connect.role = (tracker->role == TCP_CLIENT) ? 1 : 0; + connect.family = tracker->client_ipaddr.family; + + if (connect.family == AF_INET) { + connect.cip_addr.c_ip = tracker->client_ipaddr.ip; + connect.sip_addr.s_ip = tracker->server_ipaddr.ip; + } else { + memcpy(&(connect.cip_addr), tracker->client_ipaddr.ip6, IP6_LEN); + memcpy(&(connect.sip_addr), tracker->server_ipaddr.ip6, IP6_LEN); + } + connect.c_port = tracker->client_ipaddr.port; + connect.s_port = tracker->server_ipaddr.port; + + if (cluster_ip_backend == 1) { // use conntrack + // Only transform Kubernetes cluster IP backend for the client TCP connection. 
+ if (connect.role != 1) { + return; + } + (void)get_cluster_ip_backend(&connect, &transform); + } else if (cluster_ip_backend == 2) { // use FlowTracer + transform = lookup_flowtracer(&connect); + DEBUG("[EPPROBE] FlowTracer transform: %d\n", transform); + } + + if (!transform) { + return; + } + +#ifdef GOPHER_DEBUG + char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; + inet_ntop(connect.family, &tracker->server_ipaddr.ip, s_ip1, sizeof(s_ip1)); + inet_ntop(connect.family, &connect.sip_addr, s_ip2, sizeof(s_ip2)); + inet_ntop(connect.family, &tracker->client_ipaddr.ip, c_ip1, sizeof(c_ip1)); + inet_ntop(connect.family, &connect.cip_addr, c_ip2, sizeof(c_ip2)); + DEBUG("[EPPROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", + c_ip1, tracker->client_ipaddr.port, s_ip1, tracker->server_ipaddr.port, + c_ip2, connect.c_port, s_ip2, connect.s_port); +#endif + + if (transform & ADDR_TRANSFORM_SERVER) { + if (connect.family == AF_INET) { + tracker->server_ipaddr.ip = connect.sip_addr.s_ip; + } else { + memcpy(tracker->server_ipaddr.ip6, &(connect.sip_addr.s_ip6), IP6_LEN); + } + tracker->server_ipaddr.port = connect.s_port; + } + if (transform & ADDR_TRANSFORM_CLIENT) { + if (connect.family == AF_INET) { + tracker->client_ipaddr.ip = connect.cip_addr.c_ip; + } else { + memcpy(tracker->client_ipaddr.ip6, &(connect.cip_addr.c_ip6), IP6_LEN); + } + tracker->client_ipaddr.port = connect.c_port; + } + + return; +} + #define MAX_ENDPOINT_ENTITES (5 * 1024) static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_event_s* evt) { @@ -412,9 +486,19 @@ static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_e memcpy(&(id.server_ipaddr), &(evt->server_ipaddr), sizeof(id.server_ipaddr)); id.is_multi = evt->is_multi; id.tgid = (id.is_multi == 1) ? 
getpgid(evt->tgid) : (evt->tgid); - id.client_ipaddr.port = 0; id.role = evt->role; +#ifdef GOPHER_DEBUG + char s_ip[INET6_ADDRSTRLEN] = {0}, c_ip[INET6_ADDRSTRLEN] = {0}; + (void)inet_ntop(id.server_ipaddr.family, (const void *)&(id.server_ipaddr.ip), s_ip, sizeof(s_ip)); + (void)inet_ntop(id.client_ipaddr.family, (const void *)&(id.client_ipaddr.ip), c_ip, sizeof(c_ip)); + DEBUG("[EPPROBE]: tcp socket event %d, role: %d, observed flow: (%s:%u - %s:%u), tgid: %d\n", + evt->evt, evt->role, c_ip, id.client_ipaddr.port, s_ip, id.server_ipaddr.port, id.tgid); +#endif + + transform_cluster_ip(probe, &id); + id.client_ipaddr.port = 0; // Gala aggregates all client metrics + tcp = lkup_tcp_socket(probe, (const struct tcp_socket_id_s *)&id); if (process_tcp_conn_close(tcp, evt) == 0) { return 0; @@ -516,6 +600,15 @@ err: } static int proc_tcp_sock_evt(void *ctx, void *data, u32 size) +{ + struct endpoint_probe_s *probe = ctx; + if (drb_put(probe->drb, data, size)) { + WARN("[EPPROBE] Not enough space to put event into the ring buffer. 
Event is discarded.\n"); + } + return 0; +} + +static int proc_tcp_sock_evt_continue(void *ctx, void *data, u32 size) { char *p = data; int remain_size = (int)size, step_size = sizeof(struct tcp_socket_event_s), offset = 0; @@ -939,6 +1032,15 @@ static int poll_endpoint_pb(struct endpoint_probe_s *probe) return 0; } +static void poll_drb(struct endpoint_probe_s *probe) +{ + const struct drb_item *item; + while ((item = drb_look(probe->drb))) { + proc_tcp_sock_evt_continue(probe, item->data, item->size); + drb_pop(probe->drb); + } +} + static char is_report_tmout(struct endpoint_probe_s *probe) { time_t current = time(NULL); @@ -1022,6 +1124,12 @@ int main(int argc, char **argv) return -1; } + g_ep_probe.drb = drb_new(4096, 500); + if (!g_ep_probe.drb) { + ERROR("[ENDPOINTPROBE] Failed to allocate delaying ring buffer.\n"); + return -1; + } + INIT_BPF_APP(endpoint, EBPF_RLIM_LIMITED); INFO("[ENDPOINTPROBE] Successfully started!\n"); @@ -1050,6 +1158,7 @@ int main(int argc, char **argv) sleep(1); } + poll_drb(&g_ep_probe); report_tcp_socks(&g_ep_probe); report_endpoint(&g_ep_probe); } @@ -1061,5 +1170,7 @@ int main(int argc, char **argv) unload_bpf_prog(&(g_ep_probe.prog)); destroy_ipc_body(&(g_ep_probe.ipc_body)); + drb_destroy(g_ep_probe.drb); + return ret; } diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/Makefile b/src/probes/extends/ebpf.probe/src/flowtracer/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..d60f4f25decea923002afa08e7209e6c455f4ffe --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/Makefile @@ -0,0 +1,46 @@ +include ../mk/var.mk +INCLUDES = $(BASE_INC) + +APP := flowtracer + +SRC_CPLUS := $(wildcard *.cpp) +SRC_CPLUS += $(CPLUSFILES) + +BPF_C := $(wildcard *.bpf.c) +DEPS := $(patsubst %.bpf.c, %.bpf.o, $(BPF_C)) +DEPS += $(patsubst %.bpf.c, %.skel.h, $(BPF_C)) +DEPS += $(patsubst %.cpp, %.o, $(SRC_CPLUS)) + +SRC_C := $(filter-out $(BPF_C), $(wildcard *.c)) +SRC_C += $(CFILES) + +.PHONY: all 
clean install + +all: pre deps app +pre: $(OUTPUT) +deps: $(DEPS) +# build bpf code +%.bpf.o: %.bpf.c + $(CLANG) $(CLANGFLAGS) -target bpf $(INCLUDES) -c $(filter %.c,$^) -o $@ + $(LLVM_STRIP) -g $@ + +# build skel.h +%.skel.h: %.bpf.o + $(BPFTOOL) gen skeleton $< > $@ + +# build c++ files +%.o: %.cpp + $(C++) -c $^ $(CXXFLAGS) $(INCLUDES) -o $@ + +app: $(APP) +%: %.c $(SRC_C) + $(CC) $(CFLAGS) $(patsubst %.cpp, %.o, $(SRC_CPLUS)) $(INCLUDES) $^ $(LDFLAGS) $(LINK_TARGET) -o $@ + @echo $@ "compiling completed." +clean: + rm -rf $(DEPS) + rm -rf $(APP) + +install: + mkdir -p $(INSTALL_DIR) + cp $(APP) $(INSTALL_DIR) + diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/README.md b/src/probes/extends/ebpf.probe/src/flowtracer/README.md new file mode 100644 index 0000000000000000000000000000000000000000..bd5cf93c525e8a67c4cb81b4aa468a99acae8f00 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/README.md @@ -0,0 +1,67 @@ +# FlowTracer + +## What is FlowTracer? + +Consider a network scenario where a client communicates with a server through NAT. The most common +case is Kubernetes with the default kube-proxy, where a pod talks to a virtual service IP and doesn't +know the real destination. Depending on the underlying network technology, the destination pod has the +symmetric issue - it sees only the translated address and not the address of the original source. + +FlowTracer is an active probe that monitors TCP connections and transfers the packet source address +in-band within the connection. At the receiving end the probe extracts the injected address and maintains +an association from the observed address tuple to the received one. + +## How to run FlowTracer + +FlowTracer is implemented as a Gala-Gopher plugin; it can be started/stopped using the REST API. +Data retrieval is integrated into the TCP probe and, if enabled, it replaces an observed address with +the original one. + +### Prerequisites + +The FlowTracer BPF program is attached to cgroup2 mounted into the host file system. 
It is possible +to mount all cgroups or to be more specific, for example to enable FlowTracer only for individual processes +or containers. + +To mount cgroup2 in openEuler: + 1. Create a mount point: `sudo mkdir /mnt/cgroup2`. + 2. Mount cgroup2 to the mount point: `sudo mount -t cgroup2 none /mnt/cgroup2/`. + +### To enable FlowTracer + +FlowTracer can be controlled via Gala-Gopher's REST API (in the example below the API runs on the local host): + 1. Enable FlowTracer: `curl -X PUT -d json='{"state":"running"}' http://localhost:9999/flowtracer`. + 2. Enable FlowTracer data mapping in the TCP probe: `curl -X PUT -d json='{"cmd":{"probe":["tcp_stats"]},"snoopers":{"proc_name":[{"comm":"^python.*","cmdline":""}]},"params":{"report_event":0,"report_period":10,"cluster_ip_backend":2},"state":"running"}' http://localhost:9999/tcp`. Note that this command enables only one of the TCP probes (`tcp_stats`), and only for processes whose names start with `python`. + +### To disable FlowTracer + +FlowTracer can be controlled via Gala-Gopher's REST API: + 1. Stop FlowTracer: `curl -X PUT -d json='{"state":"stopped"}' http://localhost:9999/flowtracer`. + 2. Disable FlowTracer data mapping in the TCP probe: `curl -X PUT -d json='{"cmd":{"probe":["tcp_stats"]},"snoopers":{"proc_name":[{"comm":"^python.*","cmdline":""}]},"params":{"report_event":0,"report_period":10,"cluster_ip_backend":0},"state":"running"}' http://localhost:9999/tcp`. + +### Example + +There are two pods running on different nodes in a Kubernetes cluster. Pod A is a client that sends an HTTP request to a server running in Pod B. +The example shows metric attributes that are collected by Gala-Gopher (common attributes are removed for brevity). 
+ +#### Metrics without FlowTracer + +Client-side - the client connects to a virtual service IP: +``` +gala_gopher_tcp_link_tx_bytes{role="client",client_ip="10.0.0.131",server_ip="10.247.204.240",server_port="8000",pod="app/a-664d757c7b-q6ds2"} 110 1704451439000 +``` +Server-side - the server receives packets from a translated address (host gateway): +``` +gala_gopher_tcp_link_rx_bytes{role="server",client_ip="192.168.3.14",server_ip="10.0.0.5",server_port="8000",pod="app/b-6758f884b4-7mgbm"} 110 1704451440000 +``` + +#### Metrics with FlowTracer + +Client-side - the virtual service IP is resolved to the address of the server: +``` +gala_gopher_tcp_link_tx_bytes{role="client",client_ip="10.0.0.131",server_ip="10.0.0.5",server_port="8000",pod="app/a-664d757c7b-q6ds2"} 110 1704451981000 +``` +Server-side - the real client address is visible too: +``` +gala_gopher_tcp_link_rx_bytes{role="server",client_ip="10.0.0.131",server_ip="10.0.0.5",server_port="8000",pod="app/b-6758f884b4-7mgbm"} 110 1704451982000 +``` diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c new file mode 100644 index 0000000000000000000000000000000000000000..fcaa652e33bcf22badd9494a0d295463d55c1222 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c @@ -0,0 +1,279 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. 
+ * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#include +#include +#include +#include +#include + +#include "flowtracer.h" + +char LICENSE[] SEC("license") = "Dual BSD/GPL"; + +// from `tools/testing/selftests/bpf/test_tcp_hdr_options.h` +#define CG_OK 1 +#define CG_ERR 0 +#define ERR2CG(err) (err)? CG_ERR: CG_OK + +#define NFCT_INFOMASK 7UL +#define NFCT_PTRMASK ~(NFCT_INFOMASK) +#define CTINFO2DIR(ctinfo) ((ctinfo) >= IP_CT_IS_REPLY ? IP_CT_DIR_REPLY : IP_CT_DIR_ORIGINAL) + +// FlowTracer data: maps from observed TCP flow tuple into received remote original address +#define MAX_ENTRIES_FLOW 65536 +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, struct flow_key); + __type(value, struct flow_data); + __uint(max_entries, MAX_ENTRIES_FLOW); + __uint(pinning, LIBBPF_PIN_BY_NAME); // map is located at /sys/fs/bpf/flowtracer_data +} flowtracer_data SEC(".maps"); + +#ifdef GOPHER_DEBUG +// ring buffer to send events to user-space (for debug purposes) +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024); +} ring_buffer SEC(".maps"); + +static void notify(enum flow_log_op op, struct flow_key *flow_key, struct flow_data *flow_data) +{ + struct flow_log *flow_log = bpf_ringbuf_reserve(&ring_buffer, sizeof(struct flow_log), 0); + if (!flow_log) { + return; + } + + flow_log->op = op; + flow_log->key = *flow_key; + if (flow_data) { + flow_log->data = *flow_data; + } + bpf_ringbuf_submit(flow_log, 0); +} +#else +static inline void notify(enum flow_log_op op, struct flow_key *flow_key, struct flow_data *flow_data) {} // NOP +#endif + +static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops, __u32 extra) +{ + long err = bpf_sock_ops_cb_flags_set(skops, skops->bpf_sock_ops_cb_flags | extra); + return ERR2CG(err); +} + +static int handle_hdr_opt_len(struct bpf_sock_ops *skops) +{ 
+ const int bytes_to_reserve = sizeof(struct tcp_opt_source_info); + int err = bpf_reserve_hdr_opt(skops, bytes_to_reserve, 0); + return ERR2CG(err); +} + +static int handle_write_hdr_opt(struct bpf_sock_ops *skops) +{ + struct tcp_opt_source_info opt = {0}; + opt.kind = TCP_OPT_SOURCE_INFO_KIND; + opt.len = sizeof(struct tcp_opt_source_info); + opt.address.ip4 = skops -> local_ip4; /* stored in network byte order (bpf.h) */ + opt.address.port = (__be16)bpf_htons((__u16)skops -> local_port); /* stored in host byte order (bpf.h) */ + + int err = bpf_store_hdr_opt(skops, &opt, sizeof(opt), 0); + return ERR2CG(err); +} + +static int handle_hdr_parse(struct bpf_sock_ops *skops) +{ + struct tcp_opt_source_info opt = {0}; + opt.kind = TCP_OPT_SOURCE_INFO_KIND; + + int err = bpf_load_hdr_opt(skops, &opt, sizeof(opt), 0); + if (err < 0) { + return CG_ERR; + } + + __be32 local_ip4 = skops->local_ip4; /* stored in network byte order (bpf.h) */ + __be16 local_port = (__be16)bpf_htons((__u16)skops->local_port); /* stored in host byte order (bpf.h) */ + __be32 remote_ip4 = skops->remote_ip4; /* stored in network byte order (bpf.h) */ + __be16 remote_port = (__be16)(skops->remote_port >> 16); /* stored in network byte order (bpf.h) - high 16 bits of 32 bits variable */ + __be32 original_remote_ip4 = opt.address.ip4; + __be16 original_remote_port = opt.address.port; /* network byte order (as written in handle_write_hdr_opt)*/ + + if (remote_ip4 == original_remote_ip4 && remote_port == original_remote_port) { + return CG_OK; // keep only flows with changed address + } + + // check if the flow is in the flowtracer map + struct flow_key flow_key = {0}; + flow_key.local_ip4 = local_ip4; + flow_key.local_port = local_port; + flow_key.remote_ip4 = remote_ip4; + flow_key.remote_port = remote_port; + flow_key.l4_proto = IPPROTO_TCP; + + struct flow_data *flow_ptr = bpf_map_lookup_elem(&flowtracer_data, &flow_key); + if (!flow_ptr) { + // add a new flow + struct flow_data flow = {0}; + 
flow.original_remote_ip4 = original_remote_ip4; + flow.original_remote_port = original_remote_port; + bpf_map_update_elem(&flowtracer_data, &flow_key, &flow, BPF_NOEXIST); + + // notify userspace that a new flow was added (debug purposes only) + notify(FLOW_LOG_ADD, &flow_key, &flow); + } + + return CG_OK; +} + +SEC("sockops") +int flowtracer_sockops_fn(struct bpf_sock_ops *skops) +{ + __u32 op = skops -> op; + + switch (op) { + case BPF_SOCK_OPS_TCP_CONNECT_CB: + case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: + // enable callbacks + return set_hdr_cb_flags(skops, BPF_SOCK_OPS_PARSE_UNKNOWN_HDR_OPT_CB_FLAG | BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG); + case BPF_SOCK_OPS_HDR_OPT_LEN_CB: + // tell the kernel to allocate space for the header + return handle_hdr_opt_len(skops); + case BPF_SOCK_OPS_WRITE_HDR_OPT_CB: + // give the kernel tcp header option contents + return handle_write_hdr_opt(skops); + case BPF_SOCK_OPS_PARSE_HDR_OPT_CB: + // parse tcp header option + return handle_hdr_parse(skops); + } + + return CG_OK; +} + +SEC("kprobe/__nf_conntrack_confirm") +int BPF_KPROBE(nf_conntrack_confirm, struct sk_buff *skb) +{ + // Function '__nf_conntrack_confirm' is called when conntrack confirms a connection. 
For outgoing connections + // this happens immediately upon establishment (sending of SYN packet) + unsigned long nfct; + unsigned int ctinfo; + struct nf_conn *ct; + enum ip_conntrack_dir dir; + struct nf_conntrack_tuple_hash sth[2]; + + // Check if the current packet is IPv4/TCP + void *skb_head = BPF_CORE_READ(skb, head); + __u16 l3_offset = BPF_CORE_READ(skb, network_header); + + struct iphdr *iph = (struct iphdr *)(skb_head + l3_offset); + + if (BPF_CORE_READ_BITFIELD_PROBED(iph, version) != 4) + return 0; // only IPv4 is supported + + if (BPF_CORE_READ(iph, protocol) != IPPROTO_TCP) { + return 0; // only TCP is supported + } + + // Init conntrack info + BPF_CORE_READ_INTO(&nfct, skb, _nfct); + ctinfo = nfct & NFCT_INFOMASK; + ct = (struct nf_conn *)(nfct & NFCT_PTRMASK); + dir = CTINFO2DIR(ctinfo); + BPF_CORE_READ_INTO(&sth, ct, tuplehash); + + // collect conntrack entry + __u32 src_ip_f = sth[dir].tuple.src.u3.ip; + __be16 src_port_f = sth[dir].tuple.src.u.tcp.port; + __u32 dst_ip_f = sth[dir].tuple.dst.u3.ip; + __be16 dst_port_f = sth[dir].tuple.dst.u.tcp.port; + __u32 src_ip_b = sth[!dir].tuple.src.u3.ip; + __be16 src_port_b = sth[!dir].tuple.src.u.tcp.port; + __u32 dst_ip_b = sth[!dir].tuple.dst.u3.ip; + __be16 dst_port_b = sth[!dir].tuple.dst.u.tcp.port; + + if (dst_ip_b == src_ip_f && dst_port_b == src_port_f) { + return 0; // address is not translated, ignore + } + + // Socket view -> Packet view + // We can use conntrack information to map an observed socket address to a real destination address + struct flow_key flow_key = {0}; // as observed at socket level of a client + flow_key.local_ip4 = src_ip_f; + flow_key.local_port = src_port_f; + flow_key.remote_ip4 = dst_ip_f; + flow_key.remote_port = dst_port_f; + flow_key.l4_proto = IPPROTO_TCP; + + struct flow_data flow_data = {0}; // real address of a server + flow_data.original_remote_ip4 = src_ip_b; + flow_data.original_remote_port = src_port_b; + + bpf_map_update_elem(&flowtracer_data, &flow_key, 
&flow_data, BPF_NOEXIST); + notify(FLOW_LOG_ADD, &flow_key, &flow_data); + return 0; +} + +SEC("kprobe/nf_conntrack_destroy") +int BPF_KPROBE(nf_conntrack_destroy, struct nf_conntrack *nfct) +{ + // Function nf_conntrack_destroy is called when conntrack module deletes connection entry from its table, + // so it is a good time to delete the entry from flowtracer map too. + + // Function nf_conntrack_destroy gets a pointer to nf_conn.ct_general, which is the same + // as pointer to the whole structure nf_conn + struct nf_conntrack_tuple_hash th[2]; + BPF_CORE_READ_INTO(&th, (struct nf_conn *)nfct, tuplehash); + + if (th[IP_CT_DIR_ORIGINAL].tuple.dst.protonum != IPPROTO_TCP) { + return 0; + } + + // Conntrack tracks 4-tuple for original packets and 4-tuple for reply packets in tuplehash array. + // If a connection is originated from the current host (= client), then 4-tuple of original packets + // is the same as observed at the socket level. + // Example of k8s pod talking to virtual service IP: + // * original: (10.0.0.159, 34568, 10.247.204.240, 8000) --> matches client's socket view + // * reply: (10.0.0.28, 8000, 192.168.3.14, 16256) + // If a connection is incoming (= server), then 4-tuple of reply packets match the one observed + // at the socket level. 
+ // Example of an externally accessible service running in a docker container: + // * original: (10.221.55.32, 57911, 10.82.1.33, 8001) + // * reply: (172.17.0.2, 8001, 10.221.55.32, 57911) --> matches server's socket view + + struct flow_key flow_key = {0}; + flow_key.local_ip4 = th[IP_CT_DIR_ORIGINAL].tuple.src.u3.ip; + flow_key.local_port = th[IP_CT_DIR_ORIGINAL].tuple.src.u.tcp.port; + flow_key.remote_ip4 = th[IP_CT_DIR_ORIGINAL].tuple.dst.u3.ip; + flow_key.remote_port = th[IP_CT_DIR_ORIGINAL].tuple.dst.u.tcp.port; + flow_key.l4_proto = IPPROTO_TCP; + + // Try to delete entry as if it was a client socket + int err = bpf_map_delete_elem(&flowtracer_data, &flow_key); + if (!err) { + // notify userspace that a flow was deleted (debug purposes only) + notify(FLOW_LOG_DEL, &flow_key, NULL); + } + + flow_key.local_ip4 = th[IP_CT_DIR_REPLY].tuple.src.u3.ip; + flow_key.local_port = th[IP_CT_DIR_REPLY].tuple.src.u.tcp.port; + flow_key.remote_ip4 = th[IP_CT_DIR_REPLY].tuple.dst.u3.ip; + flow_key.remote_port = th[IP_CT_DIR_REPLY].tuple.dst.u.tcp.port; + + // Try to delete entry as if it was a server socket + err = bpf_map_delete_elem(&flowtracer_data, &flow_key); + if (!err) { + // notify userspace that a flow was deleted (debug purposes only) + notify(FLOW_LOG_DEL, &flow_key, NULL); + } + + return 0; +} diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c new file mode 100644 index 0000000000000000000000000000000000000000..8533f03c6b381cd8f000f383ea02f2714419c166 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c @@ -0,0 +1,175 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. 
+ * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "bpf.h" +#include "flowtracer.h" +#include "flowtracer.skel.h" + +static volatile sig_atomic_t stop; + +static void sig_handler(int signo) +{ + stop = 1; +} + +static void usage(char *pname) +{ + printf("USAGE:\n %s \n", pname); + printf("\tLoad and attach FlowTracer BPF program to cgroup2 mounted at the specified path\n"); + exit(1); +} + +#ifdef GOPHER_DEBUG +static int handle_evt(void *ctx, void *notification, size_t sz) +{ + struct flow_log *flow_log = notification; + + char local_ip4[16], remote_ip4[16], original_remote_ip4[16]; + inet_ntop(AF_INET, &flow_log->key.local_ip4, local_ip4, sizeof(local_ip4)); + inet_ntop(AF_INET, &flow_log->key.remote_ip4, remote_ip4, sizeof(remote_ip4)); + inet_ntop(AF_INET, &flow_log->data.original_remote_ip4, original_remote_ip4, sizeof(original_remote_ip4)); + + char log_buf[40] = "\0"; + if (flow_log->op == FLOW_LOG_ADD) { + snprintf(log_buf, sizeof(log_buf), + " original_remote=%s:%u", + original_remote_ip4, bpf_ntohs(flow_log->data.original_remote_port)); + } + DEBUG("[FlowTracer:Event] " + "op=%s" + " local=%s:%u" + " remote=%s:%u" + "%s\n", + (flow_log->op == FLOW_LOG_ADD)? 
"ADD": "DEL", + local_ip4, bpf_ntohs(flow_log->key.local_port), + remote_ip4, bpf_ntohs(flow_log->key.remote_port), + log_buf + ); + return 0; +} +#endif + +int main(int argc, char **argv) +{ + char *cg_path; + int cg_fd; + struct flowtracer_bpf *flowtracer_skel; + int err = 0; + struct ring_buffer *ring_buffer = NULL; + + if (argc < 2) { + cg_path = FLOWTRACER_CGROUP2_PATH; + INFO("[FlowTracer] Cgroup2 mount path is not specified, attaching to the default: %s\n", cg_path); + } else if (!strncmp(argv[argc - 1], "-h", 3)) { + usage("flowtracer"); + } else { + cg_path = argv[argc - 1]; + } + + /* Open cgroup path */ + cg_fd = open(cg_path, O_DIRECTORY, O_RDONLY); + if (cg_fd < 0) { + ERROR("[FlowTracer] Error opening cgroup2 path %s: %d (%s)\n", cg_path, cg_fd, strerror(errno)); + return 1; + } + + INIT_BPF_APP(flowtracer, EBPF_RLIM_LIMITED); + + /* Open load and verify BPF application */ + flowtracer_skel = flowtracer_bpf__open(); + if (!flowtracer_skel) { + ERROR("[FlowTracer] Failed to open BPF skeleton\n"); + goto cleanup; + } + + /* Pin FlowTracer BPF map to the file system */ + err = bpf_map__set_pin_path(flowtracer_skel->maps.flowtracer_data, FLOWTRACER_DATA_MAP_PATH); + if (err) { + ERROR("[FlowTracer] Failed to pin FlowTracer BPF map at path %s: %d (%s)\n", FLOWTRACER_DATA_MAP_PATH, err, strerror(errno)); + goto cleanup; + } + + /* Load BPF program */ + if ((err = flowtracer_bpf__load(flowtracer_skel))) { + ERROR("[FlowTracer] Failed to load BPF skeleton: %d (%s)\n", err, strerror(errno)); + goto cleanup; + } + + /* Create signal handler */ + if (signal(SIGINT, sig_handler) == SIG_ERR || signal(SIGTERM, sig_handler) == SIG_ERR) { + ERROR("[FlowTracer] Failed to set a signal handler: %s\n", strerror(errno)); + goto cleanup; + } + + /* Attach program to cgroup */ + int prog_fd = bpf_program__fd(flowtracer_skel -> progs.flowtracer_sockops_fn); + err = bpf_prog_attach(prog_fd, cg_fd, BPF_CGROUP_SOCK_OPS, 0); + if (err) { + ERROR("[FlowTracer] Failed to attach 
BPF program to cgroup2 mounted at %s: %d (%s)\n", cg_path, err, strerror(errno)); + goto cleanup; + } + + /* Attach tracepoint handler */ + err = flowtracer_bpf__attach(flowtracer_skel); + if (err) { + ERROR("[FlowTracer] Failed to attach BPF tracepoint: %d (%s)\n", err, strerror(errno)); + goto cleanup; + } + +#ifdef GOPHER_DEBUG + /* Setup ring buffer to poll FlowTracer map operations for debug purposes */ + ring_buffer = ring_buffer__new(bpf_map__fd(flowtracer_skel->maps.ring_buffer), handle_evt, NULL, NULL); + if (!ring_buffer) { + err = 1; + ERROR("[FlowTracer] Failed to create ring buffer!\n"); + goto cleanup; + } +#endif + + INFO("[FlowTracer] Started successfully\n"); + + /* Process events */ + while (!stop) { +#ifdef GOPHER_DEBUG + err = ring_buffer__poll(ring_buffer, THOUSAND); + if (err < 0 && err != -EINTR) { + ERROR("[FlowTracer] Error polling ring buffer: %d (%s)\n", err, strerror(errno)); + break; + } +#endif + sleep(1); + } + +cleanup: + bpf_prog_detach(cg_fd, BPF_CGROUP_SOCK_OPS); + flowtracer_bpf__detach(flowtracer_skel); + ring_buffer__free(ring_buffer); + flowtracer_bpf__destroy(flowtracer_skel); + close(cg_fd); + // Delete BPF map (pinned at the default libbpf location) + unlink("/sys/fs/bpf/flowtracer_data"); + INFO("[FlowTracer] Cleanup is completed\n"); + return -err; +} diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h new file mode 100644 index 0000000000000000000000000000000000000000..41c3b84598aef9a725539c40717e53aea1e1ec7c --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h @@ -0,0 +1,44 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. 
+ * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#ifndef __FLOWTRACER_H +#define __FLOWTRACER_H + +#include +#include "flowtracer_common.h" + +// TCP option definition +#define TCP_OPT_SOURCE_INFO_KIND 0x55 +struct tcp_opt_source_info { + __u8 kind; + __u8 len; + struct address_info { + __be32 ip4; + __be16 port; + } __attribute__((packed)) address; +} __attribute__((packed)); + +// debug event definition +enum flow_log_op { + FLOW_LOG_ADD = 1, + FLOW_LOG_DEL = 2 +}; + +struct flow_log { + enum flow_log_op op; + struct flow_key key; + struct flow_data data; +}; + +#endif /* __FLOWTRACER_H */ diff --git a/src/probes/extends/ebpf.probe/src/include/conntrack.h b/src/probes/extends/ebpf.probe/src/include/conntrack.h index f8b27047d6500571a5ef59697f3211a56e09bcf5..49e55b2275bf4a0e885de2869fdb4591ca950025 100644 --- a/src/probes/extends/ebpf.probe/src/include/conntrack.h +++ b/src/probes/extends/ebpf.probe/src/include/conntrack.h @@ -17,6 +17,10 @@ #include "common.h" +#define ADDR_TRANSFORM_NONE 0 +#define ADDR_TRANSFORM_CLIENT 1 +#define ADDR_TRANSFORM_SERVER 2 + struct tcp_connect_s { union { u32 c_ip; diff --git a/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h b/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h new file mode 100644 index 0000000000000000000000000000000000000000..4ddc24ef76e9fb270adb3709a46d0ca06611141f --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h @@ -0,0 +1,40 @@ 
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
+ * gala-gopher licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+ * PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * Author: ilyashakhat
+ * Create: 2024-03-15
+ * Description: A circular (ring) buffer with a time-based delay of reading operations
+ ******************************************************************************/
+#ifndef __DELAYING_RING_BUFFER_H__
+#define __DELAYING_RING_BUFFER_H__
+
+#include <time.h>
+
+struct drb_item {
+    void *data;                     // heap copy made by drb_put(), released by drb_pop()/drb_destroy()
+    int size;                       // number of bytes in data
+    struct timespec creation_time;  // CLOCK_MONOTONIC timestamp taken in drb_put()
+};
+
+struct delaying_ring_buffer {
+    struct drb_item *storage;       // array of capacity slots
+    int writer_idx;                 // slot the next drb_put() fills
+    int reader_idx;                 // oldest queued slot; equal to writer_idx when empty
+    int capacity;                   // slot count; the buffer is full one slot before wrapping onto reader_idx
+    int delay_ms;                   // minimum age an item must reach before drb_look() returns it
+};
+
+struct delaying_ring_buffer *drb_new(int capacity, int delay_ms);  // NULL on allocation failure
+void drb_destroy(struct delaying_ring_buffer *drb);
+int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size);  // 0 on success, -1 if not stored
+const struct drb_item *drb_look(struct delaying_ring_buffer *drb);  // NULL if empty or head is too young
+int drb_pop(struct delaying_ring_buffer *drb);  // 0 on success, -1 if empty
+
+#endif
\ No newline at end of file
diff --git a/src/probes/extends/ebpf.probe/src/include/flowtracer_common.h b/src/probes/extends/ebpf.probe/src/include/flowtracer_common.h
new file mode 100644
index 0000000000000000000000000000000000000000..231a1b6b0c6fc4480e0a13314ca829d707296514
--- /dev/null
+++ b/src/probes/extends/ebpf.probe/src/include/flowtracer_common.h
@@ -0,0 +1,36 @@
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
+ * gala-gopher licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+ * PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * Author: ilyashakhat
+ * Create: 2024-01-05
+ * Description: FlowTracer plugin
+ ******************************************************************************/
+#ifndef __FLOWTRACER_COMMON_H__
+#define __FLOWTRACER_COMMON_H__
+
+#include <linux/types.h>
+
+#define FLOWTRACER_DATA_MAP_PATH "/sys/fs/bpf/flowtracer_data"
+#define FLOWTRACER_CGROUP2_PATH "/mnt/cgroup2"
+
+struct flow_key {
+    __be32 local_ip4;
+    __be32 remote_ip4;
+    __be16 local_port;   // network byte order (the reader applies htons() before lookup)
+    __be16 remote_port;  // network byte order
+    __u8 l4_proto;       // e.g. IPPROTO_TCP; zero the whole key first, alignment gaps must be 0
+};
+
+struct flow_data {
+    __be32 original_remote_ip4;
+    __be16 original_remote_port;  // network byte order (the reader applies ntohs())
+};
+
+#endif
diff --git a/src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h b/src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h
new file mode 100644
index 0000000000000000000000000000000000000000..24c87649d27b1bf99643a5030139f35b645a2a7d
--- /dev/null
+++ b/src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h
@@ -0,0 +1,22 @@
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
+ * gala-gopher licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#ifndef __FLOWTRACER_READER_H__ +#define __FLOWTRACER_READER_H__ + +#include "conntrack.h" + +int lookup_flowtracer(struct tcp_connect_s *connect); + +#endif \ No newline at end of file diff --git a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c index aa85f8fd0f722d1eecc5ace305c56e70aa354525..5d8940122f1e1ca758fcb81085c0c509fa1df1dc 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c @@ -27,10 +27,15 @@ #undef BPF_PROG_USER #endif +#ifdef GOPHER_DEBUG +#include +#endif + #include "bpf.h" #include "container.h" #include "histogram.h" #include "conntrack.h" +#include "flowtracer_reader.h" #include "include/conn_tracker.h" #include "include/connect.h" #include "protocol/expose/protocol_parser.h" @@ -350,14 +355,10 @@ static struct l7_api_statistic_s* create_l7_api_statistic(const struct api_stats static void transform_cluster_ip(struct l7_mng_s *l7_mng, struct conn_tracker_s* tracker) { - int transform = 0; - - if (l7_mng->ipc_body.probe_param.cluster_ip_backend == 0) { - return; - } + int transform = ADDR_TRANSFORM_NONE; - // Only transform Kubernetes cluster IP backend for the client TCP connection. 
- if (tracker->l4_role != L4_CLIENT) { + char cluster_ip_backend = l7_mng->ipc_body.probe_param.cluster_ip_backend; + if (cluster_ip_backend == 0) { return; } @@ -376,17 +377,49 @@ static void transform_cluster_ip(struct l7_mng_s *l7_mng, struct conn_tracker_s* connect.c_port = tracker->open_info.client_addr.port; connect.s_port = tracker->open_info.server_addr.port; - (void)get_cluster_ip_backend(&connect, &transform); + if (cluster_ip_backend == 1) { // use conntrack + // Only transform Kubernetes cluster IP backend for the client TCP connection. + if (tracker->l4_role != L4_CLIENT) { + return; + } + (void)get_cluster_ip_backend(&connect, &transform); + } else if (cluster_ip_backend == 2) { // use FlowTracer + transform = lookup_flowtracer(&connect); + DEBUG("[L7PROBE] FlowTracer transform: %d\n", transform); + } + if (!transform) { return; } - if (connect.family == AF_INET) { - tracker->open_info.server_addr.ip = connect.sip_addr.s_ip; - } else { - memcpy(tracker->open_info.server_addr.ip6, &(connect.sip_addr), IP6_LEN); +#ifdef GOPHER_DEBUG + char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; + inet_ntop(connect.family, &tracker->open_info.server_addr.ip, s_ip1, sizeof(s_ip1)); + inet_ntop(connect.family, &connect.sip_addr, s_ip2, sizeof(s_ip2)); + inet_ntop(connect.family, &tracker->open_info.client_addr.ip, c_ip1, sizeof(c_ip1)); + inet_ntop(connect.family, &connect.cip_addr, c_ip2, sizeof(c_ip2)); + DEBUG("[L7PROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", + c_ip1, tracker->open_info.client_addr.port, s_ip1, tracker->open_info.server_addr.port, + c_ip2, connect.c_port, s_ip2, connect.s_port); +#endif + + if (transform & ADDR_TRANSFORM_SERVER) { + if (connect.family == AF_INET) { + tracker->open_info.server_addr.ip = connect.sip_addr.s_ip; + } else { + memcpy(tracker->open_info.server_addr.ip6, &(connect.sip_addr.s_ip6), IP6_LEN); + } + tracker->open_info.server_addr.port = connect.s_port; + } + if 
(transform & ADDR_TRANSFORM_CLIENT) { + if (connect.family == AF_INET) { + tracker->open_info.client_addr.ip = connect.cip_addr.c_ip; + } else { + memcpy(tracker->open_info.client_addr.ip6, &(connect.cip_addr.c_ip6), IP6_LEN); + } + tracker->open_info.client_addr.port = connect.c_port; } - tracker->open_info.server_addr.port = connect.s_port; + return; } @@ -428,6 +461,16 @@ static int proc_conn_ctl_msg(struct l7_mng_s *l7_mng, struct conn_ctl_s *conn_ct // Client port just used for cluster IP address translation. Here, client port MUST set 0. tracker->open_info.client_addr.port = 0; } +#ifdef GOPHER_DEBUG + char s_ip[IP6_STR_LEN], c_ip[IP6_STR_LEN]; + s_ip[0] = 0; + (void)inet_ntop(tracker->open_info.server_addr.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip, sizeof(s_ip)); + c_ip[0] = 0; + (void)inet_ntop(tracker->open_info.client_addr.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip, sizeof(c_ip)); + DEBUG("[L7PROBE.proc_conn_ctl_msg]: CONN_EVT_OPEN: tracker flow (%s:%u - %s:%u)\n", + c_ip, tracker->open_info.client_addr.port, s_ip, tracker->open_info.server_addr.port); +#endif + } break; } @@ -1010,6 +1053,15 @@ void l7_parser(void *ctx) } int tracker_msg(void *ctx, void *data, u32 size) +{ + struct l7_mng_s *l7_mng = ctx; + if (drb_put(l7_mng->drb, data, size)) { + WARN("[L7PROBE] Not enough space to put event into the ring buffer. 
Event is discarded.\n"); + } + return 0; +} + +int tracker_msg_continue(void *ctx, void *data, u32 size) { char *p = data; struct l7_mng_s *l7_mng = ctx; diff --git a/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h b/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h index c129eead7e238f14291a1a5f8a606c167946f9e2..1a3e341c9697299b889f4c7e3cbac08bc128c6d3 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h +++ b/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h @@ -195,6 +195,7 @@ void l7_parser(void *ctx); void report_l7(void *ctx); int tracker_msg(void *ctx, void *data, u32 size); +int tracker_msg_continue(void *ctx, void *data, u32 size); #endif diff --git a/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h b/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h index a7f96038aa5fb51cd26787cd2e587c419f06fe88..055660152945075630ecd912062c55bd54c7920d 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h +++ b/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h @@ -15,6 +15,7 @@ #ifndef __L7_COMMON_H__ #define __L7_COMMON_H__ +#include "delaying_ring_buffer.h" #include "ipc.h" #include "include/filter.h" #include "include/connect.h" @@ -51,6 +52,7 @@ struct l7_mng_s { struct l7_link_s *l7_links; struct conn_data_s conn_data; struct java_proc_s *java_procs; + struct delaying_ring_buffer *drb; }; #endif \ No newline at end of file diff --git a/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c b/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c index be9fd09b4be3f6f56bce66a42425303f3edb88ab..af093339f0046d885d01d7fb7b8442b56edfe470 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c @@ -350,6 +350,15 @@ static int poll_l7_pb(struct l7_ebpf_prog_s* ebpf_progs) return 0; } +static void poll_drb(struct l7_mng_s *l7_mng) +{ + const struct drb_item *item; + while ((item = drb_look(l7_mng->drb))) { + 
tracker_msg_continue(l7_mng, item->data, item->size); + drb_pop(l7_mng->drb); + } +} + static void save_filter_proto(int fd, u32 proto) { u32 key = 0; @@ -380,6 +389,12 @@ int main(int argc, char **argv) (void)memset(l7_mng, 0, sizeof(struct l7_mng_s)); + l7_mng->drb = drb_new(4096, 500); + if (!l7_mng->drb) { + ERROR("[L7PROBE] Failed to allocate delaying ring buffer.\n"); + goto err; + } + int msq_id = create_ipc_msg_queue(IPC_EXCL); if (msq_id < 0) { ERROR("[L7PROBE]: Create ipc msg que failed.\n"); @@ -441,6 +456,7 @@ int main(int argc, char **argv) ERROR("[L7Probe]: perf poll failed(%d).\n", ret); break; } + poll_drb(l7_mng); l7_parser(l7_mng); report_l7(l7_mng); } else { @@ -454,5 +470,7 @@ err: l7_unload_probe_jsse(l7_mng); unload_l7_prog(l7_mng); destroy_ipc_body(&(l7_mng->ipc_body)); + drb_destroy(l7_mng->drb); + INFO("[L7PROBE] Cleanup is completed"); return 0; } diff --git a/src/probes/extends/ebpf.probe/src/lib/conntrack.c b/src/probes/extends/ebpf.probe/src/lib/conntrack.c index 81ea1749192de52d242b173686aee92763628246..0085cd52cf09f5eba571d322a0f38f1995eab5a3 100644 --- a/src/probes/extends/ebpf.probe/src/lib/conntrack.c +++ b/src/probes/extends/ebpf.probe/src/lib/conntrack.c @@ -252,7 +252,7 @@ int get_cluster_ip_backend(struct tcp_connect_s *connect, int *transform) char line[LINE_BUF_LEN]; char command[COMMAND_LEN]; - *transform = 0; + *transform = ADDR_TRANSFORM_NONE; // Only transform Kubernetes cluster IP backend for the client TCP connection. 
if (connect->role == 0) { return 0; @@ -295,7 +295,7 @@ int get_cluster_ip_backend(struct tcp_connect_s *connect, int *transform) ret = __dnat_op((const struct tcp_conntrack_s *)conntrack, connect); if (ret == 0) { - *transform = 1; + *transform = ADDR_TRANSFORM_SERVER; __free_conntrack_tcp(conntrack); break; } diff --git a/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c new file mode 100644 index 0000000000000000000000000000000000000000..42b564a1b94213bcd2fcfdac97c8afb04ca0c43a --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c @@ -0,0 +1,100 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. 
+ * Author: ilyashakhat
+ * Create: 2024-03-15
+ * Description: A circular (ring) buffer with a time-based delay of reading operations
+ ******************************************************************************/
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include "common.h"
+#include "delaying_ring_buffer.h"
+
+/* Create a buffer with capacity slots (one always stays unused); NULL on capacity < 2 or OOM. */
+struct delaying_ring_buffer *drb_new(int capacity, int delay_ms)
+{
+    struct delaying_ring_buffer *drb = (capacity > 1) ? calloc(1, sizeof(*drb)) : NULL;
+    if (!drb) {
+        return NULL;
+    }
+    drb->capacity = capacity;
+    drb->delay_ms = delay_ms;
+    drb->storage = calloc((size_t)capacity, sizeof(*drb->storage));
+    if (!drb->storage) { // never hand out a buffer without backing storage
+        free(drb);
+        return NULL;
+    }
+    INFO("[DRB] Allocated a delaying ring buffer with capacity %d and delay %d ms\n", capacity, delay_ms);
+    return drb;
+}
+
+/* Release the buffer together with every item still queued; NULL is accepted. */
+void drb_destroy(struct delaying_ring_buffer *drb)
+{
+    if (drb && drb->storage) {
+        // step modulo capacity so that items queued after a wrap-around are freed too
+        for (int idx = drb->reader_idx; idx != drb->writer_idx; idx = (idx + 1) % drb->capacity) {
+            free(drb->storage[idx].data);
+        }
+        free(drb->storage);
+    }
+    free(drb);
+    INFO("[DRB] Delaying ring buffer destroyed\n");
+}
+
+/* Queue a private copy of data stamped with the monotonic clock; 0 on success, -1 if not stored. */
+int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size)
+{
+    if (!drb || !data || size < 0) {
+        return -1;
+    }
+    if ((drb->writer_idx + 1) % drb->capacity == drb->reader_idx) {
+        return -1; // storage is full
+    }
+    char *copy = malloc(size > 0 ? (size_t)size : 1);
+    if (!copy) {
+        return -1;
+    }
+    memcpy(copy, data, (size_t)size);
+    struct drb_item *slot = &drb->storage[drb->writer_idx];
+    slot->data = copy;
+    slot->size = size;
+    clock_gettime(CLOCK_MONOTONIC, &slot->creation_time);
+    drb->writer_idx = (drb->writer_idx + 1) % drb->capacity;
+    return 0;
+}
+
+/* Peek at the oldest item; NULL while the buffer is empty or the item is younger than delay_ms. */
+const struct drb_item *drb_look(struct delaying_ring_buffer *drb)
+{
+    if (drb->reader_idx == drb->writer_idx) {
+        return NULL;
+    }
+    struct timespec now;
+    clock_gettime(CLOCK_MONOTONIC, &now);
+    const struct drb_item *item = &drb->storage[drb->reader_idx];
+    // compare the whole elapsed time so that delays of one second or more work too
+    long long elapsed_ns = (long long)(now.tv_sec - item->creation_time.tv_sec) * 1000000000LL +
+                           (now.tv_nsec - item->creation_time.tv_nsec);
+    return (elapsed_ns / 1000000 < drb->delay_ms) ? NULL : item;
+}
+
+/* Drop the oldest item and free its payload; -1 if the buffer is empty. */
+int drb_pop(struct delaying_ring_buffer *drb) {
+    if (drb->writer_idx == drb->reader_idx) {
+        return -1;
+    }
+    struct drb_item *item = &drb->storage[drb->reader_idx];
+    free(item->data);
+    item->data = NULL; // the slot no longer owns anything
+    drb->reader_idx = (drb->reader_idx + 1) % drb->capacity;
+    return 0;
+}
diff --git a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c
new file mode 100644
index 0000000000000000000000000000000000000000..f184e7bc1e5786e1847bd5feb3a6dd523aebed3d
--- /dev/null
+++ b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c
@@ -0,0 +1,116 @@
+/******************************************************************************
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved.
+ * gala-gopher licensed under the Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+ * PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * Author: ilyashakhat
+ * Create: 2024-01-05
+ * Description: FlowTracer plugin
+ ******************************************************************************/
+#include <errno.h>
+#include <string.h>
+#include <arpa/inet.h>
+#include <bpf/bpf.h>
+#include "common.h"
+#include "flowtracer_common.h"
+#include "flowtracer_reader.h"
+
+static int g_flow_tracer_map_fd = -1;
+
+/*
+ * Replace the remote endpoint of connect (client for role 0, server otherwise) with the original
+ * address recorded in the pinned FlowTracer map; returns the ADDR_TRANSFORM_* value that was applied.
+ */
+int lookup_flowtracer(struct tcp_connect_s *connect) {
+    __u32 local_ip4, remote_ip4;
+    __u16 local_port, remote_port;
+    __u32 s_ip, c_ip;
+
+    if (connect->family == AF_INET) {
+        // IPv4
+        s_ip = connect->sip_addr.s_ip;
+        c_ip = connect->cip_addr.c_ip;
+    } else if (connect->family == AF_INET6 &&
+        NIP6_IS_ADDR_V4MAPPED((unsigned short *)connect->sip_addr.s_ip6) && NIP6_IS_ADDR_V4MAPPED((unsigned short *)connect->cip_addr.c_ip6)) {
+        // IPv4 address mapped to IPv6; memcpy keeps the load alignment- and aliasing-safe
+        memcpy(&s_ip, connect->sip_addr.s_ip6 + IP4_BYTE_1_IN_IP6, sizeof(s_ip));
+        memcpy(&c_ip, connect->cip_addr.c_ip6 + IP4_BYTE_1_IN_IP6, sizeof(c_ip));
+        DEBUG("[lookup_flowtracer] mapped IPv4: server %x client %x\n", s_ip, c_ip);
+    } else {
+        // pure IPv6 is not supported
+        return ADDR_TRANSFORM_NONE;
+    }
+
+    if (connect->role == 0) {
+        // server: local is server, remote is client
+        local_ip4 = s_ip;
+        local_port = connect->s_port;
+        remote_ip4 = c_ip;
+        remote_port = connect->c_port;
+    } else {
+        // client: local is client, remote is server
+        local_ip4 = c_ip;
+        local_port = connect->c_port;
+        remote_ip4 = s_ip;
+        remote_port = connect->s_port;
+    }
+
+    if (g_flow_tracer_map_fd < 0) {
+        // Try to retrieve map fd
+        int fd = bpf_obj_get(FLOWTRACER_DATA_MAP_PATH);
+        if (fd < 0) { // map doesn't exist; 0 is a valid descriptor, only negatives are errors
+            DEBUG("[lookup_flowtracer] Failed to open FlowTracer map: %s\n", strerror(errno));
+            return ADDR_TRANSFORM_NONE;
+        }
+        g_flow_tracer_map_fd = fd;
+        DEBUG("[lookup_flowtracer] FlowTracer map is opened successfully, fd: %d\n", g_flow_tracer_map_fd);
+    }
+
+    struct flow_key key;
+    memset(&key, 0, sizeof(key)); // alignment gaps must be filled with 0; "= {0}" does not guarantee that
+    key.local_ip4 = local_ip4;
+    key.local_port = htons(local_port);
+    key.remote_ip4 = remote_ip4;
+    key.remote_port = htons(remote_port);
+    key.l4_proto = IPPROTO_TCP;
+
+    struct flow_data value = {0};
+
+    DEBUG("[lookup_flowtracer] Lookup fd %d, local_ip4: %u (%x), local_port: %d, remote_ip4: %u (%x), remote_port: %d, l4_proto: %d, key_size: %zu, value_size: %zu\n",
+        g_flow_tracer_map_fd, key.local_ip4, key.local_ip4, key.local_port, key.remote_ip4, key.remote_ip4, key.remote_port, key.l4_proto, sizeof(key), sizeof(value));
+
+    int err = bpf_map_lookup_elem(g_flow_tracer_map_fd, &key, &value);
+    if (err) {
+        DEBUG("[lookup_flowtracer] Lookup fd %d, err: %d (%s)\n", g_flow_tracer_map_fd, err, strerror(errno));
+        return ADDR_TRANSFORM_NONE;
+    }
+
+    __u32 original_remote_ip4 = value.original_remote_ip4;
+    __u16 original_remote_port = ntohs(value.original_remote_port);
+
+    DEBUG("[lookup_flowtracer] Lookup original_remote_ip4: %x, original_remote_port: %d\n",
+        original_remote_ip4, original_remote_port);
+
+    if (connect->role == 0) {
+        // server side: the remote endpoint is the client, so the client address is rewritten
+        if (connect->family == AF_INET) {
+            connect->cip_addr.c_ip = original_remote_ip4;
+        } else {
+            memcpy(connect->cip_addr.c_ip6 + IP4_BYTE_1_IN_IP6, &original_remote_ip4, sizeof(original_remote_ip4));
+        }
+        connect->c_port = original_remote_port;
+        return ADDR_TRANSFORM_CLIENT;
+    }
+    if (connect->family == AF_INET) {
+        connect->sip_addr.s_ip = original_remote_ip4;
+    } else {
+        memcpy(connect->sip_addr.s_ip6 + IP4_BYTE_1_IN_IP6, &original_remote_ip4, sizeof(original_remote_ip4));
+    }
+    connect->s_port = original_remote_port;
+    return ADDR_TRANSFORM_SERVER;
+}
diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c
index cf0c1d53b2e8487e18ae5dc67ee1772e0ff3d0e8..cdc3a2da68eb5cf3af8c5505015d38bb71a86e56 100644
--- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c
+++
b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c @@ -38,6 +38,7 @@ #include "bpf.h" #include "ipc.h" #include "conntrack.h" +#include "flowtracer_reader.h" #include "tcpprobe.h" #include "tcp_tracker.h" @@ -177,57 +178,53 @@ static struct tcp_tracker_s* lkup_tcp_tracker(struct tcp_mng_s *tcp_mng, const s return tracker; } -static void __transform_cluster_ip(struct tcp_mng_s *tcp_mng, const struct tcp_link_s *tcp_link, struct backend_ip_s *backend_ip) +static int __transform_cluster_ip(struct tcp_mng_s *tcp_mng, const struct tcp_link_s *tcp_link, struct tcp_connect_s *connect) { - int transform = 0; + int transform = ADDR_TRANSFORM_NONE; - if (tcp_mng->ipc_body.probe_param.cluster_ip_backend == 0) { - return; + char cluster_ip_backend = tcp_mng->ipc_body.probe_param.cluster_ip_backend; + if (cluster_ip_backend == 0) { // transform is disabled + return ADDR_TRANSFORM_NONE; } - // Only transform Kubernetes cluster IP backend for the client TCP connection. - if (tcp_link->role == 0) { - return; - } - - struct tcp_connect_s connect; - - connect.role = tcp_link->role; - connect.family = tcp_link->family; + connect->role = tcp_link->role; + connect->family = tcp_link->family; if (tcp_link->family == AF_INET) { - connect.cip_addr.c_ip = tcp_link->c_ip; - connect.sip_addr.s_ip = tcp_link->s_ip; + connect->cip_addr.c_ip = tcp_link->c_ip; + connect->sip_addr.s_ip = tcp_link->s_ip; } else { - memcpy(&(connect.cip_addr), tcp_link->c_ip6, IP6_LEN); - memcpy(&(connect.sip_addr), tcp_link->s_ip6, IP6_LEN); + memcpy(&(connect->cip_addr), tcp_link->c_ip6, IP6_LEN); + memcpy(&(connect->sip_addr), tcp_link->s_ip6, IP6_LEN); + } + connect->c_port = tcp_link->c_port; + connect->s_port = tcp_link->s_port; + + if (cluster_ip_backend == 1) { // use conntrack + // Only transform Kubernetes cluster IP backend for the client TCP connection. 
+ if (tcp_link->role == 0) { + return ADDR_TRANSFORM_NONE; + } + (void)get_cluster_ip_backend(connect, &transform); + } else if (cluster_ip_backend == 2) { // use FlowTracer + transform = lookup_flowtracer(connect); } - connect.c_port = tcp_link->c_port; - connect.s_port = tcp_link->s_port; - (void)get_cluster_ip_backend(&connect, &transform); if (!transform) { - return; + return ADDR_TRANSFORM_NONE; } #ifdef GOPHER_DEBUG - char ip1[IP6_LEN], ip2[IP6_LEN]; - ip1[0] = 0; - (void)inet_ntop(tcp_link->family, (const void *)&(tcp_link->s_ip), ip1, IP6_LEN); - ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), ip2, IP6_LEN); - DEBUG("[TCPPROBE]: Cluster IP[%s:%u->%s:%u] transform successfully.\n", ip1, tcp_link->s_port, ip2, connect.s_port); + char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; + inet_ntop(tcp_link->family, &tcp_link->s_ip, s_ip1, sizeof(s_ip1)); + inet_ntop(connect->family, &connect->sip_addr, s_ip2, sizeof(s_ip2)); + inet_ntop(tcp_link->family, &tcp_link->c_ip, c_ip1, sizeof(c_ip1)); + inet_ntop(connect->family, &connect->cip_addr, c_ip2, sizeof(c_ip2)); + DEBUG("[TCPPROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", + c_ip1, tcp_link->c_port, s_ip1, tcp_link->s_port, c_ip2, connect->c_port, s_ip2, connect->s_port); #endif - backend_ip->family = tcp_link->family; - - if (backend_ip->family == AF_INET) { - backend_ip->ip_addr.ip = connect.sip_addr.s_ip; - } else { - memcpy(&(backend_ip->ip_addr), &(connect.sip_addr), IP6_LEN); - } - backend_ip->port = connect.s_port; - return; + return transform; } static void __init_tracker_id(struct tcp_tracker_id_s *tracker_id, const struct tcp_link_s *tcp_link) @@ -306,20 +303,26 @@ static void __init_flow_tracker_id(struct tcp_flow_tracker_id_s *tracker_id, con struct tcp_tracker_s* get_tcp_tracker(struct tcp_mng_s *tcp_mng, const void *link) { - struct backend_ip_s backend_ip = {0}; + struct tcp_connect_s connect = {0}; struct 
tcp_tracker_id_s tracker_id = {0}; const struct tcp_link_s *tcp_link = link; __init_tracker_id(&tracker_id, tcp_link); - __transform_cluster_ip(tcp_mng, tcp_link, &backend_ip); - if (backend_ip.family != 0 && backend_ip.port != 0) { - if (backend_ip.family == AF_INET) { - tracker_id.s_ip = backend_ip.ip_addr.ip; + int transform = __transform_cluster_ip(tcp_mng, tcp_link, &connect); + if (transform & ADDR_TRANSFORM_SERVER) { + if (connect.family == AF_INET) { + tracker_id.s_ip = connect.sip_addr.s_ip; + } else { + memcpy(tracker_id.s_ip6, &(connect.sip_addr.s_ip6), IP6_LEN); + } + tracker_id.port = connect.s_port; + } else if (transform & ADDR_TRANSFORM_CLIENT) { + if (connect.family == AF_INET) { + tracker_id.c_ip = connect.cip_addr.c_ip; } else { - memcpy(tracker_id.s_ip6, &(backend_ip.ip_addr), IP6_LEN); + memcpy(tracker_id.c_ip6, &(connect.cip_addr.c_ip6), IP6_LEN); } - tracker_id.port = backend_ip.port; } struct tcp_tracker_s* tracker = lkup_tcp_tracker(tcp_mng, (const struct tcp_tracker_id_s *)&tracker_id); @@ -362,18 +365,18 @@ void destroy_tcp_trackers(struct tcp_mng_s *tcp_mng) struct tcp_flow_tracker_s* get_tcp_flow_tracker(struct tcp_mng_s *tcp_mng, const void *link) { - struct backend_ip_s backend_ip = {0}; + struct tcp_connect_s connect = {0}; struct tcp_flow_tracker_id_s tracker_id = {0}; const struct tcp_link_s *tcp_link = link; __init_flow_tracker_id(&tracker_id, tcp_link); - __transform_cluster_ip(tcp_mng, tcp_link, &backend_ip); - if (backend_ip.family != 0 && backend_ip.port != 0) { + int transform = __transform_cluster_ip(tcp_mng, tcp_link, &connect); + if (transform & ADDR_TRANSFORM_SERVER) { tracker_id.remote_ip[0] = 0; - ip_str(backend_ip.family, (unsigned char *)&(backend_ip.ip_addr), + ip_str(connect.family, (unsigned char *)&(connect.sip_addr), (unsigned char *)tracker_id.remote_ip, sizeof(tracker_id.remote_ip)); - tracker_id.port = backend_ip.port; + tracker_id.port = connect.s_port; } struct tcp_flow_tracker_s* tracker = 
lkup_tcp_flow_tracker(tcp_mng,