From 9a6742931d700ae4b14ebff28436c8726f0715b5 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Fri, 5 Jan 2024 12:05:40 +0000 Subject: [PATCH 01/17] feat: add FlowTracer plugin FlowTracer is an active probe plugin that tracks TCP connections and transfers packet source address in-band within a connection. Its main purpose is to address different NAT scenarios including virtual service IP in Kubernetes. --- src/common/ipc.h | 1 + src/lib/probe/probe_mng.c | 3 +- src/lib/probe/probe_params_parser.c | 2 +- .../ebpf.probe/src/flowtracer/Makefile | 46 +++++ .../ebpf.probe/src/flowtracer/README.md | 67 +++++++ .../src/flowtracer/flowtracer.bpf.c | 176 ++++++++++++++++++ .../ebpf.probe/src/flowtracer/flowtracer.c | 124 ++++++++++++ .../ebpf.probe/src/flowtracer/flowtracer.h | 32 ++++ .../ebpf.probe/src/include/conntrack.h | 4 + .../src/include/flowtracer_common.h | 36 ++++ .../src/include/flowtracer_reader.h | 22 +++ .../extends/ebpf.probe/src/lib/conntrack.c | 4 +- .../ebpf.probe/src/lib/flowtracer_reader.c | 85 +++++++++ .../ebpf.probe/src/tcpprobe/tcp_tracker.c | 105 ++++++----- 14 files changed, 654 insertions(+), 53 deletions(-) create mode 100644 src/probes/extends/ebpf.probe/src/flowtracer/Makefile create mode 100644 src/probes/extends/ebpf.probe/src/flowtracer/README.md create mode 100644 src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c create mode 100644 src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c create mode 100644 src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h create mode 100644 src/probes/extends/ebpf.probe/src/include/flowtracer_common.h create mode 100644 src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h create mode 100644 src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c diff --git a/src/common/ipc.h b/src/common/ipc.h index eeb625dc..a48dc371 100644 --- a/src/common/ipc.h +++ b/src/common/ipc.h @@ -125,6 +125,7 @@ enum probe_type_e { PROBE_SCHED, PROBE_CONTAINER, PROBE_SERMANT, + PROBE_FLOWTRACER, // If you want to add a probe, add the probe type. diff --git a/src/lib/probe/probe_mng.c b/src/lib/probe/probe_mng.c index 9f8d69c3..e6c72617 100644 --- a/src/lib/probe/probe_mng.c +++ b/src/lib/probe/probe_mng.c @@ -60,7 +60,8 @@ struct probe_define_s probe_define[] = { {"ksli", "$gala-gopher-dir/ksliprobe", PROBE_KSLI}, {"sched", "$gala-gopher-dir/schedprobe", PROBE_SCHED}, {"container", "$gala-gopher-dir/cadvisor_probe.py", PROBE_CONTAINER}, - {"sermant", "$gala-gopher-dir/sermant_probe.py", PROBE_SERMANT} + {"sermant", "$gala-gopher-dir/sermant_probe.py", PROBE_SERMANT}, + {"flowtracer", "$gala-gopher-dir/flowtracer", PROBE_FLOWTRACER} // If you want to add a probe, add the probe define. }; diff --git a/src/lib/probe/probe_params_parser.c b/src/lib/probe/probe_params_parser.c index c9ed5fca..81aac823 100644 --- a/src/lib/probe/probe_params_parser.c +++ b/src/lib/probe/probe_params_parser.c @@ -591,7 +591,7 @@ struct param_key_s param_keys[] = { {PERF_SAMPLE_PERIOD, {10, 10, 1000, ""}, parser_perf_sample_period, set_default_params_inter_perf_sample_period, JSON_NUMBER}, {MULTI_INSTANCE, {0, 0, 1, ""}, parser_multi_instance, set_default_params_char_multi_instance_flag, JSON_NUMBER}, {NATIVE_STACK, {0, 0, 1, ""}, parser_native_stack, set_default_params_char_native_stack_flag, JSON_NUMBER}, - {CLUSTER_IP_BACKEND, {0, 0, 1, ""}, parser_cluster_ip_backend_flag, set_default_params_char_cluster_ip_backend, JSON_NUMBER}, + {CLUSTER_IP_BACKEND, {0, 0, 2, ""}, parser_cluster_ip_backend_flag, set_default_params_char_cluster_ip_backend, JSON_NUMBER}, {SVG_DIR, {0, 0, 0, "/var/log/gala-gopher/stacktrace"}, parser_svg_dir, set_default_params_str_svg_dir, JSON_STRING}, {FLAME_DIR, {0, 0, 0, "/var/log/gala-gopher/flamegraph"}, parser_flame_dir, set_default_params_str_flame_dir, JSON_STRING}, #if 0 diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/Makefile b/src/probes/extends/ebpf.probe/src/flowtracer/Makefile new file mode 100644 index 00000000..d60f4f25 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/Makefile @@ -0,0 +1,46 @@ +include ../mk/var.mk +INCLUDES = $(BASE_INC) + +APP := flowtracer + +SRC_CPLUS := $(wildcard *.cpp) +SRC_CPLUS += $(CPLUSFILES) + +BPF_C := $(wildcard *.bpf.c) +DEPS := $(patsubst %.bpf.c, %.bpf.o, $(BPF_C)) +DEPS += $(patsubst %.bpf.c, %.skel.h, $(BPF_C)) +DEPS += $(patsubst %.cpp, %.o, $(SRC_CPLUS)) + +SRC_C := $(filter-out $(BPF_C), $(wildcard *.c)) +SRC_C += $(CFILES) + +.PHONY: all clean install + +all: pre deps app +pre: $(OUTPUT) +deps: $(DEPS) +# build bpf code +%.bpf.o: %.bpf.c + $(CLANG) $(CLANGFLAGS) -target bpf $(INCLUDES) -c $(filter %.c,$^) -o $@ + $(LLVM_STRIP) -g $@ + +# build skel.h +%.skel.h: %.bpf.o + $(BPFTOOL) gen skeleton $< > $@ + +# build c++ files +%.o: %.cpp + $(C++) -c $^ $(CXXFLAGS) $(INCLUDES) -o $@ + +app: $(APP) +%: %.c $(SRC_C) + $(CC) $(CFLAGS) $(patsubst %.cpp, %.o, $(SRC_CPLUS)) $(INCLUDES) $^ $(LDFLAGS) $(LINK_TARGET) -o $@ + @echo $@ "compiling completed." +clean: + rm -rf $(DEPS) + rm -rf $(APP) + +install: + mkdir -p $(INSTALL_DIR) + cp $(APP) $(INSTALL_DIR) + diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/README.md b/src/probes/extends/ebpf.probe/src/flowtracer/README.md new file mode 100644 index 00000000..bd5cf93c --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/README.md @@ -0,0 +1,67 @@ +# FlowTracer + +## What is FlowTracer? + +Consider a network scenario where a client communicates to a server through NAT. The most common +case is Kubernetes with the default Kubeproxy where a pod talks to a virtual service IP and doesn't +know a real destination. Depending on underlying network technology the destination pod has the +symmetric issue - it sees only the translated address and not the address of the original source. + +FlowTracer is an active probe that monitors TCP connections and transfers packet source address +in-band within the connection. At a receiver end the probe extracts injected address and maintains +association from the observed address tuple to the received one. + +## How to run FlowTracer + +FlowTracer is implemented as a Gala-Gopher plugin, it can be started/stopped using REST API. +Data retrieval is integrated into TCP probe and if enabled it replaces an observed address with +an original one. + +### Prerequisites + +FlowTracer BPF program is attached to cgroup2 mounted into a host file system. It is possible +to mount all cgroups or be more specific, for example to enable FlowTracer for individual processes +or containers. + +To mount cgroup2 in OpenEuler: + 1. Create a mount point: `sudo mkdir /mnt/cgroup2`. + 2. Mount cgroup2 to the mount point: `mount -t cgroup2 none /mnt/cgroup2/`. + +### To enable FlowTracer + +FlowTracer can be controlled via Gala-Gopher's REST API (in the example below the API runs at the local host): + 1. Enable FlowTracer: `curl -X PUT -d json='{"state":"running"}' http://localhost:9999/flowtracer`. + 2. Enable FlowTracer data mapping in tcp probe: `curl -X PUT -d json='{"cmd":{"probe":["tcp_stats"]},"snoopers":{"proc_name":[{"comm":"^python.*","cmdline":""}]},"params":{"report_event":0,"report_period":10,"cluster_ip_backend":2},"state":"running"}' http://localhost:9999/tcp`. Note that this command enables only 1 of TCP probes for processes that name start with python. + +### To disable FlowTracer + +FlowTracer can be controlled via Gala-Gopher's REST API: + 1. Stop FlowTracer: `curl -X PUT -d json='{"state":"stopped"}' http://localhost:9999/flowtracer`. + 2. Disable FlowTracer data mapping in tcp probe: `curl -X PUT -d json='{"cmd":{"probe":["tcp_stats"]},"snoopers":{"proc_name":[{"comm":"^python.*","cmdline":""}]},"params":{"report_event":0,"report_period":10,"cluster_ip_backend":0},"state":"running"}' http://localhost:9999/tcp`. + +### Example + +There are 2 pods running on different nodes in Kubernetes cluster. Pod A is a client that sends HTTP request to a server running in Pod B. +The example shows metric attributes that are collected by Gala-Gopher (common attributes are removed for brevity). + +#### Metrics without FlowTracer + +Client-side - the client connects to a virtual service IP: +``` +gala_gopher_tcp_link_tx_bytes{role="client",client_ip="10.0.0.131",server_ip="10.247.204.240",server_port="8000",pod="app/a-664d757c7b-q6ds2"} 110 1704451439000 +``` +Server-side - the server receives packets from a translated address (host gateway): +``` +gala_gopher_tcp_link_rx_bytes{role="server",client_ip="192.168.3.14",server_ip="10.0.0.5",server_port="8000",pod="app/b-6758f884b4-7mgbm"} 110 1704451440000 +``` + +#### Metrics with FlowTracer + +Client-side - the virtual service IP is resolved to the address of the server: +``` +gala_gopher_tcp_link_tx_bytes{role="client",client_ip="10.0.0.131",server_ip="10.0.0.5",server_port="8000",pod="app/a-664d757c7b-q6ds2"} 110 1704451981000 +``` +Server-side - the real client address is visible too: +``` +gala_gopher_tcp_link_rx_bytes{role="server",client_ip="10.0.0.131",server_ip="10.0.0.5",server_port="8000",pod="app/b-6758f884b4-7mgbm"} 110 1704451982000 +``` diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c new file mode 100644 index 00000000..27e529ff --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c @@ -0,0 +1,176 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#include +#include +#include +#include +#include + +#include "flowtracer.h" + +char LICENSE[] SEC("license") = "Dual BSD/GPL"; + +// from `tools/testing/selftests/bpf/test_tcp_hdr_options.h` +#define CG_OK 1 +#define CG_ERR 0 +#define ERR2CG(err) (err)? CG_ERR: CG_OK + +// FlowTracer data: maps from observed TCP flow tuple into received remote original address +#define MAX_ENTRIES_FLOW 65536 +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, struct flow_key); + __type(value, struct flow_data); + __uint(max_entries, MAX_ENTRIES_FLOW); + __uint(pinning, LIBBPF_PIN_BY_NAME); // map is located at /sys/fs/bpf/flowtracer_data +} flowtracer_data SEC(".maps"); + +static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops, __u32 extra) +{ + long err = bpf_sock_ops_cb_flags_set(skops, skops->bpf_sock_ops_cb_flags | extra); + return ERR2CG(err); +} + +static int handle_hdr_opt_len(struct bpf_sock_ops *skops) +{ + const int bytes_to_reserve = sizeof(struct tcp_opt_source_info); + int err = bpf_reserve_hdr_opt(skops, bytes_to_reserve, 0); + return ERR2CG(err); +} + +static int handle_write_hdr_opt(struct bpf_sock_ops *skops) +{ + struct tcp_opt_source_info opt = {0}; + opt.kind = TCP_OPT_SOURCE_INFO_KIND; + opt.len = sizeof(struct tcp_opt_source_info); + opt.address.ip4 = skops -> local_ip4; /* stored in network byte order (bpf.h) */ + opt.address.port = (__be16)bpf_htons((__u16)skops -> local_port); /* stored in host byte order (bpf.h) */ + + int err = bpf_store_hdr_opt(skops, &opt, sizeof(opt), 0); + return ERR2CG(err); +} + +static int handle_hdr_parse(struct bpf_sock_ops *skops) +{ + struct tcp_opt_source_info opt = {0}; + opt.kind = TCP_OPT_SOURCE_INFO_KIND; + + int err = bpf_load_hdr_opt(skops, &opt, sizeof(opt), 0); + if (err < 0) { + return CG_ERR; + } + + __be32 local_ip4 = skops->local_ip4; /* stored in network byte order (bpf.h) */ + __be16 local_port = (__be16)bpf_htons((__u16)skops->local_port); /* stored in host byte order (bpf.h) */ + __be32 remote_ip4 = skops->remote_ip4; /* stored in network byte order (bpf.h) */ + __be16 remote_port = (__be16)(skops->remote_port >> 16); /* stored in network byte order (bpf.h) - high 16 bits of 32 bits variable */ + __be32 original_remote_ip4 = opt.address.ip4; + __be16 original_remote_port = opt.address.port; /* network byte order (as written in handle_write_hdr_opt)*/ + + if (remote_ip4 == original_remote_ip4 && remote_port == original_remote_port) { + return CG_OK; // keep only flows with changed address + } + + // check if the flow is in the flowtracer map + struct flow_key flow_key = {0}; + flow_key.local_ip4 = local_ip4; + flow_key.local_port = local_port; + flow_key.remote_ip4 = remote_ip4; + flow_key.remote_port = remote_port; + flow_key.l4_proto = IPPROTO_TCP; + + struct flow_data *flow_ptr = bpf_map_lookup_elem(&flowtracer_data, &flow_key); + if (!flow_ptr) { + // add a new flow + struct flow_data flow = {0}; + flow.original_remote_ip4 = original_remote_ip4; + flow.original_remote_port = original_remote_port; + bpf_map_update_elem(&flowtracer_data, &flow_key, &flow, BPF_NOEXIST); + } + + return CG_OK; +} + +SEC("sockops") +int flowtracer_sockops_fn(struct bpf_sock_ops *skops) +{ + __u32 op = skops -> op; + + switch (op) { + case BPF_SOCK_OPS_TCP_CONNECT_CB: + case BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB: + // enable callbacks + return set_hdr_cb_flags(skops, BPF_SOCK_OPS_PARSE_UNKNOWN_HDR_OPT_CB_FLAG | BPF_SOCK_OPS_WRITE_HDR_OPT_CB_FLAG); + case BPF_SOCK_OPS_HDR_OPT_LEN_CB: + // tell the kernel to allocate space for the header + return handle_hdr_opt_len(skops); + case BPF_SOCK_OPS_WRITE_HDR_OPT_CB: + // give the kernel tcp header option contents + return handle_write_hdr_opt(skops); + case BPF_SOCK_OPS_PARSE_HDR_OPT_CB: + // parse tcp header option + return handle_hdr_parse(skops); + } + + return CG_OK; +} + +SEC("kprobe/nf_conntrack_destroy") +int BPF_KPROBE(nf_conntrack_destroy, struct nf_conntrack *nfct) +{ + // Function nf_conntrack_destroy is called when conntrack module deletes connection entry from its table, + // so it is a good time to delete the entry from flowtracer map too. + + // Function nf_conntrack_destroy gets a pointer to nf_conn.ct_general, which is the same + // as pointer to the whole structure nf_conn + struct nf_conntrack_tuple_hash th[2]; + BPF_CORE_READ_INTO(&th, (struct nf_conn *)nfct, tuplehash); + + if (th[IP_CT_DIR_ORIGINAL].tuple.dst.protonum != IPPROTO_TCP) { + return 0; + } + + // Conntrack tracks 4-tuple for original packets and 4-tuple for reply packets in tuplehash array. + // If a connection is originated from the current host (= client), then 4-tuple of original packets + // is the same as observed at the socket level. + // Example of k8s pod talking to virtual service IP: + // * original: (10.0.0.159, 34568, 10.247.204.240, 8000) --> matches client's socket view + // * reply: (10.0.0.28, 8000, 192.168.3.14, 16256) + // If a connection is incoming (= server), then 4-tuple of reply packets match the one observed + // at the socket level. + // Example of an externally accessible service running in a docker container: + // * original: (10.221.55.32, 57911, 10.82.1.33, 8001) + // * reply: (172.17.0.2, 8001, 10.221.55.32, 57911) --> matches server's socket view + + struct flow_key flow_key = {0}; + flow_key.local_ip4 = th[IP_CT_DIR_ORIGINAL].tuple.src.u3.ip; + flow_key.local_port = th[IP_CT_DIR_ORIGINAL].tuple.src.u.tcp.port; + flow_key.remote_ip4 = th[IP_CT_DIR_ORIGINAL].tuple.dst.u3.ip; + flow_key.remote_port = th[IP_CT_DIR_ORIGINAL].tuple.dst.u.tcp.port; + flow_key.l4_proto = IPPROTO_TCP; + + // Try to delete entry as if it was a client socket + bpf_map_delete_elem(&flowtracer_data, &flow_key); + + flow_key.local_ip4 = th[IP_CT_DIR_REPLY].tuple.src.u3.ip; + flow_key.local_port = th[IP_CT_DIR_REPLY].tuple.src.u.tcp.port; + flow_key.remote_ip4 = th[IP_CT_DIR_REPLY].tuple.dst.u3.ip; + flow_key.remote_port = th[IP_CT_DIR_REPLY].tuple.dst.u.tcp.port; + + // Try to delete entry as if it was a server socket + bpf_map_delete_elem(&flowtracer_data, &flow_key); + + return 0; +} diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c new file mode 100644 index 00000000..a7edfaf7 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c @@ -0,0 +1,124 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "bpf.h" +#include "flowtracer.h" +#include "flowtracer.skel.h" + +static volatile sig_atomic_t stop; + +static void sig_handler(int signo) +{ + stop = 1; +} + +static void usage(char *pname) +{ + printf("USAGE:\n %s \n", pname); + printf("\tLoad and attach FlowTracer BPF program to cgroup2 mounted at the specified path\n"); + exit(1); +} + +int main(int argc, char **argv) +{ + char *cg_path; + int cg_fd; + struct flowtracer_bpf *flowtracer_skel; + int err = 0; + + if (argc < 2) { + cg_path = FLOWTRACER_CGROUP2_PATH; + INFO("[FlowTracer] Cgroup2 mount path is not specified, attaching to the default: %s\n", cg_path); + } else if (!strncmp(argv[argc - 1], "-h", 3)) { + usage("flowtracer"); + } else { + cg_path = argv[argc - 1]; + } + + /* Open cgroup path */ + cg_fd = open(cg_path, O_DIRECTORY, O_RDONLY); + if (cg_fd < 0) { + ERROR("[FlowTracer] Error opening cgroup2 path %s: %d (%s)\n", cg_path, cg_fd, strerror(errno)); + return 1; + } + + INIT_BPF_APP(flowtracer, EBPF_RLIM_LIMITED); + + /* Open load and verify BPF application */ + flowtracer_skel = flowtracer_bpf__open(); + if (!flowtracer_skel) { + ERROR("[FlowTracer] Failed to open BPF skeleton\n"); + goto cleanup; + } + + /* Pin FlowTracer BPF map to the file system */ + err = bpf_map__set_pin_path(flowtracer_skel->maps.flowtracer_data, FLOWTRACER_DATA_MAP_PATH); + if (err) { + ERROR("[FlowTracer] Failed to pin FlowTracer BPF map at path %s: %d (%s)\n", FLOWTRACER_DATA_MAP_PATH, err, strerror(errno)); + goto cleanup; + } + + /* Load BPF program */ + if ((err = flowtracer_bpf__load(flowtracer_skel))) { + ERROR("[FlowTracer] Failed to load BPF skeleton: %d (%s)\n", err, strerror(errno)); + goto cleanup; + } + + /* Create signal handler */ + if (signal(SIGINT, sig_handler) == SIG_ERR || signal(SIGTERM, sig_handler) == SIG_ERR) { + ERROR("[FlowTracer] Failed to set a signal handler: %s\n", strerror(errno)); + goto cleanup; + } + + /* Attach program to cgroup */ + int prog_fd = bpf_program__fd(flowtracer_skel -> progs.flowtracer_sockops_fn); + err = bpf_prog_attach(prog_fd, cg_fd, BPF_CGROUP_SOCK_OPS, 0); + if (err) { + ERROR("[FlowTracer] Failed to attach BPF program to cgroup2 mounted at %s: %d (%s)\n", cg_path, err, strerror(errno)); + goto cleanup; + } + + /* Attach tracepoint handler */ + err = flowtracer_bpf__attach(flowtracer_skel); + if (err) { + ERROR("[FlowTracer] Failed to attach BPF tracepoint: %d (%s)\n", err, strerror(errno)); + goto cleanup; + } + + INFO("[FlowTracer] Started successfully\n"); + + /* Process events */ + while (!stop) { + sleep(1); + } + +cleanup: + bpf_prog_detach(cg_fd, BPF_CGROUP_SOCK_OPS); + flowtracer_bpf__detach(flowtracer_skel); + flowtracer_bpf__destroy(flowtracer_skel); + close(cg_fd); + INFO("[FlowTracer] Cleanup is completed\n"); + return -err; +} diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h new file mode 100644 index 00000000..f0f4c1ca --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h @@ -0,0 +1,32 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#ifndef __FLOWTRACER_H +#define __FLOWTRACER_H + +#include +#include "flowtracer_common.h" + +// TCP option definition +#define TCP_OPT_SOURCE_INFO_KIND 0x55 +struct tcp_opt_source_info { + __u8 kind; + __u8 len; + struct address_info { + __be32 ip4; + __be16 port; + } __attribute__((packed)) address; +} __attribute__((packed)); + +#endif /* __FLOWTRACER_H */ diff --git a/src/probes/extends/ebpf.probe/src/include/conntrack.h b/src/probes/extends/ebpf.probe/src/include/conntrack.h index f8b27047..49e55b22 100644 --- a/src/probes/extends/ebpf.probe/src/include/conntrack.h +++ b/src/probes/extends/ebpf.probe/src/include/conntrack.h @@ -17,6 +17,10 @@ #include "common.h" +#define ADDR_TRANSFORM_NONE 0 +#define ADDR_TRANSFORM_CLIENT 1 +#define ADDR_TRANSFORM_SERVER 2 + struct tcp_connect_s { union { u32 c_ip; diff --git a/src/probes/extends/ebpf.probe/src/include/flowtracer_common.h b/src/probes/extends/ebpf.probe/src/include/flowtracer_common.h new file mode 100644 index 00000000..231a1b6b --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/include/flowtracer_common.h @@ -0,0 +1,36 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#ifndef __FLOWTRACER_COMMON_H__ +#define __FLOWTRACER_COMMON_H__ + +#include + +#define FLOWTRACER_DATA_MAP_PATH "/sys/fs/bpf/flowtracer_data" +#define FLOWTRACER_CGROUP2_PATH "/mnt/cgroup2" + +struct flow_key { + __be32 local_ip4; + __be32 remote_ip4; + __be16 local_port; + __be16 remote_port; + __u8 l4_proto; +}; + +struct flow_data { + __be32 original_remote_ip4; + __be16 original_remote_port; +}; + +#endif diff --git a/src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h b/src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h new file mode 100644 index 00000000..24c87649 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/include/flowtracer_reader.h @@ -0,0 +1,22 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#ifndef __FLOWTRACER_READER_H__ +#define __FLOWTRACER_READER_H__ + +#include "conntrack.h" + +int lookup_flowtracer(struct tcp_connect_s *connect); + +#endif \ No newline at end of file diff --git a/src/probes/extends/ebpf.probe/src/lib/conntrack.c b/src/probes/extends/ebpf.probe/src/lib/conntrack.c index ebb2c7d7..e4cb750c 100644 --- a/src/probes/extends/ebpf.probe/src/lib/conntrack.c +++ b/src/probes/extends/ebpf.probe/src/lib/conntrack.c @@ -252,7 +252,7 @@ int get_cluster_ip_backend(struct tcp_connect_s *connect, int *transform) char line[LINE_BUF_LEN]; char command[COMMAND_LEN]; - *transform = 0; + *transform = ADDR_TRANSFORM_NONE; // Only transform Kubernetes cluster IP backend for the client TCP connection. if (connect->role == 0) { return 0; @@ -295,7 +295,7 @@ int get_cluster_ip_backend(struct tcp_connect_s *connect, int *transform) ret = __dnat_op((const struct tcp_conntrack_s *)conntrack, connect); if (ret == 0) { - *transform = 1; + *transform = ADDR_TRANSFORM_SERVER; __free_conntrack_tcp(conntrack); break; } diff --git a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c new file mode 100644 index 00000000..27b9fecb --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c @@ -0,0 +1,85 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-01-05 + * Description: FlowTracer plugin + ******************************************************************************/ +#include +#include +#include +#include +#include "common.h" +#include "flowtracer_common.h" +#include "flowtracer_reader.h" + +static int g_flow_tracer_map_fd = -1; + +int lookup_flowtracer(struct tcp_connect_s *connect) { + __u32 local_ip4, remote_ip4; + __u16 local_port, remote_port; + int fd = g_flow_tracer_map_fd; + + if (connect->family != AF_INET) { + return ADDR_TRANSFORM_NONE; // only IPv4 is supported + } + + if (connect->role == 0) { + // server: local is server, remote is client + local_ip4 = connect->sip_addr.s_ip; + local_port = connect->s_port; + remote_ip4 = connect->cip_addr.c_ip; + remote_port = connect->c_port; + } else { + // client: local is client, remote is server + local_ip4 = connect->cip_addr.c_ip; + local_port = connect->c_port; + remote_ip4 = connect->sip_addr.s_ip; + remote_port = connect->s_port; + } + + if (g_flow_tracer_map_fd < 0) { + // Try to retrieve map fd + fd = bpf_obj_get(FLOWTRACER_DATA_MAP_PATH); + if (fd <= 0) { // map doesn't exist + DEBUG("[lookup_flowtracer] Failed to open FlowTracer map: %s\n", strerror(errno)); + return ADDR_TRANSFORM_NONE; + } + g_flow_tracer_map_fd = fd; + DEBUG("[lookup_flowtracer] FlowTracer map is opened successfully, fd: %d\n", g_flow_tracer_map_fd); + } + + struct flow_key key = { + local_ip4: local_ip4, + local_port: htons(local_port), + remote_ip4: remote_ip4, + remote_port: htons(remote_port), + l4_proto: IPPROTO_TCP + }; + struct flow_data value = {0}; + + int err = bpf_map_lookup_elem(g_flow_tracer_map_fd, &key, &value); + if (err) { + return ADDR_TRANSFORM_NONE; + } + + __u32 original_remote_ip4 = value.original_remote_ip4; + __u16 original_remote_port = ntohs(value.original_remote_port); + + if (connect->role == 0) { + connect->cip_addr.c_ip = original_remote_ip4; + connect->c_port = original_remote_port; + return ADDR_TRANSFORM_CLIENT; + } else { + connect->sip_addr.s_ip = original_remote_ip4; + connect->s_port = original_remote_port; + return ADDR_TRANSFORM_SERVER; + } +} diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c index cf0c1d53..c6c41976 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c @@ -38,6 +38,7 @@ #include "bpf.h" #include "ipc.h" #include "conntrack.h" +#include "flowtracer_reader.h" #include "tcpprobe.h" #include "tcp_tracker.h" @@ -177,57 +178,57 @@ static struct tcp_tracker_s* lkup_tcp_tracker(struct tcp_mng_s *tcp_mng, const s return tracker; } -static void __transform_cluster_ip(struct tcp_mng_s *tcp_mng, const struct tcp_link_s *tcp_link, struct backend_ip_s *backend_ip) +static int __transform_cluster_ip(struct tcp_mng_s *tcp_mng, const struct tcp_link_s *tcp_link, struct tcp_connect_s *connect) { - int transform = 0; + int transform = ADDR_TRANSFORM_NONE; - if (tcp_mng->ipc_body.probe_param.cluster_ip_backend == 0) { - return; + char cluster_ip_backend = tcp_mng->ipc_body.probe_param.cluster_ip_backend; + if (cluster_ip_backend == 0) { // transform is disabled + return ADDR_TRANSFORM_NONE; } - // Only transform Kubernetes cluster IP backend for the client TCP connection. - if (tcp_link->role == 0) { - return; - } - - struct tcp_connect_s connect; - - connect.role = tcp_link->role; - connect.family = tcp_link->family; + connect->role = tcp_link->role; + connect->family = tcp_link->family; if (tcp_link->family == AF_INET) { - connect.cip_addr.c_ip = tcp_link->c_ip; - connect.sip_addr.s_ip = tcp_link->s_ip; + connect->cip_addr.c_ip = tcp_link->c_ip; + connect->sip_addr.s_ip = tcp_link->s_ip; } else { - memcpy(&(connect.cip_addr), tcp_link->c_ip6, IP6_LEN); - memcpy(&(connect.sip_addr), tcp_link->s_ip6, IP6_LEN); + memcpy(&(connect->cip_addr), tcp_link->c_ip6, IP6_LEN); + memcpy(&(connect->sip_addr), tcp_link->s_ip6, IP6_LEN); } - connect.c_port = tcp_link->c_port; - connect.s_port = tcp_link->s_port; + connect->c_port = tcp_link->c_port; + connect->s_port = tcp_link->s_port; - (void)get_cluster_ip_backend(&connect, &transform); - if (!transform) { - return; + if (cluster_ip_backend == 1) { // use conntrack + // Only transform Kubernetes cluster IP backend for the client TCP connection. + if (tcp_link->role == 0) { + return ADDR_TRANSFORM_NONE; + } + (void)get_cluster_ip_backend(connect, &transform); + } else if (cluster_ip_backend == 2) { // use FlowTracer + transform = lookup_flowtracer(connect); } #ifdef GOPHER_DEBUG - char ip1[IP6_LEN], ip2[IP6_LEN]; - ip1[0] = 0; - (void)inet_ntop(tcp_link->family, (const void *)&(tcp_link->s_ip), ip1, IP6_LEN); - ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), ip2, IP6_LEN); - DEBUG("[TCPPROBE]: Cluster IP[%s:%u->%s:%u] transform successfully.\n", ip1, tcp_link->s_port, ip2, connect.s_port); -#endif - - backend_ip->family = tcp_link->family; - - if (backend_ip->family == AF_INET) { - backend_ip->ip_addr.ip = connect.sip_addr.s_ip; + char c_ip1[IP6_LEN], s_ip1[IP6_LEN]; + inet_ntop(tcp_link->family, (const void *)&(tcp_link->c_ip), c_ip1, IP6_LEN); + inet_ntop(tcp_link->family, (const void *)&(tcp_link->s_ip), s_ip1, IP6_LEN); + + if (transform) { + char c_ip2[IP6_LEN], s_ip2[IP6_LEN]; + inet_ntop(connect->family, (const void *)&(connect->cip_addr), c_ip2, IP6_LEN); + inet_ntop(connect->family, (const void *)&(connect->sip_addr), s_ip2, IP6_LEN); + DEBUG("[TCPPROBE]: Flow (%s:%u-%s:%u) is transformed into (%s:%u-%s:%u)\n", + c_ip1, tcp_link->c_port, s_ip1, tcp_link->s_port, + c_ip2, connect->c_port, s_ip2, connect->s_port); } else { - memcpy(&(backend_ip->ip_addr), &(connect.sip_addr), IP6_LEN); + DEBUG("[TCPPROBE]: Flow (%s:%u-%s:%u) is not transformed\n", + c_ip1, tcp_link->c_port, s_ip1, tcp_link->s_port); } - backend_ip->port = connect.s_port; - return; +#endif + + return transform; } static void __init_tracker_id(struct tcp_tracker_id_s *tracker_id, const struct tcp_link_s *tcp_link) @@ -306,20 +307,26 @@ static void __init_flow_tracker_id(struct tcp_flow_tracker_id_s *tracker_id, con struct tcp_tracker_s* get_tcp_tracker(struct tcp_mng_s *tcp_mng, const void *link) { - struct backend_ip_s backend_ip = {0}; + struct tcp_connect_s connect = {0}; struct tcp_tracker_id_s tracker_id = {0}; const struct tcp_link_s *tcp_link = link; __init_tracker_id(&tracker_id, tcp_link); - __transform_cluster_ip(tcp_mng, tcp_link, &backend_ip); - if (backend_ip.family != 0 && backend_ip.port != 0) { - if (backend_ip.family == AF_INET) { - tracker_id.s_ip = backend_ip.ip_addr.ip; + int transform = __transform_cluster_ip(tcp_mng, tcp_link, &connect); + if (transform & ADDR_TRANSFORM_SERVER) { + if (connect.family == AF_INET) { + tracker_id.s_ip = connect.sip_addr.s_ip; + } else { + memcpy(tracker_id.s_ip6, &(connect.sip_addr.s_ip6), IP6_LEN); + } + tracker_id.port = connect.s_port; + } else if (transform & ADDR_TRANSFORM_CLIENT) { + if (connect.family == AF_INET) { + tracker_id.c_ip = connect.cip_addr.c_ip; } else { - memcpy(tracker_id.s_ip6, &(backend_ip.ip_addr), IP6_LEN); + memcpy(tracker_id.c_ip6, &(connect.cip_addr.c_ip6), IP6_LEN); } - tracker_id.port = backend_ip.port; } struct tcp_tracker_s* tracker = lkup_tcp_tracker(tcp_mng, (const struct tcp_tracker_id_s *)&tracker_id); @@ -362,18 +369,18 @@ void destroy_tcp_trackers(struct tcp_mng_s *tcp_mng) struct tcp_flow_tracker_s* get_tcp_flow_tracker(struct tcp_mng_s *tcp_mng, const void *link) { - struct backend_ip_s backend_ip = {0}; + struct tcp_connect_s connect = {0}; struct tcp_flow_tracker_id_s tracker_id = {0}; const struct tcp_link_s *tcp_link = link; __init_flow_tracker_id(&tracker_id, tcp_link); - __transform_cluster_ip(tcp_mng, tcp_link, &backend_ip); - if (backend_ip.family != 0 && backend_ip.port != 0) { + int transform = __transform_cluster_ip(tcp_mng, tcp_link, &connect); + if (transform & ADDR_TRANSFORM_SERVER) { tracker_id.remote_ip[0] = 0; - ip_str(backend_ip.family, (unsigned char *)&(backend_ip.ip_addr), + ip_str(connect.family, (unsigned char *)&(connect.sip_addr), (unsigned char *)tracker_id.remote_ip, sizeof(tracker_id.remote_ip)); - tracker_id.port = backend_ip.port; + tracker_id.port = connect.s_port; } struct tcp_flow_tracker_s* tracker = lkup_tcp_flow_tracker(tcp_mng, -- Gitee From e95271f1699d8394c453a18b4031d56c627807d8 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Tue, 6 Feb 2024 13:07:06 +0000 Subject: [PATCH 02/17] Support transformation of IPv4 addresses inside of IPv6 addresses --- .../ebpf.probe/src/lib/flowtracer_reader.c | 57 ++++++++++++++----- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c index 27b9fecb..a6165860 100644 --- a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c +++ b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c @@ -27,21 +27,33 @@ int lookup_flowtracer(struct tcp_connect_s *connect) { __u16 local_port, remote_port; int fd = g_flow_tracer_map_fd; - if (connect->family != AF_INET) { - return ADDR_TRANSFORM_NONE; // only IPv4 is supported + __u32 s_ip, c_ip; + if (connect->family == AF_INET) { + // IPv4 + s_ip = connect->sip_addr.s_ip; + c_ip = connect->cip_addr.c_ip; + } else if (connect->family == AF_INET6 && + NIP6_IS_ADDR_V4MAPPED((unsigned short *)connect->sip_addr.s_ip6) && NIP6_IS_ADDR_V4MAPPED((unsigned short *)connect->cip_addr.c_ip6)) { + // IPv4 address mapped to IPv6 + s_ip = *((__u32 *)(connect->sip_addr.s_ip6 + IP4_BYTE_1_IN_IP6)); + c_ip = *((__u32 *)(connect->cip_addr.c_ip6 + IP4_BYTE_1_IN_IP6)); + DEBUG("[lookup_flowtracer] mapped IPv4: server %x client %x\n", s_ip, c_ip); + } else { + // pure IPv6 is not supported + return ADDR_TRANSFORM_NONE; } if (connect->role == 0) { // server: local is server, remote is client - local_ip4 = connect->sip_addr.s_ip; + local_ip4 = s_ip; local_port = connect->s_port; - remote_ip4 = connect->cip_addr.c_ip; + remote_ip4 = c_ip; remote_port = connect->c_port; } else { // client: local is client, remote is server - local_ip4 = connect->cip_addr.c_ip; + local_ip4 = c_ip; local_port = connect->c_port; - remote_ip4 = connect->sip_addr.s_ip; + remote_ip4 = s_ip; remote_port = connect->s_port; } @@ -65,21 +77,40 @@ int lookup_flowtracer(struct tcp_connect_s *connect) { }; struct flow_data value = {0}; + DEBUG("[lookup_flowtracer] Lookup local_ip4: %u (%x), local_port: %d, remote_ip4: %u (%x), remote_port: %d\n", + key.local_ip4, key.local_ip4, key.local_port, key.remote_ip4, key.remote_ip4, key.remote_port); + int err = bpf_map_lookup_elem(g_flow_tracer_map_fd, &key, &value); if (err) { + DEBUG("[lookup_flowtracer] Lookup fd %d, err: %d (%s)\n", g_flow_tracer_map_fd, err, strerror(errno)); return ADDR_TRANSFORM_NONE; } __u32 original_remote_ip4 = value.original_remote_ip4; __u16 original_remote_port = ntohs(value.original_remote_port); - if (connect->role == 0) { - connect->cip_addr.c_ip = original_remote_ip4; - connect->c_port = original_remote_port; - return ADDR_TRANSFORM_CLIENT; + DEBUG("[lookup_flowtracer] Lookup original_remote_ip4: %x, original_remote_port: %d\n", + original_remote_ip4, original_remote_port); + + if (connect->family == AF_INET) { + if (connect->role == 0) { + connect->cip_addr.c_ip = original_remote_ip4; + connect->c_port = original_remote_port; + return ADDR_TRANSFORM_CLIENT; + } else { + connect->sip_addr.s_ip = original_remote_ip4; + connect->s_port = original_remote_port; + return ADDR_TRANSFORM_SERVER; + } } else { - connect->sip_addr.s_ip = original_remote_ip4; - connect->s_port = original_remote_port; - return ADDR_TRANSFORM_SERVER; + if (connect->role == 0) { + *((__u32 *)(connect->cip_addr.c_ip6 + IP4_BYTE_1_IN_IP6)) = original_remote_ip4; + connect->c_port = original_remote_port; + return ADDR_TRANSFORM_CLIENT; + } else { + *((__u32 *)(connect->sip_addr.s_ip6 + IP4_BYTE_1_IN_IP6)) = original_remote_ip4; + connect->s_port = original_remote_port; + return ADDR_TRANSFORM_SERVER; + } } } -- Gitee From 1caf8e7f1732c885fd471797daf07edc516026a2 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Thu, 18 Jan 2024 12:42:30 +0000 Subject: [PATCH 03/17] Map L7 connections using FlowTracer data --- .../ebpf.probe/src/l7probe/conn_tracker.c | 73 +++++++++++++++---- 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c index aa85f8fd..5fc8e722 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c @@ -27,10 +27,15 @@ #undef BPF_PROG_USER #endif +#ifdef GOPHER_DEBUG +#include +#endif + #include "bpf.h" #include "container.h" #include "histogram.h" #include "conntrack.h" +#include "flowtracer_reader.h" #include "include/conn_tracker.h" #include "include/connect.h" #include "protocol/expose/protocol_parser.h" @@ -350,14 +355,10 @@ static struct l7_api_statistic_s* create_l7_api_statistic(const struct api_stats static void transform_cluster_ip(struct l7_mng_s *l7_mng, struct conn_tracker_s* tracker) { - int transform = 0; - - if (l7_mng->ipc_body.probe_param.cluster_ip_backend == 0) { - return; - } + int transform = ADDR_TRANSFORM_NONE; - // Only transform Kubernetes cluster IP backend for the client TCP connection. - if (tracker->l4_role != L4_CLIENT) { + char cluster_ip_backend = l7_mng->ipc_body.probe_param.cluster_ip_backend; + if (cluster_ip_backend == 0) { return; } @@ -376,17 +377,53 @@ static void transform_cluster_ip(struct l7_mng_s *l7_mng, struct conn_tracker_s* connect.c_port = tracker->open_info.client_addr.port; connect.s_port = tracker->open_info.server_addr.port; - (void)get_cluster_ip_backend(&connect, &transform); + if (cluster_ip_backend == 1) { // use conntrack + // Only transform Kubernetes cluster IP backend for the client TCP connection. + if (tracker->l4_role != L4_CLIENT) { + return; + } + (void)get_cluster_ip_backend(&connect, &transform); + } else if (cluster_ip_backend == 2) { // use FlowTracer + transform = lookup_flowtracer(&connect); + DEBUG("[L7PROBE] FlowTracer transform: %d\n", transform); + } + if (!transform) { return; } - if (connect.family == AF_INET) { - tracker->open_info.server_addr.ip = connect.sip_addr.s_ip; - } else { - memcpy(tracker->open_info.server_addr.ip6, &(connect.sip_addr), IP6_LEN); +#ifdef GOPHER_DEBUG + char s_ip1[IP6_LEN], s_ip2[IP6_LEN], c_ip1[IP6_LEN], c_ip2[IP6_LEN]; + s_ip1[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip1, IP6_LEN); + s_ip2[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), s_ip2, IP6_LEN); + c_ip1[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip1, IP6_LEN); + c_ip2[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, IP6_LEN); + DEBUG("[L7PROBE]: Flow (%s:%u-%s:%u) is transformed into (%s:%u-%s:%u)\n", + c_ip1, tracker->open_info.client_addr.port, s_ip1, tracker->open_info.server_addr.port, + c_ip2, connect.c_port, s_ip2, connect.s_port); +#endif + + if (transform & ADDR_TRANSFORM_SERVER) { + if (connect.family == AF_INET) { + tracker->open_info.server_addr.ip = connect.sip_addr.s_ip; + } else { + memcpy(tracker->open_info.server_addr.ip6, &(connect.sip_addr.s_ip6), IP6_LEN); + } + tracker->open_info.server_addr.port = connect.s_port; + } + if (transform & ADDR_TRANSFORM_CLIENT) { + if (connect.family == AF_INET) { + tracker->open_info.client_addr.ip = connect.cip_addr.c_ip; + } else { + memcpy(tracker->open_info.client_addr.ip6, &(connect.cip_addr.c_ip6), IP6_LEN); + } + tracker->open_info.client_addr.port = connect.c_port; } - tracker->open_info.server_addr.port = connect.s_port; + return; } @@ -428,6 +465,16 @@ static int proc_conn_ctl_msg(struct l7_mng_s *l7_mng, struct conn_ctl_s *conn_ct // Client port just used for cluster IP address translation. Here, client port MUST set 0. tracker->open_info.client_addr.port = 0; } +#ifdef GOPHER_DEBUG + char s_ip[IP6_LEN], c_ip[IP6_LEN]; + s_ip[0] = 0; + (void)inet_ntop(tracker->open_info.server_addr.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip, IP6_LEN); + c_ip[0] = 0; + (void)inet_ntop(tracker->open_info.client_addr.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip, IP6_LEN); + DEBUG("[L7PROBE.proc_conn_ctl_msg]: Tracker flow (%s:%u-%s:%u)\n", + c_ip, tracker->open_info.client_addr.port, s_ip, tracker->open_info.server_addr.port); +#endif + } break; } -- Gitee From 688d21ffcb80e01c86199799742d179b1b555ec3 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Tue, 6 Feb 2024 16:04:25 +0000 Subject: [PATCH 04/17] Improve logging in l7 probe --- .../ebpf.probe/src/l7probe/conn_tracker.c | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c index 5fc8e722..2739688c 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c @@ -393,16 +393,16 @@ static void transform_cluster_ip(struct l7_mng_s *l7_mng, struct conn_tracker_s* } #ifdef GOPHER_DEBUG - char s_ip1[IP6_LEN], s_ip2[IP6_LEN], c_ip1[IP6_LEN], c_ip2[IP6_LEN]; + char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; s_ip1[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip1, IP6_LEN); + (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip1, sizeof(s_ip1)); s_ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), s_ip2, IP6_LEN); + (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), s_ip2, sizeof(s_ip2)); c_ip1[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip1, IP6_LEN); + (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip1, sizeof(c_ip1)); c_ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, IP6_LEN); - DEBUG("[L7PROBE]: Flow (%s:%u-%s:%u) is transformed into (%s:%u-%s:%u)\n", + (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, sizeof(c_ip2)); + DEBUG("[L7PROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", c_ip1, tracker->open_info.client_addr.port, s_ip1, tracker->open_info.server_addr.port, c_ip2, connect.c_port, s_ip2, connect.s_port); #endif @@ -466,12 +466,12 @@ static int proc_conn_ctl_msg(struct l7_mng_s *l7_mng, struct conn_ctl_s *conn_ct tracker->open_info.client_addr.port = 0; } #ifdef GOPHER_DEBUG - char s_ip[IP6_LEN], c_ip[IP6_LEN]; + char s_ip[IP6_STR_LEN], c_ip[IP6_STR_LEN]; s_ip[0] = 0; - (void)inet_ntop(tracker->open_info.server_addr.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip, IP6_LEN); + (void)inet_ntop(tracker->open_info.server_addr.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip, sizeof(s_ip)); c_ip[0] = 0; - (void)inet_ntop(tracker->open_info.client_addr.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip, IP6_LEN); - DEBUG("[L7PROBE.proc_conn_ctl_msg]: Tracker flow (%s:%u-%s:%u)\n", + (void)inet_ntop(tracker->open_info.client_addr.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip, sizeof(c_ip)); + DEBUG("[L7PROBE.proc_conn_ctl_msg]: CONN_EVT_OPEN: tracker flow (%s:%u - %s:%u)\n", c_ip, tracker->open_info.client_addr.port, s_ip, tracker->open_info.server_addr.port); #endif -- Gitee From 4a922fffa6f2d0662c1bdcf1130c8e5b2024e6c9 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Tue, 6 Feb 2024 16:49:41 +0000 Subject: [PATCH 05/17] Add debug logs to FlowTracer BPF program --- .../src/flowtracer/flowtracer.bpf.c | 40 ++++++++++++++- .../ebpf.probe/src/flowtracer/flowtracer.c | 49 +++++++++++++++++++ .../ebpf.probe/src/flowtracer/flowtracer.h | 12 +++++ 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c index 27e529ff..17d81ea9 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c @@ -37,6 +37,31 @@ struct { __uint(pinning, LIBBPF_PIN_BY_NAME); // map is located at /sys/fs/bpf/flowtracer_data } flowtracer_data SEC(".maps"); +#ifdef GOPHER_DEBUG +// ring buffer to send events to user-space (for debug purposes) +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024); +} ring_buffer SEC(".maps"); + +static void notify(enum flow_log_op op, struct flow_key *flow_key, struct flow_data *flow_data) +{ + struct flow_log *flow_log = bpf_ringbuf_reserve(&ring_buffer, sizeof(struct flow_log), 0); + if (!flow_log) { + return; + } + + flow_log->op = op; + flow_log->key = *flow_key; + if (flow_data) { + flow_log->data = *flow_data; + } + bpf_ringbuf_submit(flow_log, 0); +} +#else +static inline void notify(enum flow_log_op op, struct flow_key *flow_key, struct flow_data *flow_data) {} // NOP +#endif + static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops, __u32 extra) { long err = bpf_sock_ops_cb_flags_set(skops, skops->bpf_sock_ops_cb_flags | extra); @@ -98,6 +123,9 @@ static int handle_hdr_parse(struct bpf_sock_ops *skops) flow.original_remote_ip4 = original_remote_ip4; flow.original_remote_port = original_remote_port; bpf_map_update_elem(&flowtracer_data, &flow_key, &flow, BPF_NOEXIST); + + // notify userspace that a new flow was added (debug purposes only) + notify(FLOW_LOG_ADD, &flow_key, &flow); } return CG_OK; @@ -162,7 +190,11 @@ int BPF_KPROBE(nf_conntrack_destroy, struct nf_conntrack *nfct) flow_key.l4_proto = IPPROTO_TCP; // Try to delete entry as if it was a client socket - bpf_map_delete_elem(&flowtracer_data, &flow_key); + int err = bpf_map_delete_elem(&flowtracer_data, &flow_key); + if (!err) { + // notify userspace that a flow was deleted (debug purposes only) + notify(FLOW_LOG_DEL, &flow_key, NULL); + } flow_key.local_ip4 = th[IP_CT_DIR_REPLY].tuple.src.u3.ip; flow_key.local_port = th[IP_CT_DIR_REPLY].tuple.src.u.tcp.port; @@ -170,7 +202,11 @@ int BPF_KPROBE(nf_conntrack_destroy, struct nf_conntrack *nfct) flow_key.remote_port = th[IP_CT_DIR_REPLY].tuple.dst.u.tcp.port; // Try to delete entry as if it was a server socket - bpf_map_delete_elem(&flowtracer_data, &flow_key); + err = bpf_map_delete_elem(&flowtracer_data, &flow_key); + if (!err) { + // notify userspace that a flow was deleted (debug purposes only) + notify(FLOW_LOG_DEL, &flow_key, NULL); + } return 0; } diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c index a7edfaf7..cc6c735e 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c @@ -41,12 +41,43 @@ static void usage(char *pname) exit(1); } +#ifdef GOPHER_DEBUG +static int handle_evt(void *ctx, void *notification, size_t sz) +{ + struct flow_log *flow_log = notification; + + char local_ip4[16], remote_ip4[16], original_remote_ip4[16]; + inet_ntop(AF_INET, &flow_log->key.local_ip4, local_ip4, sizeof(local_ip4)); + inet_ntop(AF_INET, &flow_log->key.remote_ip4, remote_ip4, sizeof(remote_ip4)); + inet_ntop(AF_INET, &flow_log->data.original_remote_ip4, original_remote_ip4, sizeof(original_remote_ip4)); + + char log_buf[40] = "\0"; + if (flow_log->op == FLOW_LOG_ADD) { + snprintf(log_buf, sizeof(log_buf), + " original_remote=%s:%u", + original_remote_ip4, bpf_ntohs(flow_log->data.original_remote_port)); + } + DEBUG("[FlowTracer:Event] " + "op=%s" + " local=%s:%u" + " remote=%s:%u" + "%s\n", + (flow_log->op == FLOW_LOG_ADD)? "ADD": "DEL", + local_ip4, bpf_ntohs(flow_log->key.local_port), + remote_ip4, bpf_ntohs(flow_log->key.remote_port), + log_buf + ); + return 0; +} +#endif + int main(int argc, char **argv) { char *cg_path; int cg_fd; struct flowtracer_bpf *flowtracer_skel; int err = 0; + struct ring_buffer *ring_buffer = NULL; if (argc < 2) { cg_path = FLOWTRACER_CGROUP2_PATH; @@ -107,16 +138,34 @@ int main(int argc, char **argv) goto cleanup; } +#ifdef GOPHER_DEBUG + /* Setup ring buffer to poll FlowTracer map operations for debug purposes */ + ring_buffer = ring_buffer__new(bpf_map__fd(flowtracer_skel->maps.ring_buffer), handle_evt, NULL, NULL); + if (!ring_buffer) { + err = 1; + ERROR("[FlowTracer] Failed to create ring buffer!\n"); + goto cleanup; + } +#endif + INFO("[FlowTracer] Started successfully\n"); /* Process events */ while (!stop) { +#ifdef GOPHER_DEBUG + err = ring_buffer__poll(ring_buffer, THOUSAND); + if (err < 0 && err != -EINTR) { + ERROR("[FlowTracer] Error polling ring buffer: %d (%s)\n", err, strerror(errno)); + break; + } +#endif sleep(1); } cleanup: bpf_prog_detach(cg_fd, BPF_CGROUP_SOCK_OPS); flowtracer_bpf__detach(flowtracer_skel); + ring_buffer__free(ring_buffer); flowtracer_bpf__destroy(flowtracer_skel); close(cg_fd); INFO("[FlowTracer] Cleanup is completed\n"); diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h index f0f4c1ca..41c3b845 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.h @@ -29,4 +29,16 @@ struct tcp_opt_source_info { } __attribute__((packed)) address; } __attribute__((packed)); +// debug event definition +enum flow_log_op { + FLOW_LOG_ADD = 1, + FLOW_LOG_DEL = 2 +}; + +struct flow_log { + enum flow_log_op op; + struct flow_key key; + struct flow_data data; +}; + #endif /* __FLOWTRACER_H */ -- Gitee From 447a17b5f9c1f0f5e51d81d842f35d848382fc5b Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 7 Feb 2024 14:55:39 +0000 Subject: [PATCH 06/17] Delete FlowTracer map when the probe finishes --- src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c index cc6c735e..808fce9b 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c @@ -168,6 +168,8 @@ cleanup: ring_buffer__free(ring_buffer); flowtracer_bpf__destroy(flowtracer_skel); close(cg_fd); + // Delete BPF map (pinned at the default libbpf location) + unlink("/sys/fs/bpf/flowtracer_data"); INFO("[FlowTracer] Cleanup is completed\n"); return -err; } -- Gitee From 5b2f9c8eddba138a4d101d339d5cab9d4560feb7 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 7 Feb 2024 14:56:52 +0000 Subject: [PATCH 07/17] Fix FlowTracer lookup --- .../ebpf.probe/src/lib/flowtracer_reader.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c index a6165860..a1af16cd 100644 --- a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c +++ b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c @@ -68,17 +68,17 @@ int lookup_flowtracer(struct tcp_connect_s *connect) { DEBUG("[lookup_flowtracer] FlowTracer map is opened successfully, fd: %d\n", g_flow_tracer_map_fd); } - struct flow_key key = { - local_ip4: local_ip4, - local_port: htons(local_port), - remote_ip4: remote_ip4, - remote_port: htons(remote_port), - l4_proto: IPPROTO_TCP - }; + struct flow_key key = {0}; // alignment gaps must be filled with 0 + key.local_ip4 = local_ip4; + key.local_port = htons(local_port); + key.remote_ip4 = remote_ip4; + key.remote_port = htons(remote_port); + key.l4_proto = IPPROTO_TCP; + struct flow_data value = {0}; - DEBUG("[lookup_flowtracer] Lookup local_ip4: %u (%x), local_port: %d, remote_ip4: %u (%x), remote_port: %d\n", - key.local_ip4, key.local_ip4, key.local_port, key.remote_ip4, key.remote_ip4, key.remote_port); + DEBUG("[lookup_flowtracer] Lookup local_ip4: %u (%x), local_port: %d, remote_ip4: %u (%x), remote_port: %d, l4_proto: %d, key-size: %d, value-size: %d\n", + key.local_ip4, key.local_ip4, key.local_port, key.remote_ip4, key.remote_ip4, key.remote_port, key.l4_proto, sizeof(key), sizeof(value)); int err = bpf_map_lookup_elem(g_flow_tracer_map_fd, &key, &value); if (err) { -- Gitee From f80cc5666730ae77a2b3caad89d8331d6948bbbe Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 7 Feb 2024 14:58:31 +0000 Subject: [PATCH 08/17] Lookup FlowTracer map in Endpoint probe --- .../ebpf.probe/src/endpointprobe/endpoint.c | 88 ++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index 18ddf8f7..20c7303e 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -41,7 +41,9 @@ #include "event.h" #include "ipc.h" #include "hash.h" +#include "conntrack.h" #include "container.h" +#include "flowtracer_reader.h" #include "histogram.h" #define EP_ENTITY_ID_LEN 64 @@ -394,6 +396,80 @@ static int process_tcp_conn_close(struct tcp_socket_s *tcp, struct tcp_socket_ev return 0; } +static void transform_cluster_ip(struct endpoint_probe_s *probe_mng, struct tcp_socket_id_s *tracker) +{ + int transform = ADDR_TRANSFORM_NONE; + + char cluster_ip_backend = probe_mng->ipc_body.probe_param.cluster_ip_backend; + if (cluster_ip_backend == 0) { + return; + } + + struct tcp_connect_s connect; + + connect.role = (tracker->role == TCP_CLIENT) ? 1 : 0; + connect.family = tracker->client_ipaddr.family; + + if (connect.family == AF_INET) { + connect.cip_addr.c_ip = tracker->client_ipaddr.ip; + connect.sip_addr.s_ip = tracker->server_ipaddr.ip; + } else { + memcpy(&(connect.cip_addr), tracker->client_ipaddr.ip6, IP6_LEN); + memcpy(&(connect.sip_addr), tracker->server_ipaddr.ip6, IP6_LEN); + } + connect.c_port = tracker->client_ipaddr.port; + connect.s_port = tracker->server_ipaddr.port; + + if (cluster_ip_backend == 1) { // use conntrack + // Only transform Kubernetes cluster IP backend for the client TCP connection. + if (connect.role != 1) { + return; + } + (void)get_cluster_ip_backend(&connect, &transform); + } else if (cluster_ip_backend == 2) { // use FlowTracer + transform = lookup_flowtracer(&connect); + DEBUG("[ENDPOINT] FlowTracer transform: %d\n", transform); + } + + if (!transform) { + return; + } + +#ifdef GOPHER_DEBUG + char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; + s_ip1[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(tracker->server_ipaddr.ip), s_ip1, sizeof(s_ip1)); + s_ip2[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), s_ip2, sizeof(s_ip2)); + c_ip1[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(tracker->client_ipaddr.ip), c_ip1, sizeof(c_ip1)); + c_ip2[0] = 0; + (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, sizeof(c_ip2)); + DEBUG("[ENDPOINT]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", + c_ip1, tracker->client_ipaddr.port, s_ip1, tracker->server_ipaddr.port, + c_ip2, connect.c_port, s_ip2, connect.s_port); +#endif + + if (transform & ADDR_TRANSFORM_SERVER) { + if (connect.family == AF_INET) { + tracker->server_ipaddr.ip = connect.sip_addr.s_ip; + } else { + memcpy(tracker->server_ipaddr.ip6, &(connect.sip_addr.s_ip6), IP6_LEN); + } + tracker->server_ipaddr.port = connect.s_port; + } + if (transform & ADDR_TRANSFORM_CLIENT) { + if (connect.family == AF_INET) { + tracker->client_ipaddr.ip = connect.cip_addr.c_ip; + } else { + memcpy(tracker->client_ipaddr.ip6, &(connect.cip_addr.c_ip6), IP6_LEN); + } + tracker->client_ipaddr.port = connect.c_port; + } + + return; +} + #define MAX_ENDPOINT_ENTITES (5 * 1024) static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_event_s* evt) { @@ -409,9 +485,17 @@ static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_e memcpy(&(id.client_ipaddr), &(evt->client_ipaddr), sizeof(id.client_ipaddr)); memcpy(&(id.server_ipaddr), &(evt->server_ipaddr), sizeof(id.server_ipaddr)); id.tgid = evt->tgid; - id.client_ipaddr.port = 0; + // id.client_ipaddr.port = 0; id.role = evt->role; +#ifdef GOPHER_DEBUG + char s_ip[INET6_ADDRSTRLEN] = {0}, c_ip[INET6_ADDRSTRLEN] = {0}; + (void)inet_ntop(id.server_ipaddr.family, (const void *)&(id.server_ipaddr.ip), s_ip, sizeof(s_ip)); + (void)inet_ntop(id.client_ipaddr.family, (const void *)&(id.client_ipaddr.ip), c_ip, sizeof(c_ip)); + DEBUG("[ENDPOINT]: tcp socket event %d, role: %d, flow: (%s:%u - %s:%u), tgid: %d\n", + evt->evt, evt->role, c_ip, id.client_ipaddr.port, s_ip, id.server_ipaddr.port, id.tgid); +#endif + tcp = lkup_tcp_socket(probe, (const struct tcp_socket_id_s *)&id); if (process_tcp_conn_close(tcp, evt) == 0) { return 0; @@ -441,6 +525,8 @@ static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_e new_tcp->stats[evt->evt] += 1; new_tcp->last_rcv_data = time(NULL); + transform_cluster_ip(probe, &new_tcp->id); + ip_str(new_tcp->id.client_ipaddr.family, (unsigned char *)&(new_tcp->id.client_ipaddr.ip), client_ip_str, INET6_ADDRSTRLEN); ip_str(new_tcp->id.server_ipaddr.family, (unsigned char *)&(new_tcp->id.server_ipaddr.ip), server_ip_str, INET6_ADDRSTRLEN); new_tcp->client_ip = strdup((const char *)client_ip_str); -- Gitee From 8432c2686eb53fc52dd78652f79d9bfc01470051 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Thu, 8 Feb 2024 14:18:23 +0000 Subject: [PATCH 09/17] Unify log message prefix --- src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index 20c7303e..294e895e 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -428,7 +428,7 @@ static void transform_cluster_ip(struct endpoint_probe_s *probe_mng, struct tcp_ (void)get_cluster_ip_backend(&connect, &transform); } else if (cluster_ip_backend == 2) { // use FlowTracer transform = lookup_flowtracer(&connect); - DEBUG("[ENDPOINT] FlowTracer transform: %d\n", transform); + DEBUG("[EPPROBE] FlowTracer transform: %d\n", transform); } if (!transform) { @@ -445,7 +445,7 @@ static void transform_cluster_ip(struct endpoint_probe_s *probe_mng, struct tcp_ (void)inet_ntop(connect.family, (const void *)&(tracker->client_ipaddr.ip), c_ip1, sizeof(c_ip1)); c_ip2[0] = 0; (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, sizeof(c_ip2)); - DEBUG("[ENDPOINT]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", + DEBUG("[EPPROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", c_ip1, tracker->client_ipaddr.port, s_ip1, tracker->server_ipaddr.port, c_ip2, connect.c_port, s_ip2, connect.s_port); #endif @@ -492,7 +492,7 @@ static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_e char s_ip[INET6_ADDRSTRLEN] = {0}, c_ip[INET6_ADDRSTRLEN] = {0}; (void)inet_ntop(id.server_ipaddr.family, (const void *)&(id.server_ipaddr.ip), s_ip, sizeof(s_ip)); (void)inet_ntop(id.client_ipaddr.family, (const void *)&(id.client_ipaddr.ip), c_ip, sizeof(c_ip)); - DEBUG("[ENDPOINT]: tcp socket event %d, role: %d, flow: (%s:%u - %s:%u), tgid: %d\n", + DEBUG("[EPPROBE]: tcp socket event %d, role: %d, flow: (%s:%u - %s:%u), tgid: %d\n", evt->evt, evt->role, c_ip, id.client_ipaddr.port, s_ip, id.server_ipaddr.port, id.tgid); #endif -- Gitee From 5a769cdec2a6e73f70955e140885a4682ebd46f5 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Fri, 9 Feb 2024 14:41:23 +0000 Subject: [PATCH 10/17] Update FlowTracer map when conntrack confirms a connection For a client side sockops program gets peer's original address only from PSH-ACK packet. This can be too late for some of socket related metrics; also this may not happen at all in case of connection establishment issues. With this patch the expected peer's address is extracted from conntrack data (expected address matches the real in case of k8s service). When a real peer's address received it will overwrite the expected one. --- .../src/flowtracer/flowtracer.bpf.c | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c index 17d81ea9..b609ca24 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c @@ -27,6 +27,10 @@ char LICENSE[] SEC("license") = "Dual BSD/GPL"; #define CG_ERR 0 #define ERR2CG(err) (err)? CG_ERR: CG_OK +#define NFCT_INFOMASK 7UL +#define NFCT_PTRMASK ~(NFCT_INFOMASK) +#define CTINFO2DIR(ctinfo) ((ctinfo) >= IP_CT_IS_REPLY ? IP_CT_DIR_REPLY : IP_CT_DIR_ORIGINAL) + // FlowTracer data: maps from observed TCP flow tuple into received remote original address #define MAX_ENTRIES_FLOW 65536 struct { @@ -155,6 +159,69 @@ int flowtracer_sockops_fn(struct bpf_sock_ops *skops) return CG_OK; } +SEC("kprobe/__nf_conntrack_confirm") +int BPF_KPROBE(nf_conntrack_confirm, struct sk_buff *skb) +{ + // Function '__nf_conntrack_confirm' is called when conntrack confirms a connection. For outgoing connections + // this happens immediately upon establishment (sending of SYN packet) + unsigned long nfct; + unsigned int ctinfo; + struct nf_conn *ct; + enum ip_conntrack_dir dir; + struct nf_conntrack_tuple_hash sth[2]; + + // Check if the current packet is IPv4/TCP + void *skb_head = BPF_CORE_READ(skb, head); + __u16 l3_offset = BPF_CORE_READ(skb, network_header); + + struct iphdr *iph = (struct iphdr *)(skb_head + l3_offset); + + if (BPF_CORE_READ_BITFIELD_PROBED(iph, version) != 4) + return 0; // only IPv4 is supported + + if (BPF_CORE_READ(iph, protocol) != IPPROTO_TCP) { + return 0; // only TCP is supported + } + + // Init conntrack info + BPF_CORE_READ_INTO(&nfct, skb, _nfct); + ctinfo = nfct & NFCT_INFOMASK; + ct = (struct nf_conn *)(nfct & NFCT_PTRMASK); + dir = CTINFO2DIR(ctinfo); + BPF_CORE_READ_INTO(&sth, ct, tuplehash); + + // collect conntrack entry + __u32 src_ip_f = sth[dir].tuple.src.u3.ip; + __be16 src_port_f = sth[dir].tuple.src.u.tcp.port; + __u32 dst_ip_f = sth[dir].tuple.dst.u3.ip; + __be16 dst_port_f = sth[dir].tuple.dst.u.tcp.port; + __u32 src_ip_b = sth[!dir].tuple.src.u3.ip; + __be16 src_port_b = sth[!dir].tuple.src.u.tcp.port; + __u32 dst_ip_b = sth[!dir].tuple.dst.u3.ip; + __be16 dst_port_b = sth[!dir].tuple.dst.u.tcp.port; + + if (dst_ip_b == src_ip_f && dst_port_b == src_port_f) { + return 0; // address is not translated, ignore + } + + // Socket view -> Packet view + // We can use conntrack information to map observed socket addresses to real destination original ones + struct flow_key flow_key = {0}; // as observed at socket level of a client + flow_key.local_ip4 = src_ip_f; + flow_key.local_port = src_port_f; + flow_key.remote_ip4 = dst_ip_f; + flow_key.remote_port = dst_port_f; + flow_key.l4_proto = IPPROTO_TCP; + + struct flow_data flow_data = {0}; // real address of a server + flow_data.original_remote_ip4 = src_ip_b; + flow_data.original_remote_port = src_port_b; + + bpf_map_update_elem(&flowtracer_data, &flow_key, &flow_data, BPF_NOEXIST); + notify(FLOW_LOG_ADD, &flow_key, &flow_data); + return 0; +} + SEC("kprobe/nf_conntrack_destroy") int BPF_KPROBE(nf_conntrack_destroy, struct nf_conntrack *nfct) { -- Gitee From 66c5c870aa1623e9858df389440ea1f5745d263d Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 13 Mar 2024 10:56:39 +0000 Subject: [PATCH 11/17] Process TCP socket events with delay to let FlowTracer receive address mapping data --- .../ebpf.probe/src/endpointprobe/endpoint.c | 31 ++++++- .../src/include/delaying_ring_buffer.h | 50 +++++++++++ .../ebpf.probe/src/lib/delaying_ring_buffer.c | 90 +++++++++++++++++++ 3 files changed, 167 insertions(+), 4 deletions(-) create mode 100644 src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h create mode 100644 src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index 294e895e..c3e5fb0d 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -43,6 +43,7 @@ #include "hash.h" #include "conntrack.h" #include "container.h" +#include "delaying_ring_buffer.h" #include "flowtracer_reader.h" #include "histogram.h" @@ -140,6 +141,7 @@ struct endpoint_probe_s { int tcp_socks_num; int udp_socks_num; time_t last_report; + struct delaying_ring_buffer *drb; }; static volatile sig_atomic_t g_stop; @@ -485,17 +487,19 @@ static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_e memcpy(&(id.client_ipaddr), &(evt->client_ipaddr), sizeof(id.client_ipaddr)); memcpy(&(id.server_ipaddr), &(evt->server_ipaddr), sizeof(id.server_ipaddr)); id.tgid = evt->tgid; - // id.client_ipaddr.port = 0; id.role = evt->role; #ifdef GOPHER_DEBUG char s_ip[INET6_ADDRSTRLEN] = {0}, c_ip[INET6_ADDRSTRLEN] = {0}; (void)inet_ntop(id.server_ipaddr.family, (const void *)&(id.server_ipaddr.ip), s_ip, sizeof(s_ip)); (void)inet_ntop(id.client_ipaddr.family, (const void *)&(id.client_ipaddr.ip), c_ip, sizeof(c_ip)); - DEBUG("[EPPROBE]: tcp socket event %d, role: %d, flow: (%s:%u - %s:%u), tgid: %d\n", + DEBUG("[EPPROBE]: tcp socket event %d, role: %d, observed flow: (%s:%u - %s:%u), tgid: %d\n", evt->evt, evt->role, c_ip, id.client_ipaddr.port, s_ip, id.server_ipaddr.port, id.tgid); #endif + transform_cluster_ip(probe, &id); + id.client_ipaddr.port = 0; // Gala aggregates all client metrics + tcp = lkup_tcp_socket(probe, (const struct tcp_socket_id_s *)&id); if (process_tcp_conn_close(tcp, evt) == 0) { return 0; @@ -525,8 +529,6 @@ static int add_tcp_sock_evt(struct endpoint_probe_s * probe, struct tcp_socket_e new_tcp->stats[evt->evt] += 1; new_tcp->last_rcv_data = time(NULL); - transform_cluster_ip(probe, &new_tcp->id); - ip_str(new_tcp->id.client_ipaddr.family, (unsigned char *)&(new_tcp->id.client_ipaddr.ip), client_ip_str, INET6_ADDRSTRLEN); ip_str(new_tcp->id.server_ipaddr.family, (unsigned char *)&(new_tcp->id.server_ipaddr.ip), server_ip_str, INET6_ADDRSTRLEN); new_tcp->client_ip = strdup((const char *)client_ip_str); @@ -599,6 +601,13 @@ err: } static int proc_tcp_sock_evt(void *ctx, void *data, u32 size) +{ + struct endpoint_probe_s *probe = ctx; + drb_put(probe->drb, data, size); + return 0; +} + +static int proc_tcp_sock_evt_continue(void *ctx, void *data, u32 size) { char *p = data; int remain_size = (int)size, step_size = sizeof(struct tcp_socket_event_s), offset = 0; @@ -1017,6 +1026,15 @@ static int poll_endpoint_pb(struct endpoint_probe_s *probe) return 0; } +static void poll_drb(struct endpoint_probe_s *probe) +{ + const struct drb_item *item; + while ((item = drb_look(probe->drb))) { + proc_tcp_sock_evt_continue(probe, item->data, item->size); + drb_pop(probe->drb); + } +} + static char is_report_tmout(struct endpoint_probe_s *probe) { time_t current = time(NULL); @@ -1103,6 +1121,8 @@ int main(int argc, char **argv) INIT_BPF_APP(endpoint, EBPF_RLIM_LIMITED); INFO("[ENDPOINTPROBE] Successfully started!\n"); + g_ep_probe.drb = drb_new(4096, 500); + while(!g_stop) { ret = recv_ipc_msg(msq_id, (long)PROBE_SOCKET, &ipc_body); if (ret == 0) { @@ -1128,6 +1148,7 @@ int main(int argc, char **argv) sleep(1); } + poll_drb(&g_ep_probe); report_tcp_socks(&g_ep_probe); report_endpoint(&g_ep_probe); } @@ -1139,5 +1160,7 @@ int main(int argc, char **argv) unload_bpf_prog(&(g_ep_probe.prog)); destroy_ipc_body(&(g_ep_probe.ipc_body)); + drb_destroy(g_ep_probe.drb); + return ret; } diff --git a/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h b/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h new file mode 100644 index 00000000..329cc127 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h @@ -0,0 +1,50 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-03-15 + * Description: A circular (ring) buffer with time-based delay of reading operations + ******************************************************************************/ +#ifndef __DELAYING_RING_BUFFER_H__ +#define __DELAYING_RING_BUFFER_H__ + +#include + +struct drb_item { + void *data; + int size; + struct timespec creation_time; +}; + +struct delaying_ring_buffer { + struct drb_item *storage; + int writer_idx; + int reader_idx; + int capacity; + int delay_ms; +}; + +struct delaying_ring_buffer *drb_new(int capacity, int delay_ms); +void drb_destroy(struct delaying_ring_buffer *drb); +int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size); +const struct drb_item *drb_look(struct delaying_ring_buffer *drb); +int drb_pop(struct delaying_ring_buffer *drb); + +#define DELAYING_RING_BUFFER(var,capacity,delay) \ + uint8_t x##_data_storage[capacity]; \ + struct delaying_ring_buffer x = { \ + .storage = x##_data_storage, \ + .writer_idx = 0, \ + .reader_idx = 0, \ + .capacity = capacity, \ + .delay = delay \ + } + +#endif \ No newline at end of file diff --git a/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c new file mode 100644 index 00000000..63670260 --- /dev/null +++ b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c @@ -0,0 +1,90 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * gala-gopher licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: ilyashakhat + * Create: 2024-03-15 + * Description: A circular (ring) buffer with time-based delay of reading operations + ******************************************************************************/ +#include +#include +#include +#include "common.h" +#include "delaying_ring_buffer.h" + +struct delaying_ring_buffer *drb_new(int capacity, int delay_ms) +{ + struct delaying_ring_buffer *drb = malloc(sizeof(struct delaying_ring_buffer)); + drb->reader_idx = drb->writer_idx = 0; + drb->capacity = capacity; + drb->delay_ms = delay_ms; + drb->storage = malloc(sizeof(struct drb_item) * capacity); + INFO("[DRB] Allocated a delaying ring buffer with capacity %d and delay %d ms\n", capacity, delay_ms); + return drb; +} + +void drb_destroy(struct delaying_ring_buffer *drb) +{ + free(drb->storage); + free(drb); + INFO("[DRB] Delaying ring buffer destroyed\n"); +} + +int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size) +{ + int writer_idx = drb->writer_idx; + if ((writer_idx + 1) % drb->capacity == drb->reader_idx) { + return -1; // storage is full + } + + char *copy = malloc(size); + memcpy(copy, data, size); + drb->storage[writer_idx].data = copy; + drb->storage[writer_idx].size = size; + clock_gettime(CLOCK_MONOTONIC, &drb->storage[writer_idx].creation_time); + + drb->writer_idx = (drb->writer_idx + 1) % drb->capacity; + return 0; +} + +static void timespec_diff(struct timespec *now, struct timespec *past, struct timespec *diff) { + diff->tv_sec = (now)->tv_sec - (past)->tv_sec; + diff->tv_nsec = (now)->tv_nsec - (past)->tv_nsec; + if (diff->tv_nsec < 0) { + --diff->tv_sec; + diff->tv_nsec += 1000000000; + } +} + +const struct drb_item *drb_look(struct delaying_ring_buffer *drb) +{ + if (drb->reader_idx == drb->writer_idx) { + return NULL; + } + struct timespec now, diff; + clock_gettime(CLOCK_MONOTONIC, &now); + + struct drb_item *item = &drb->storage[drb->reader_idx]; + + timespec_diff(&now, &item->creation_time, &diff); + if (diff.tv_sec == 0 && (diff.tv_nsec / 1000000) < drb->delay_ms) { + return NULL; + } + return &drb->storage[drb->reader_idx]; +} + +int drb_pop(struct delaying_ring_buffer *drb) { + if (drb->writer_idx == drb->reader_idx) { + return -1; + } + struct drb_item *item = &drb->storage[drb->reader_idx]; + free(item->data); + drb->reader_idx = (drb->reader_idx + 1) % drb->capacity; + return 0; +} -- Gitee From fc191090657d4e62116951cc7f7eec4edf8c6f8d Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 13 Mar 2024 13:19:37 +0000 Subject: [PATCH 12/17] Delay processing of L7 events to let FlowTracer receive address mapping data --- .../extends/ebpf.probe/src/l7probe/conn_tracker.c | 7 +++++++ .../ebpf.probe/src/l7probe/include/conn_tracker.h | 1 + .../extends/ebpf.probe/src/l7probe/l7_common.h | 2 ++ src/probes/extends/ebpf.probe/src/l7probe/l7probe.c | 13 +++++++++++++ 4 files changed, 23 insertions(+) diff --git a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c index 2739688c..f0160c35 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c @@ -1057,6 +1057,13 @@ void l7_parser(void *ctx) } int tracker_msg(void *ctx, void *data, u32 size) +{ + struct l7_mng_s *l7_mng = ctx; + drb_put(l7_mng->drb, data, size); + return 0; +} + +int tracker_msg_continue(void *ctx, void *data, u32 size) { char *p = data; struct l7_mng_s *l7_mng = ctx; diff --git a/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h b/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h index c129eead..1a3e341c 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h +++ b/src/probes/extends/ebpf.probe/src/l7probe/include/conn_tracker.h @@ -195,6 +195,7 @@ void l7_parser(void *ctx); void report_l7(void *ctx); int tracker_msg(void *ctx, void *data, u32 size); +int tracker_msg_continue(void *ctx, void *data, u32 size); #endif diff --git a/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h b/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h index a7f96038..05566015 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h +++ b/src/probes/extends/ebpf.probe/src/l7probe/l7_common.h @@ -15,6 +15,7 @@ #ifndef __L7_COMMON_H__ #define __L7_COMMON_H__ +#include "delaying_ring_buffer.h" #include "ipc.h" #include "include/filter.h" #include "include/connect.h" @@ -51,6 +52,7 @@ struct l7_mng_s { struct l7_link_s *l7_links; struct conn_data_s conn_data; struct java_proc_s *java_procs; + struct delaying_ring_buffer *drb; }; #endif \ No newline at end of file diff --git a/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c b/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c index be9fd09b..f7733d7f 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c @@ -350,6 +350,15 @@ static int poll_l7_pb(struct l7_ebpf_prog_s* ebpf_progs) return 0; } +static void poll_drb(struct l7_mng_s *l7_mng) +{ + const struct drb_item *item; + while ((item = drb_look(l7_mng->drb))) { + tracker_msg_continue(l7_mng, item->data, item->size); + drb_pop(l7_mng->drb); + } +} + static void save_filter_proto(int fd, u32 proto) { u32 key = 0; @@ -379,6 +388,7 @@ int main(int argc, char **argv) } (void)memset(l7_mng, 0, sizeof(struct l7_mng_s)); + l7_mng->drb = drb_new(4096, 500); int msq_id = create_ipc_msg_queue(IPC_EXCL); if (msq_id < 0) { @@ -441,6 +451,7 @@ int main(int argc, char **argv) ERROR("[L7Probe]: perf poll failed(%d).\n", ret); break; } + poll_drb(l7_mng); l7_parser(l7_mng); report_l7(l7_mng); } else { @@ -454,5 +465,7 @@ err: l7_unload_probe_jsse(l7_mng); unload_l7_prog(l7_mng); destroy_ipc_body(&(l7_mng->ipc_body)); + drb_destroy(l7_mng->drb); + INFO("[L7PROBE] Cleanup is completed"); return 0; } -- Gitee From bdb1240b09ba5836c5d38e78bc42da5349e9fbef Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 13 Mar 2024 14:04:31 +0000 Subject: [PATCH 13/17] Add error handling into delaying ring buffer --- .../ebpf.probe/src/endpointprobe/endpoint.c | 12 +++++++++--- .../extends/ebpf.probe/src/l7probe/conn_tracker.c | 4 +++- .../extends/ebpf.probe/src/l7probe/l7probe.c | 5 +++++ .../ebpf.probe/src/lib/delaying_ring_buffer.c | 14 ++++++++++++-- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index c3e5fb0d..73c1373d 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -603,7 +603,9 @@ err: static int proc_tcp_sock_evt(void *ctx, void *data, u32 size) { struct endpoint_probe_s *probe = ctx; - drb_put(probe->drb, data, size); + if (drb_put(probe->drb, data, size)) { + WARN("[EPPROBE] Not enough space to put event into the ring buffer. Event is discarded.\n"); + } return 0; } @@ -1118,11 +1120,15 @@ int main(int argc, char **argv) return -1; } + g_ep_probe.drb = drb_new(4096, 500); + if (!g_ep_probe.drb) { + ERROR("[ENDPOINTPROBE] Failed to allocate delaying ring buffer.\n"); + return -1; + } + INIT_BPF_APP(endpoint, EBPF_RLIM_LIMITED); INFO("[ENDPOINTPROBE] Successfully started!\n"); - g_ep_probe.drb = drb_new(4096, 500); - while(!g_stop) { ret = recv_ipc_msg(msq_id, (long)PROBE_SOCKET, &ipc_body); if (ret == 0) { diff --git a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c index f0160c35..4bae774e 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c @@ -1059,7 +1059,9 @@ void l7_parser(void *ctx) int tracker_msg(void *ctx, void *data, u32 size) { struct l7_mng_s *l7_mng = ctx; - drb_put(l7_mng->drb, data, size); + if (drb_put(l7_mng->drb, data, size)) { + WARN("[L7PROBE] Not enough space to put event into the ring buffer. Event is discarded.\n"); + } return 0; } diff --git a/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c b/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c index f7733d7f..af093339 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/l7probe.c @@ -388,7 +388,12 @@ int main(int argc, char **argv) } (void)memset(l7_mng, 0, sizeof(struct l7_mng_s)); + l7_mng->drb = drb_new(4096, 500); + if (!l7_mng->drb) { + ERROR("[L7PROBE] Failed to allocate delaying ring buffer.\n"); + goto err; + } int msq_id = create_ipc_msg_queue(IPC_EXCL); if (msq_id < 0) { diff --git a/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c index 63670260..e855a82b 100644 --- a/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c +++ b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c @@ -21,6 +21,9 @@ struct delaying_ring_buffer *drb_new(int capacity, int delay_ms) { struct delaying_ring_buffer *drb = malloc(sizeof(struct delaying_ring_buffer)); + if (!drb) { + return NULL; + } drb->reader_idx = drb->writer_idx = 0; drb->capacity = capacity; drb->delay_ms = delay_ms; @@ -31,8 +34,15 @@ struct delaying_ring_buffer *drb_new(int capacity, int delay_ms) void drb_destroy(struct delaying_ring_buffer *drb) { - free(drb->storage); - free(drb); + if (drb) { + if (drb->storage) { + for (int idx = drb->reader_idx; idx < drb->writer_idx; idx++) { + free(drb->storage[idx].data); + } + free(drb->storage); + } + free(drb); + } INFO("[DRB] Delaying ring buffer destroyed\n"); } -- Gitee From fa26c4a0bd48f92b6642e1312c4466f6e44fa5fd Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 13 Mar 2024 09:34:50 +0000 Subject: [PATCH 14/17] Add map fd into debug message from flowtracer reader --- src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c index a1af16cd..f184e7bc 100644 --- a/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c +++ b/src/probes/extends/ebpf.probe/src/lib/flowtracer_reader.c @@ -77,8 +77,8 @@ int lookup_flowtracer(struct tcp_connect_s *connect) { struct flow_data value = {0}; - DEBUG("[lookup_flowtracer] Lookup local_ip4: %u (%x), local_port: %d, remote_ip4: %u (%x), remote_port: %d, l4_proto: %d, key-size: %d, value-size: %d\n", - key.local_ip4, key.local_ip4, key.local_port, key.remote_ip4, key.remote_ip4, key.remote_port, key.l4_proto, sizeof(key), sizeof(value)); + DEBUG("[lookup_flowtracer] Lookup fd %d, local_ip4: %u (%x), local_port: %d, remote_ip4: %u (%x), remote_port: %d, l4_proto: %d, key_size: %d, value_size: %d\n", + g_flow_tracer_map_fd, key.local_ip4, key.local_ip4, key.local_port, key.remote_ip4, key.remote_ip4, key.remote_port, key.l4_proto, sizeof(key), sizeof(value)); int err = bpf_map_lookup_elem(g_flow_tracer_map_fd, &key, &value); if (err) { -- Gitee From b86c24518f2a980b18d71fd3b7c691546bdc5c96 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 18 Dec 2023 11:25:12 +0000 Subject: [PATCH 15/17] Add build artifacts into .gitignore --- .gitignore | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.gitignore b/.gitignore index e7a79a4b..18a081ea 100644 --- a/.gitignore +++ b/.gitignore @@ -66,3 +66,12 @@ submodule_test # idea files *.idea + +# BPF skeletons +*.skel.* + +# Build artifacts +/bpftool +/gala-gopher +/gopher-ctl +/src/daemon/build -- Gitee From a6238327cc91d6e3e276826972608e81b7d90323 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Wed, 13 Mar 2024 20:35:01 +0000 Subject: [PATCH 16/17] Fix flow transformation log message for IPv6 addresses + Cleanup the code --- .../ebpf.probe/src/endpointprobe/endpoint.c | 12 +++------ .../ebpf.probe/src/l7probe/conn_tracker.c | 12 +++------ .../ebpf.probe/src/tcpprobe/tcp_tracker.c | 26 ++++++++----------- 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index 73c1373d..eec57bef 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -439,14 +439,10 @@ static void transform_cluster_ip(struct endpoint_probe_s *probe_mng, struct tcp_ #ifdef GOPHER_DEBUG char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; - s_ip1[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(tracker->server_ipaddr.ip), s_ip1, sizeof(s_ip1)); - s_ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), s_ip2, sizeof(s_ip2)); - c_ip1[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(tracker->client_ipaddr.ip), c_ip1, sizeof(c_ip1)); - c_ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, sizeof(c_ip2)); + inet_ntop(connect.family, &tracker->server_ipaddr.ip, s_ip1, sizeof(s_ip1)); + inet_ntop(connect.family, &connect.sip_addr, s_ip2, sizeof(s_ip2)); + inet_ntop(connect.family, &tracker->client_ipaddr.ip, c_ip1, sizeof(c_ip1)); + inet_ntop(connect.family, &connect.cip_addr, c_ip2, sizeof(c_ip2)); DEBUG("[EPPROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", c_ip1, tracker->client_ipaddr.port, s_ip1, tracker->server_ipaddr.port, c_ip2, connect.c_port, s_ip2, connect.s_port); diff --git a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c index 4bae774e..5d894012 100644 --- a/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c +++ b/src/probes/extends/ebpf.probe/src/l7probe/conn_tracker.c @@ -394,14 +394,10 @@ static void transform_cluster_ip(struct l7_mng_s *l7_mng, struct conn_tracker_s* #ifdef GOPHER_DEBUG char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; - s_ip1[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.server_addr.ip), s_ip1, sizeof(s_ip1)); - s_ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.sip_addr), s_ip2, sizeof(s_ip2)); - c_ip1[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(tracker->open_info.client_addr.ip), c_ip1, sizeof(c_ip1)); - c_ip2[0] = 0; - (void)inet_ntop(connect.family, (const void *)&(connect.cip_addr), c_ip2, sizeof(c_ip2)); + inet_ntop(connect.family, &tracker->open_info.server_addr.ip, s_ip1, sizeof(s_ip1)); + inet_ntop(connect.family, &connect.sip_addr, s_ip2, sizeof(s_ip2)); + inet_ntop(connect.family, &tracker->open_info.client_addr.ip, c_ip1, sizeof(c_ip1)); + inet_ntop(connect.family, &connect.cip_addr, c_ip2, sizeof(c_ip2)); DEBUG("[L7PROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", c_ip1, tracker->open_info.client_addr.port, s_ip1, tracker->open_info.server_addr.port, c_ip2, connect.c_port, s_ip2, connect.s_port); diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c index c6c41976..cdc3a2da 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tracker.c @@ -210,22 +210,18 @@ static int __transform_cluster_ip(struct tcp_mng_s *tcp_mng, const struct tcp_li transform = lookup_flowtracer(connect); } -#ifdef GOPHER_DEBUG - char c_ip1[IP6_LEN], s_ip1[IP6_LEN]; - inet_ntop(tcp_link->family, (const void *)&(tcp_link->c_ip), c_ip1, IP6_LEN); - inet_ntop(tcp_link->family, (const void *)&(tcp_link->s_ip), s_ip1, IP6_LEN); - - if (transform) { - char c_ip2[IP6_LEN], s_ip2[IP6_LEN]; - inet_ntop(connect->family, (const void *)&(connect->cip_addr), c_ip2, IP6_LEN); - inet_ntop(connect->family, (const void *)&(connect->sip_addr), s_ip2, IP6_LEN); - DEBUG("[TCPPROBE]: Flow (%s:%u-%s:%u) is transformed into (%s:%u-%s:%u)\n", - c_ip1, tcp_link->c_port, s_ip1, tcp_link->s_port, - c_ip2, connect->c_port, s_ip2, connect->s_port); - } else { - DEBUG("[TCPPROBE]: Flow (%s:%u-%s:%u) is not transformed\n", - c_ip1, tcp_link->c_port, s_ip1, tcp_link->s_port); + if (!transform) { + return ADDR_TRANSFORM_NONE; } + +#ifdef GOPHER_DEBUG + char s_ip1[IP6_STR_LEN], s_ip2[IP6_STR_LEN], c_ip1[IP6_STR_LEN], c_ip2[IP6_STR_LEN]; + inet_ntop(tcp_link->family, &tcp_link->s_ip, s_ip1, sizeof(s_ip1)); + inet_ntop(connect->family, &connect->sip_addr, s_ip2, sizeof(s_ip2)); + inet_ntop(tcp_link->family, &tcp_link->c_ip, c_ip1, sizeof(c_ip1)); + inet_ntop(connect->family, &connect->cip_addr, c_ip2, sizeof(c_ip2)); + DEBUG("[TCPPROBE]: Flow (%s:%u - %s:%u) is transformed into (%s:%u - %s:%u)\n", + c_ip1, tcp_link->c_port, s_ip1, tcp_link->s_port, c_ip2, connect->c_port, s_ip2, connect->s_port); #endif return transform; -- Gitee From e9e3cfa34f9caf70f75305a779af0d0f7db80364 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 25 Mar 2024 09:40:24 +0000 Subject: [PATCH 17/17] White-space cleanup --- .../src/flowtracer/flowtracer.bpf.c | 120 +++++++++--------- .../ebpf.probe/src/flowtracer/flowtracer.c | 6 +- .../src/include/delaying_ring_buffer.h | 12 +- .../ebpf.probe/src/lib/delaying_ring_buffer.c | 6 +- 4 files changed, 67 insertions(+), 77 deletions(-) diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c index b609ca24..fcaa652e 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.bpf.c @@ -66,13 +66,13 @@ static void notify(enum flow_log_op op, struct flow_key *flow_key, struct flow_d static inline void notify(enum flow_log_op op, struct flow_key *flow_key, struct flow_data *flow_data) {} // NOP #endif -static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops, __u32 extra) +static inline int set_hdr_cb_flags(struct bpf_sock_ops *skops, __u32 extra) { long err = bpf_sock_ops_cb_flags_set(skops, skops->bpf_sock_ops_cb_flags | extra); return ERR2CG(err); } -static int handle_hdr_opt_len(struct bpf_sock_ops *skops) +static int handle_hdr_opt_len(struct bpf_sock_ops *skops) { const int bytes_to_reserve = sizeof(struct tcp_opt_source_info); int err = bpf_reserve_hdr_opt(skops, bytes_to_reserve, 0); @@ -162,64 +162,64 @@ int flowtracer_sockops_fn(struct bpf_sock_ops *skops) SEC("kprobe/__nf_conntrack_confirm") int BPF_KPROBE(nf_conntrack_confirm, struct sk_buff *skb) { - // Function '__nf_conntrack_confirm' is called when conntrack confirms a connection. For outgoing connections - // this happens immediately upon establishment (sending of SYN packet) - unsigned long nfct; - unsigned int ctinfo; - struct nf_conn *ct; - enum ip_conntrack_dir dir; - struct nf_conntrack_tuple_hash sth[2]; + // Function '__nf_conntrack_confirm' is called when conntrack confirms a connection. For outgoing connections + // this happens immediately upon establishment (sending of SYN packet) + unsigned long nfct; + unsigned int ctinfo; + struct nf_conn *ct; + enum ip_conntrack_dir dir; + struct nf_conntrack_tuple_hash sth[2]; // Check if the current packet is IPv4/TCP void *skb_head = BPF_CORE_READ(skb, head); - __u16 l3_offset = BPF_CORE_READ(skb, network_header); - - struct iphdr *iph = (struct iphdr *)(skb_head + l3_offset); - - if (BPF_CORE_READ_BITFIELD_PROBED(iph, version) != 4) - return 0; // only IPv4 is supported - - if (BPF_CORE_READ(iph, protocol) != IPPROTO_TCP) { - return 0; // only TCP is supported - } - - // Init conntrack info - BPF_CORE_READ_INTO(&nfct, skb, _nfct); - ctinfo = nfct & NFCT_INFOMASK; - ct = (struct nf_conn *)(nfct & NFCT_PTRMASK); - dir = CTINFO2DIR(ctinfo); - BPF_CORE_READ_INTO(&sth, ct, tuplehash); - - // collect conntrack entry - __u32 src_ip_f = sth[dir].tuple.src.u3.ip; - __be16 src_port_f = sth[dir].tuple.src.u.tcp.port; - __u32 dst_ip_f = sth[dir].tuple.dst.u3.ip; - __be16 dst_port_f = sth[dir].tuple.dst.u.tcp.port; - __u32 src_ip_b = sth[!dir].tuple.src.u3.ip; - __be16 src_port_b = sth[!dir].tuple.src.u.tcp.port; - __u32 dst_ip_b = sth[!dir].tuple.dst.u3.ip; - __be16 dst_port_b = sth[!dir].tuple.dst.u.tcp.port; - - if (dst_ip_b == src_ip_f && dst_port_b == src_port_f) { - return 0; // address is not translated, ignore - } - - // Socket view -> Packet view - // We can use conntrack information to map observed socket addresses to real destination original ones - struct flow_key flow_key = {0}; // as observed at socket level of a client - flow_key.local_ip4 = src_ip_f; - flow_key.local_port = src_port_f; - flow_key.remote_ip4 = dst_ip_f; - flow_key.remote_port = dst_port_f; - flow_key.l4_proto = IPPROTO_TCP; - - struct flow_data flow_data = {0}; // real address of a server - flow_data.original_remote_ip4 = src_ip_b; - flow_data.original_remote_port = src_port_b; - - bpf_map_update_elem(&flowtracer_data, &flow_key, &flow_data, BPF_NOEXIST); + __u16 l3_offset = BPF_CORE_READ(skb, network_header); + + struct iphdr *iph = (struct iphdr *)(skb_head + l3_offset); + + if (BPF_CORE_READ_BITFIELD_PROBED(iph, version) != 4) + return 0; // only IPv4 is supported + + if (BPF_CORE_READ(iph, protocol) != IPPROTO_TCP) { + return 0; // only TCP is supported + } + + // Init conntrack info + BPF_CORE_READ_INTO(&nfct, skb, _nfct); + ctinfo = nfct & NFCT_INFOMASK; + ct = (struct nf_conn *)(nfct & NFCT_PTRMASK); + dir = CTINFO2DIR(ctinfo); + BPF_CORE_READ_INTO(&sth, ct, tuplehash); + + // collect conntrack entry + __u32 src_ip_f = sth[dir].tuple.src.u3.ip; + __be16 src_port_f = sth[dir].tuple.src.u.tcp.port; + __u32 dst_ip_f = sth[dir].tuple.dst.u3.ip; + __be16 dst_port_f = sth[dir].tuple.dst.u.tcp.port; + __u32 src_ip_b = sth[!dir].tuple.src.u3.ip; + __be16 src_port_b = sth[!dir].tuple.src.u.tcp.port; + __u32 dst_ip_b = sth[!dir].tuple.dst.u3.ip; + __be16 dst_port_b = sth[!dir].tuple.dst.u.tcp.port; + + if (dst_ip_b == src_ip_f && dst_port_b == src_port_f) { + return 0; // address is not translated, ignore + } + + // Socket view -> Packet view + // We can use conntrack information to map an observed socket address to a real destination address + struct flow_key flow_key = {0}; // as observed at socket level of a client + flow_key.local_ip4 = src_ip_f; + flow_key.local_port = src_port_f; + flow_key.remote_ip4 = dst_ip_f; + flow_key.remote_port = dst_port_f; + flow_key.l4_proto = IPPROTO_TCP; + + struct flow_data flow_data = {0}; // real address of a server + flow_data.original_remote_ip4 = src_ip_b; + flow_data.original_remote_port = src_port_b; + + bpf_map_update_elem(&flowtracer_data, &flow_key, &flow_data, BPF_NOEXIST); notify(FLOW_LOG_ADD, &flow_key, &flow_data); - return 0; + return 0; } SEC("kprobe/nf_conntrack_destroy") @@ -233,17 +233,17 @@ int BPF_KPROBE(nf_conntrack_destroy, struct nf_conntrack *nfct) struct nf_conntrack_tuple_hash th[2]; BPF_CORE_READ_INTO(&th, (struct nf_conn *)nfct, tuplehash); - if (th[IP_CT_DIR_ORIGINAL].tuple.dst.protonum != IPPROTO_TCP) { - return 0; + if (th[IP_CT_DIR_ORIGINAL].tuple.dst.protonum != IPPROTO_TCP) { + return 0; } // Conntrack tracks 4-tuple for original packets and 4-tuple for reply packets in tuplehash array. // If a connection is originated from the current host (= client), then 4-tuple of original packets - // is the same as observed at the socket level. + // is the same as observed at the socket level. // Example of k8s pod talking to virtual service IP: // * original: (10.0.0.159, 34568, 10.247.204.240, 8000) --> matches client's socket view // * reply: (10.0.0.28, 8000, 192.168.3.14, 16256) - // If a connection is incoming (= server), then 4-tuple of reply packets match the one observed + // If a connection is incoming (= server), then 4-tuple of reply packets match the one observed // at the socket level. // Example of an externally accessible service running in a docker container: // * original: (10.221.55.32, 57911, 10.82.1.33, 8001) diff --git a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c index 808fce9b..8533f03c 100644 --- a/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c +++ b/src/probes/extends/ebpf.probe/src/flowtracer/flowtracer.c @@ -29,7 +29,7 @@ static volatile sig_atomic_t stop; -static void sig_handler(int signo) +static void sig_handler(int signo) { stop = 1; } @@ -168,8 +168,8 @@ cleanup: ring_buffer__free(ring_buffer); flowtracer_bpf__destroy(flowtracer_skel); close(cg_fd); - // Delete BPF map (pinned at the default libbpf location) - unlink("/sys/fs/bpf/flowtracer_data"); + // Delete BPF map (pinned at the default libbpf location) + unlink("/sys/fs/bpf/flowtracer_data"); INFO("[FlowTracer] Cleanup is completed\n"); return -err; } diff --git a/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h b/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h index 329cc127..4ddc24ef 100644 --- a/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h +++ b/src/probes/extends/ebpf.probe/src/include/delaying_ring_buffer.h @@ -10,7 +10,7 @@ * See the Mulan PSL v2 for more details. * Author: ilyashakhat * Create: 2024-03-15 - * Description: A circular (ring) buffer with time-based delay of reading operations + * Description: A circular (ring) buffer with a time-based delay of reading operations ******************************************************************************/ #ifndef __DELAYING_RING_BUFFER_H__ #define __DELAYING_RING_BUFFER_H__ @@ -37,14 +37,4 @@ int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size); const struct drb_item *drb_look(struct delaying_ring_buffer *drb); int drb_pop(struct delaying_ring_buffer *drb); -#define DELAYING_RING_BUFFER(var,capacity,delay) \ - uint8_t x##_data_storage[capacity]; \ - struct delaying_ring_buffer x = { \ - .storage = x##_data_storage, \ - .writer_idx = 0, \ - .reader_idx = 0, \ - .capacity = capacity, \ - .delay = delay \ - } - #endif \ No newline at end of file diff --git a/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c index e855a82b..42b564a1 100644 --- a/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c +++ b/src/probes/extends/ebpf.probe/src/lib/delaying_ring_buffer.c @@ -10,7 +10,7 @@ * See the Mulan PSL v2 for more details. * Author: ilyashakhat * Create: 2024-03-15 - * Description: A circular (ring) buffer with time-based delay of reading operations + * Description: A circular (ring) buffer with a time-based delay of reading operations ******************************************************************************/ #include #include @@ -46,7 +46,7 @@ void drb_destroy(struct delaying_ring_buffer *drb) INFO("[DRB] Delaying ring buffer destroyed\n"); } -int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size) +int drb_put(struct delaying_ring_buffer *drb, const char *data, const int size) { int writer_idx = drb->writer_idx; if ((writer_idx + 1) % drb->capacity == drb->reader_idx) { @@ -72,7 +72,7 @@ static void timespec_diff(struct timespec *now, struct timespec *past, struct ti } } -const struct drb_item *drb_look(struct delaying_ring_buffer *drb) +const struct drb_item *drb_look(struct delaying_ring_buffer *drb) { if (drb->reader_idx == drb->writer_idx) { return NULL; -- Gitee