From 93b7b58601ac52123f978efe16b183fda6953885 Mon Sep 17 00:00:00 2001 From: caixiaomeng Date: Fri, 28 Feb 2025 16:40:24 +0800 Subject: [PATCH] add dfx for xalarmd to rebuild connection after communication disconnection (cherry picked from commit 0ecae622474eda54b9ef60ea1619456929bbf07e) --- ...rmd-to-rebuild-connection-after-comm.patch | 488 ++++++++++++++++++ sysSentry.spec | 9 +- 2 files changed, 496 insertions(+), 1 deletion(-) create mode 100644 add-dfx-for-xalarmd-to-rebuild-connection-after-comm.patch diff --git a/add-dfx-for-xalarmd-to-rebuild-connection-after-comm.patch b/add-dfx-for-xalarmd-to-rebuild-connection-after-comm.patch new file mode 100644 index 0000000..4ddac69 --- /dev/null +++ b/add-dfx-for-xalarmd-to-rebuild-connection-after-comm.patch @@ -0,0 +1,488 @@ +From b0bded3b728fd9118658a2a2d21e18b2e50c3355 Mon Sep 17 00:00:00 2001 +From: PshySimon +Date: Mon, 24 Feb 2025 19:11:26 +0800 +Subject: [PATCH] add dfx for xalarmd to rebuild connection after communication + disconnection + +--- + config/service/xalarmd.service | 7 +- + src/libs/libxalarm/register_xalarm.c | 2 +- + src/services/xalarm/xalarm_server.py | 242 ++++++++++++++++++++++--- + src/services/xalarm/xalarm_transfer.py | 49 +++-- + 4 files changed, 249 insertions(+), 51 deletions(-) + +diff --git a/config/service/xalarmd.service b/config/service/xalarmd.service +index 0665b39..20db997 100644 +--- a/config/service/xalarmd.service ++++ b/config/service/xalarmd.service +@@ -2,8 +2,13 @@ + Description = xalarm daemon for alarm messages forwarding + + [Service] +-ExecStart =/usr/bin/python3 /usr/bin/xalarmd + Type = forking ++ExecStart=/usr/bin/python3 /usr/bin/xalarmd ++ExecStop=/bin/kill ++KillMode=process ++Restart=on-failure ++RestartSec=3s + + [Install] + WantedBy = multi-user.target ++ +diff --git a/src/libs/libxalarm/register_xalarm.c b/src/libs/libxalarm/register_xalarm.c +index 9ff0f7c..f9da783 100644 +--- a/src/libs/libxalarm/register_xalarm.c ++++ b/src/libs/libxalarm/register_xalarm.c +@@ -32,7 +32,7 @@ + #define PATH_REG_ALARM "/var/run/xalarm/alarm" + #define PATH_REPORT_ALARM "/var/run/xalarm/report" + #define ALARM_DIR_PERMISSION 0750 +-#define ALARM_SOCKET_PERMISSION 0700 ++#define ALARM_SOCKET_PERMISSION 0600 + #define TIME_UNIT_MILLISECONDS 1000 + + #define MAX_PARAS_LEN 8191 +diff --git a/src/services/xalarm/xalarm_server.py b/src/services/xalarm/xalarm_server.py +index ade5eb1..3ed0a24 100644 +--- a/src/services/xalarm/xalarm_server.py ++++ b/src/services/xalarm/xalarm_server.py +@@ -18,14 +18,16 @@ import socket + import os + import logging + import select ++import stat + import threading + ++from time import sleep + from .xalarm_api import alarm_bin2stu, alarm_stu2str + from .xalarm_transfer import ( + check_filter, + transmit_alarm, + wait_for_connection, +- peroid_task_to_cleanup_connections ++ cleanup_closed_connections + ) + + +@@ -34,53 +36,223 @@ USER_RECV_SOCK = "/var/run/xalarm/alarm" + SOCK_FILE = "/var/run/xalarm/report" + ALARM_REPORT_LEN = 8216 + ALARM_DIR_PERMISSION = 0o750 ++SOCKET_FILE_PERMISSON = 0o600 ++PERMISION_MASK = 0o777 ++PEROID_CHECK_TIME = 3 + ALARM_LISTEN_QUEUE_LEN = 5 ++PEROID_SCANN_TIME = 60 ++fd_to_socket_lock = threading.Lock() + + +-def clear_sock_path(): ++def check_permission(path, permission): ++ """check whether the permission of path is right ++ """ ++ return (os.stat(path).st_mode & PERMISION_MASK) == permission ++ ++ ++def check_socket_file(path): ++ if not os.path.exists(path): ++ return False ++ ++ file_stat = os.stat(path) ++ # path is not a socket file ++ if not stat.S_ISSOCK(file_stat.st_mode): ++ return False ++ return True ++ ++ ++def clear_sock_file(sock_file): + """unlink unix socket if exist + """ ++ if os.path.exists(sock_file): ++ os.unlink(sock_file) ++ ++ ++def clear_sock_conn(sock_fd, epoll_fd): ++ if sock_fd is None: ++ return ++ if sock_fd.fileno() == -1: ++ return ++ if epoll_fd is not None: ++ epoll_fd.unregister(sock_fd.fileno()) ++ epoll_fd.close() ++ sock_fd.close() ++ ++ ++def create_sock_conn(sock_file, sock_type): ++ sock_fd, epoll_fd = (None, None) ++ try: ++ sock_fd = socket.socket(socket.AF_UNIX, sock_type) ++ sock_fd.bind(sock_file) ++ ++ if sock_type == socket.SOCK_STREAM: ++ sock_fd.listen(ALARM_LISTEN_QUEUE_LEN) ++ sock_fd.setblocking(False) ++ ++ epoll_fd = select.epoll() ++ epoll_fd.register(sock_fd.fileno(), select.EPOLLIN) ++ os.chmod(sock_file, SOCKET_FILE_PERMISSON) ++ logging.info("socket file %s has been created", sock_file) ++ return sock_fd, epoll_fd ++ except socket.error as e: ++ logging.error("failed to bind %s socket, reason is %s", sock_file, str(e)) ++ clear_sock_conn(sock_fd, epoll_fd) ++ ++ return sock_fd, epoll_fd ++ ++ ++def recover_sock_path_and_permission(): ++ # if directory not exists or permission denied, remake + if not os.path.exists(ALARM_DIR): ++ logging.info("xalarmd run dir not exists, create %s", ALARM_DIR) + os.mkdir(ALARM_DIR) ++ if not check_permission(ALARM_DIR, ALARM_DIR_PERMISSION): ++ logging.info("xalarmd run dir %s permission set not properly, recover as default permission", ALARM_DIR) + os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION) +- if os.path.exists(SOCK_FILE): +- os.unlink(SOCK_FILE) +- if os.path.exists(USER_RECV_SOCK): +- os.unlink(USER_RECV_SOCK) ++ if os.path.exists(SOCK_FILE) and not check_permission(SOCK_FILE, SOCKET_FILE_PERMISSON): ++ logging.info("socket file %s permission %s set not properly, recover as default permission", ++ SOCK_FILE, oct(os.stat(SOCK_FILE).st_mode & PERMISION_MASK)) ++ os.chmod(SOCK_FILE, SOCKET_FILE_PERMISSON) ++ if os.path.exists(USER_RECV_SOCK) and not check_permission(USER_RECV_SOCK, SOCKET_FILE_PERMISSON): ++ logging.info("socket file %s permission %s set not properly, recover as default permission", ++ USER_RECV_SOCK, oct(os.stat(USER_RECV_SOCK).st_mode & PERMISION_MASK)) ++ os.chmod(USER_RECV_SOCK, SOCKET_FILE_PERMISSON) ++ ++ ++def peroid_task_to_cleanup_connections(): ++ global alarm_sock ++ global alarm_epoll ++ global fd_to_socket ++ global conn_thread_should_stop ++ global fd_to_socket_lock ++ logging.info("cleanup thread is running") ++ ++ while True: ++ sleep(PEROID_SCANN_TIME) ++ # if conn thread stopped, cleanup thread should not cleanup anymore ++ if conn_thread_should_stop.is_set(): ++ continue ++ cleanup_closed_connections(alarm_sock, alarm_epoll, fd_to_socket, fd_to_socket_lock) ++ ++ ++def watch_socket_file_and_dir(): ++ global conn_thread ++ global alarm_epoll ++ global report_epoll ++ global conn_thread_should_stop ++ global report_sock ++ global alarm_sock ++ global fd_to_socket ++ global fd_to_socket_lock ++ while True: ++ try: ++ recover_sock_path_and_permission() ++ if not check_socket_file(SOCK_FILE): ++ logging.info("socket file %s not found or socket file been replaced, recovering ...", SOCK_FILE) ++ clear_sock_conn(report_sock, report_epoll) ++ clear_sock_file(SOCK_FILE) ++ # if create socket failed, will retry to create because socket file was cleared in last step ++ report_sock, report_epoll = create_sock_conn(SOCK_FILE, socket.SOCK_DGRAM) ++ ++ if not check_socket_file(USER_RECV_SOCK): ++ logging.info("socket file %s not found or socket file been replaced, recovering ...", USER_RECV_SOCK) ++ # set conn_thread_should_stop as True ++ conn_thread_should_stop.set() ++ # Ensure that conn_thread has been stopped before clean and release resources ++ conn_thread.join() ++ ++ # Now only transmit_alarm will use this lock ++ # Ensure fd_to_socket dict resource has been released ++ with fd_to_socket_lock: ++ for stored_sock_fd, stored_sock in fd_to_socket.items(): ++ if stored_sock is None: ++ continue ++ if alarm_sock is not None and (stored_sock.fileno() != alarm_sock.fileno()): ++ stored_sock.close() ++ clear_sock_conn(alarm_sock, alarm_epoll) ++ clear_sock_file(USER_RECV_SOCK) ++ ++ alarm_sock, alarm_epoll = create_sock_conn(USER_RECV_SOCK, socket.SOCK_STREAM) ++ fd_to_socket = {alarm_sock.fileno(): alarm_sock,} ++ ++ # set conn_thread_should_stop as False ++ conn_thread_should_stop.clear() ++ conn_thread = start_wait_for_conn_thread( ++ alarm_sock, ++ alarm_epoll, ++ fd_to_socket, ++ conn_thread_should_stop, ++ fd_to_socket_lock ++ ) ++ except Exception as e: ++ logging.error("Error watch socket file thread: %s", str(e)) ++ ++ sleep(PEROID_CHECK_TIME) ++ ++ ++def start_wait_for_conn_thread(alarm_sock_, alarm_epoll_, ++ fd_to_socket_, conn_thread_should_stop_, fd_to_socket_lock_): ++ conn_thread = threading.Thread( ++ target=wait_for_connection, ++ args=( ++ alarm_sock_, ++ alarm_epoll_, ++ fd_to_socket_, ++ conn_thread_should_stop_, ++ fd_to_socket_lock_) ++ ) ++ conn_thread.daemon = True ++ conn_thread.start() ++ return conn_thread + + + def server_loop(alarm_config): + """alarm daemon process loop + """ + logging.info("server loop waiting for messages") +- clear_sock_path() +- +- sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) +- sock.bind(SOCK_FILE) +- os.chmod(SOCK_FILE, 0o600) +- +- alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +- alarm_sock.bind(USER_RECV_SOCK) +- os.chmod(USER_RECV_SOCK, 0o600) +- alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN) +- alarm_sock.setblocking(False) + +- epoll = select.epoll() +- epoll.register(alarm_sock.fileno(), select.EPOLLIN) ++ clear_sock_file(SOCK_FILE) ++ clear_sock_file(USER_RECV_SOCK) ++ recover_sock_path_and_permission() ++ global report_sock ++ global alarm_sock ++ global alarm_epoll ++ global report_epoll ++ global fd_to_socket ++ global conn_thread_should_stop ++ global conn_thread ++ global fd_to_socket_lock ++ report_sock, report_epoll = create_sock_conn(SOCK_FILE, socket.SOCK_DGRAM) ++ alarm_sock, alarm_epoll = create_sock_conn(USER_RECV_SOCK, socket.SOCK_STREAM) + fd_to_socket = {alarm_sock.fileno(): alarm_sock,} +- thread_should_stop = False + +- conn_thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop)) +- conn_thread.daemon = True +- conn_thread.start() ++ conn_thread_should_stop = threading.Event() ++ conn_thread = start_wait_for_conn_thread( ++ alarm_sock, ++ alarm_epoll, ++ fd_to_socket, ++ conn_thread_should_stop, ++ fd_to_socket_lock ++ ) + +- cleanup_thread = threading.Thread(target=peroid_task_to_cleanup_connections, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop)) ++ cleanup_thread = threading.Thread(target=peroid_task_to_cleanup_connections) + cleanup_thread.daemon = True + cleanup_thread.start() + ++ watch_thread = threading.Thread(target=watch_socket_file_and_dir) ++ watch_thread.daemon = True ++ watch_thread.start() ++ + while True: + try: +- data, _ = sock.recvfrom(ALARM_REPORT_LEN) ++ # set timeout as 1 seconds to avoid main process blocked by recvfrom ++ # which will cause socket cannot be rebuild ++ events = report_epoll.poll(1.0) ++ data = None ++ for fileno, event in events: ++ if fileno == report_sock.fileno(): ++ data, _ = report_sock.recvfrom(ALARM_REPORT_LEN) ++ + if not data: + continue + if len(data) != ALARM_REPORT_LEN: +@@ -92,19 +264,29 @@ def server_loop(alarm_config): + logging.info("server recieve report msg, %s", alarm_str) + if not check_filter(alarm_info, alarm_config): + continue +- transmit_alarm(alarm_sock, epoll, fd_to_socket, data, alarm_str) ++ transmit_alarm( ++ alarm_sock, ++ alarm_epoll, ++ fd_to_socket, ++ data, ++ alarm_str, ++ fd_to_socket_lock ++ ) + except Exception as e: + logging.error(f"Error server:{e}") + +- thread_should_stop = True ++ conn_thread_should_stop.set() + conn_thread.join() + cleanup_thread.join() ++ watch_thread.join() + +- epoll.unregister(alarm_sock.fileno()) +- epoll.close() ++ alarm_epoll.unregister(alarm_sock.fileno()) ++ alarm_epoll.close() + alarm_sock.close() + os.unlink(USER_RECV_SOCK) + +- sock.close() ++ report_sock.close() ++ ++ + + +diff --git a/src/services/xalarm/xalarm_transfer.py b/src/services/xalarm/xalarm_transfer.py +index ccf16f9..d52a61f 100644 +--- a/src/services/xalarm/xalarm_transfer.py ++++ b/src/services/xalarm/xalarm_transfer.py +@@ -17,14 +17,14 @@ Create: 2023-11-02 + import socket + import logging + import threading ++import errno + from time import sleep + + MIN_ID_NUMBER = 1001 + MAX_ID_NUMBER = 1128 + MAX_CONNECTION_NUM = 100 + TEST_CONNECT_BUFFER_SIZE = 32 +-PEROID_SCANN_TIME = 60 +-LOCK = threading.Lock() ++MAX_RETRY_TIMES = 3 + + + def check_filter(alarm_info, alarm_filter): +@@ -40,7 +40,7 @@ def check_filter(alarm_info, alarm_filter): + return True + + +-def cleanup_closed_connections(server_sock, epoll, fd_to_socket): ++def cleanup_closed_connections(server_sock, epoll, fd_to_socket, fd_to_socket_lock): + """ + clean invalid client socket connections saved in 'fd_to_socket' + :param server_sock: server socket instance of alarm +@@ -48,7 +48,7 @@ def cleanup_closed_connections(server_sock, epoll, fd_to_socket): + :param fd_to_socket: dict instance, used to hold client connections and server connections + """ + to_remove = [] +- with LOCK: ++ with fd_to_socket_lock: + for fileno, connection in fd_to_socket.items(): + if connection is server_sock: + continue +@@ -69,46 +69,40 @@ def cleanup_closed_connections(server_sock, epoll, fd_to_socket): + logging.info(f"cleaned up connection {fileno} for client lost connection.") + + +-def peroid_task_to_cleanup_connections(server_sock, epoll, fd_to_socket, thread_should_stop): +- while not thread_should_stop: +- sleep(PEROID_SCANN_TIME) +- cleanup_closed_connections(server_sock, epoll, fd_to_socket) +- +- +-def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop): ++def wait_for_connection(server_sock, epoll, fd_to_socket, conn_thread_should_stop, fd_to_socket_lock): + """ + thread function for catch and save client connection + :param server_sock: server socket instance of alarm + :param epoll: epoll instance, used to unregister invalid client connections + :param fd_to_socket: dict instance, used to hold client connections and server connections +- :param thread_should_stop: bool instance ++ :param conn_thread_should_stop: bool instance + """ +- while not thread_should_stop: ++ logging.info("wait for connection thread is running") ++ while not conn_thread_should_stop.is_set(): + try: + events = epoll.poll(1) +- ++ + for fileno, event in events: + if fileno == server_sock.fileno(): + connection, client_address = server_sock.accept() + # if reach max connection, cleanup closed connections + if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM: +- cleanup_closed_connections(server_sock, epoll, fd_to_socket) ++ cleanup_closed_connections(server_sock, epoll, fd_to_socket, fd_to_socket_lock) + # if connections still reach max num, close this connection automatically + if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM: + logging.info(f"connection reach max num of {MAX_CONNECTION_NUM}, closed current connection!") + connection.close() + continue +- with LOCK: ++ with fd_to_socket_lock: + fd_to_socket[connection.fileno()] = connection + logging.info("connection fd %d registered event.", connection.fileno()) + except socket.error as e: + logging.debug(f"socket error, reason is {e}") +- break + except (KeyError, OSError, ValueError) as e: + logging.debug(f"wait for connection failed {e}") + + +-def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str): ++def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str, fd_to_socket_lock): + """ + this function is to broadcast alarm data to client, if fail to send data, remove connections held by fd_to_socket + :param server_sock: server socket instance of alarm +@@ -117,8 +111,9 @@ def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str): + :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py + """ + to_remove = [] ++ to_retry = [] + +- with LOCK: ++ with fd_to_socket_lock: + for fileno, connection in fd_to_socket.items(): + if connection is not server_sock: + try: +@@ -127,13 +122,29 @@ def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str): + fileno, alarm_str) + except (BrokenPipeError, ConnectionResetError): + to_remove.append(fileno) ++ except socket.error as e: ++ if e.errno == errno.EAGAIN: ++ to_retry.append(connection) ++ else: ++ logging.info("Sending msg failed, fd is %d, alarm msg is %s, reason is: %s", ++ fileno, alarm_str, str(e)) + except Exception as e: + logging.info("Sending msg failed, fd is %d, alarm msg is %s, reason is: %s", + fileno, alarm_str, str(e)) + ++ for connection in to_retry: ++ for i in range(MAX_RETRY_TIMES): ++ try: ++ connection.sendall(bin_data) ++ break ++ except Exception as e: ++ sleep(0.1) ++ logging.info("Sending msg failed for %d times, fd is %d, alarm msg is %s, reason is: %s", ++ i, connection.fileno(), alarm_str, str(e)) + + for fileno in to_remove: + fd_to_socket[fileno].close() + del fd_to_socket[fileno] + logging.info(f"cleaned up connection {fileno} for client lost connection.") + ++ +-- +2.27.0 + diff --git a/sysSentry.spec b/sysSentry.spec index ba9addc..4c4c494 100644 --- a/sysSentry.spec +++ b/sysSentry.spec @@ -4,7 +4,7 @@ Summary: System Inspection Framework Name: sysSentry Version: 1.0.3 -Release: 7 +Release: 8 License: Mulan PSL v2 Group: System Environment/Daemons Source0: https://gitee.com/openeuler/sysSentry/releases/download/v%{version}/%{name}-%{version}.tar.gz @@ -15,6 +15,7 @@ Patch3: add-log-for-xalarmd-and-fix-delete-on-iter-problem.patch Patch4: fix-xalarm-log-not-print-and-add-on-iter-problem.patch Patch5: add-new-func-for-ebpf-in-the-rq_driver-stage.patch Patch6: fix-the-sentryCollector-service-can-t-be-stopped-for.patch +Patch7: add-dfx-for-xalarmd-to-rebuild-connection-after-comm.patch BuildRequires: cmake gcc-c++ BuildRequires: python3 python3-setuptools @@ -211,6 +212,12 @@ rm -rf /var/run/sysSentry | : %attr(0550,root,root) %{python3_sitelib}/syssentry/bmc_alarm.py %changelog +* Fri Feb 28 2025 caixiaomeng - 1.0.3-8 +- Type:bugfix +- CVE:NA +- SUG:NA +- DESC: add dfx for xalarmd to rebuild connection after communication disconnection + * Mon Feb 24 2025 zhuofeng - 1.0.3-7 - Type:bugfix - CVE:NA -- Gitee