From b0bded3b728fd9118658a2a2d21e18b2e50c3355 Mon Sep 17 00:00:00 2001 From: PshySimon Date: Mon, 24 Feb 2025 19:11:26 +0800 Subject: [PATCH] add dfx for xalarmd to rebuild connection after communication disconnection --- config/service/xalarmd.service | 7 +- src/libs/libxalarm/register_xalarm.c | 2 +- src/services/xalarm/xalarm_server.py | 242 ++++++++++++++++++++++--- src/services/xalarm/xalarm_transfer.py | 49 +++-- 4 files changed, 249 insertions(+), 51 deletions(-) diff --git a/config/service/xalarmd.service b/config/service/xalarmd.service index 0665b39..20db997 100644 --- a/config/service/xalarmd.service +++ b/config/service/xalarmd.service @@ -2,8 +2,13 @@ Description = xalarm daemon for alarm messages forwarding [Service] -ExecStart =/usr/bin/python3 /usr/bin/xalarmd Type = forking +ExecStart=/usr/bin/python3 /usr/bin/xalarmd +ExecStop=/bin/kill +KillMode=process +Restart=on-failure +RestartSec=3s [Install] WantedBy = multi-user.target + diff --git a/src/libs/libxalarm/register_xalarm.c b/src/libs/libxalarm/register_xalarm.c index 9ff0f7c..f9da783 100644 --- a/src/libs/libxalarm/register_xalarm.c +++ b/src/libs/libxalarm/register_xalarm.c @@ -32,7 +32,7 @@ #define PATH_REG_ALARM "/var/run/xalarm/alarm" #define PATH_REPORT_ALARM "/var/run/xalarm/report" #define ALARM_DIR_PERMISSION 0750 -#define ALARM_SOCKET_PERMISSION 0700 +#define ALARM_SOCKET_PERMISSION 0600 #define TIME_UNIT_MILLISECONDS 1000 #define MAX_PARAS_LEN 8191 diff --git a/src/services/xalarm/xalarm_server.py b/src/services/xalarm/xalarm_server.py index ade5eb1..3ed0a24 100644 --- a/src/services/xalarm/xalarm_server.py +++ b/src/services/xalarm/xalarm_server.py @@ -18,14 +18,16 @@ import socket import os import logging import select +import stat import threading +from time import sleep from .xalarm_api import alarm_bin2stu, alarm_stu2str from .xalarm_transfer import ( check_filter, transmit_alarm, wait_for_connection, - peroid_task_to_cleanup_connections + cleanup_closed_connections ) @@ -34,53 +36,223 @@ USER_RECV_SOCK = "/var/run/xalarm/alarm" SOCK_FILE = "/var/run/xalarm/report" ALARM_REPORT_LEN = 8216 ALARM_DIR_PERMISSION = 0o750 +SOCKET_FILE_PERMISSON = 0o600 +PERMISION_MASK = 0o777 +PEROID_CHECK_TIME = 3 ALARM_LISTEN_QUEUE_LEN = 5 +PEROID_SCANN_TIME = 60 +fd_to_socket_lock = threading.Lock() -def clear_sock_path(): +def check_permission(path, permission): + """check whether the permission of path is right + """ + return (os.stat(path).st_mode & PERMISION_MASK) == permission + + +def check_socket_file(path): + if not os.path.exists(path): + return False + + file_stat = os.stat(path) + # path is not a socket file + if not stat.S_ISSOCK(file_stat.st_mode): + return False + return True + + +def clear_sock_file(sock_file): """unlink unix socket if exist """ + if os.path.exists(sock_file): + os.unlink(sock_file) + + +def clear_sock_conn(sock_fd, epoll_fd): + if sock_fd is None: + return + if sock_fd.fileno() == -1: + return + if epoll_fd is not None: + epoll_fd.unregister(sock_fd.fileno()) + epoll_fd.close() + sock_fd.close() + + +def create_sock_conn(sock_file, sock_type): + sock_fd, epoll_fd = (None, None) + try: + sock_fd = socket.socket(socket.AF_UNIX, sock_type) + sock_fd.bind(sock_file) + + if sock_type == socket.SOCK_STREAM: + sock_fd.listen(ALARM_LISTEN_QUEUE_LEN) + sock_fd.setblocking(False) + + epoll_fd = select.epoll() + epoll_fd.register(sock_fd.fileno(), select.EPOLLIN) + os.chmod(sock_file, SOCKET_FILE_PERMISSON) + logging.info("socket file %s has been created", sock_file) + return sock_fd, epoll_fd + except socket.error as e: + logging.error("failed to bind %s socket, reason is %s", sock_file, str(e)) + clear_sock_conn(sock_fd, epoll_fd) + + return sock_fd, epoll_fd + + +def recover_sock_path_and_permission(): + # if directory not exists or permission denied, remake if not os.path.exists(ALARM_DIR): + logging.info("xalarmd run dir not exists, create %s", ALARM_DIR) os.mkdir(ALARM_DIR) + if not check_permission(ALARM_DIR, ALARM_DIR_PERMISSION): + logging.info("xalarmd run dir %s permission set not properly, recover as default permission", ALARM_DIR) os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION) - if os.path.exists(SOCK_FILE): - os.unlink(SOCK_FILE) - if os.path.exists(USER_RECV_SOCK): - os.unlink(USER_RECV_SOCK) + if os.path.exists(SOCK_FILE) and not check_permission(SOCK_FILE, SOCKET_FILE_PERMISSON): + logging.info("socket file %s permission %s set not properly, recover as default permission", + SOCK_FILE, oct(os.stat(SOCK_FILE).st_mode & PERMISION_MASK)) + os.chmod(SOCK_FILE, SOCKET_FILE_PERMISSON) + if os.path.exists(USER_RECV_SOCK) and not check_permission(USER_RECV_SOCK, SOCKET_FILE_PERMISSON): + logging.info("socket file %s permission %s set not properly, recover as default permission", + USER_RECV_SOCK, oct(os.stat(USER_RECV_SOCK).st_mode & PERMISION_MASK)) + os.chmod(USER_RECV_SOCK, SOCKET_FILE_PERMISSON) + + +def peroid_task_to_cleanup_connections(): + global alarm_sock + global alarm_epoll + global fd_to_socket + global conn_thread_should_stop + global fd_to_socket_lock + logging.info("cleanup thread is running") + + while True: + sleep(PEROID_SCANN_TIME) + # if conn thread stopped, cleanup thread should not cleanup anymore + if conn_thread_should_stop.is_set(): + continue + cleanup_closed_connections(alarm_sock, alarm_epoll, fd_to_socket, fd_to_socket_lock) + + +def watch_socket_file_and_dir(): + global conn_thread + global alarm_epoll + global report_epoll + global conn_thread_should_stop + global report_sock + global alarm_sock + global fd_to_socket + global fd_to_socket_lock + while True: + try: + recover_sock_path_and_permission() + if not check_socket_file(SOCK_FILE): + logging.info("socket file %s not found or socket file been replaced, recovering ...", SOCK_FILE) + clear_sock_conn(report_sock, report_epoll) + clear_sock_file(SOCK_FILE) + # if create socket failed, will retry to create because socket file was cleared in last step + report_sock, report_epoll = create_sock_conn(SOCK_FILE, socket.SOCK_DGRAM) + + if not check_socket_file(USER_RECV_SOCK): + logging.info("socket file %s not found or socket file been replaced, recovering ...", USER_RECV_SOCK) + # set conn_thread_should_stop as True + conn_thread_should_stop.set() + # Ensure that conn_thread has been stopped before clean and release resources + conn_thread.join() + + # Now only transmit_alarm will use this lock + # Ensure fd_to_socket dict resource has been released + with fd_to_socket_lock: + for stored_sock_fd, stored_sock in fd_to_socket.items(): + if stored_sock is None: + continue + if alarm_sock is not None and (stored_sock.fileno() != alarm_sock.fileno()): + stored_sock.close() + clear_sock_conn(alarm_sock, alarm_epoll) + clear_sock_file(USER_RECV_SOCK) + + alarm_sock, alarm_epoll = create_sock_conn(USER_RECV_SOCK, socket.SOCK_STREAM) + fd_to_socket = {alarm_sock.fileno(): alarm_sock,} + + # set conn_thread_should_stop as False + conn_thread_should_stop.clear() + conn_thread = start_wait_for_conn_thread( + alarm_sock, + alarm_epoll, + fd_to_socket, + conn_thread_should_stop, + fd_to_socket_lock + ) + except Exception as e: + logging.error("Error watch socket file thread: %s", str(e)) + + sleep(PEROID_CHECK_TIME) + + +def start_wait_for_conn_thread(alarm_sock_, alarm_epoll_, + fd_to_socket_, conn_thread_should_stop_, fd_to_socket_lock_): + conn_thread = threading.Thread( + target=wait_for_connection, + args=( + alarm_sock_, + alarm_epoll_, + fd_to_socket_, + conn_thread_should_stop_, + fd_to_socket_lock_) + ) + conn_thread.daemon = True + conn_thread.start() + return conn_thread def server_loop(alarm_config): """alarm daemon process loop """ logging.info("server loop waiting for messages") - clear_sock_path() - - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock.bind(SOCK_FILE) - os.chmod(SOCK_FILE, 0o600) - - alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - alarm_sock.bind(USER_RECV_SOCK) - os.chmod(USER_RECV_SOCK, 0o600) - alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN) - alarm_sock.setblocking(False) - epoll = select.epoll() - epoll.register(alarm_sock.fileno(), select.EPOLLIN) + clear_sock_file(SOCK_FILE) + clear_sock_file(USER_RECV_SOCK) + recover_sock_path_and_permission() + global report_sock + global alarm_sock + global alarm_epoll + global report_epoll + global fd_to_socket + global conn_thread_should_stop + global conn_thread + global fd_to_socket_lock + report_sock, report_epoll = create_sock_conn(SOCK_FILE, socket.SOCK_DGRAM) + alarm_sock, alarm_epoll = create_sock_conn(USER_RECV_SOCK, socket.SOCK_STREAM) fd_to_socket = {alarm_sock.fileno(): alarm_sock,} - thread_should_stop = False - conn_thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop)) - conn_thread.daemon = True - conn_thread.start() + conn_thread_should_stop = threading.Event() + conn_thread = start_wait_for_conn_thread( + alarm_sock, + alarm_epoll, + fd_to_socket, + conn_thread_should_stop, + fd_to_socket_lock + ) - cleanup_thread = threading.Thread(target=peroid_task_to_cleanup_connections, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop)) + cleanup_thread = threading.Thread(target=peroid_task_to_cleanup_connections) cleanup_thread.daemon = True cleanup_thread.start() + watch_thread = threading.Thread(target=watch_socket_file_and_dir) + watch_thread.daemon = True + watch_thread.start() + while True: try: - data, _ = sock.recvfrom(ALARM_REPORT_LEN) + # set timeout as 1 seconds to avoid main process blocked by recvfrom + # which will cause socket cannot be rebuild + events = report_epoll.poll(1.0) + data = None + for fileno, event in events: + if fileno == report_sock.fileno(): + data, _ = report_sock.recvfrom(ALARM_REPORT_LEN) + if not data: continue if len(data) != ALARM_REPORT_LEN: @@ -92,19 +264,29 @@ def server_loop(alarm_config): logging.info("server recieve report msg, %s", alarm_str) if not check_filter(alarm_info, alarm_config): continue - transmit_alarm(alarm_sock, epoll, fd_to_socket, data, alarm_str) + transmit_alarm( + alarm_sock, + alarm_epoll, + fd_to_socket, + data, + alarm_str, + fd_to_socket_lock + ) except Exception as e: logging.error(f"Error server:{e}") - thread_should_stop = True + conn_thread_should_stop.set() conn_thread.join() cleanup_thread.join() + watch_thread.join() - epoll.unregister(alarm_sock.fileno()) - epoll.close() + alarm_epoll.unregister(alarm_sock.fileno()) + alarm_epoll.close() alarm_sock.close() os.unlink(USER_RECV_SOCK) - sock.close() + report_sock.close() + + diff --git a/src/services/xalarm/xalarm_transfer.py b/src/services/xalarm/xalarm_transfer.py index ccf16f9..d52a61f 100644 --- a/src/services/xalarm/xalarm_transfer.py +++ b/src/services/xalarm/xalarm_transfer.py @@ -17,14 +17,14 @@ Create: 2023-11-02 import socket import logging import threading +import errno from time import sleep MIN_ID_NUMBER = 1001 MAX_ID_NUMBER = 1128 MAX_CONNECTION_NUM = 100 TEST_CONNECT_BUFFER_SIZE = 32 -PEROID_SCANN_TIME = 60 -LOCK = threading.Lock() +MAX_RETRY_TIMES = 3 def check_filter(alarm_info, alarm_filter): @@ -40,7 +40,7 @@ def check_filter(alarm_info, alarm_filter): return True -def cleanup_closed_connections(server_sock, epoll, fd_to_socket): +def cleanup_closed_connections(server_sock, epoll, fd_to_socket, fd_to_socket_lock): """ clean invalid client socket connections saved in 'fd_to_socket' :param server_sock: server socket instance of alarm @@ -48,7 +48,7 @@ def cleanup_closed_connections(server_sock, epoll, fd_to_socket): :param fd_to_socket: dict instance, used to hold client connections and server connections """ to_remove = [] - with LOCK: + with fd_to_socket_lock: for fileno, connection in fd_to_socket.items(): if connection is server_sock: continue @@ -69,46 +69,40 @@ def cleanup_closed_connections(server_sock, epoll, fd_to_socket): logging.info(f"cleaned up connection {fileno} for client lost connection.") -def peroid_task_to_cleanup_connections(server_sock, epoll, fd_to_socket, thread_should_stop): - while not thread_should_stop: - sleep(PEROID_SCANN_TIME) - cleanup_closed_connections(server_sock, epoll, fd_to_socket) - - -def wait_for_connection(server_sock, epoll, fd_to_socket, thread_should_stop): +def wait_for_connection(server_sock, epoll, fd_to_socket, conn_thread_should_stop, fd_to_socket_lock): """ thread function for catch and save client connection :param server_sock: server socket instance of alarm :param epoll: epoll instance, used to unregister invalid client connections :param fd_to_socket: dict instance, used to hold client connections and server connections - :param thread_should_stop: bool instance + :param conn_thread_should_stop: bool instance """ - while not thread_should_stop: + logging.info("wait for connection thread is running") + while not conn_thread_should_stop.is_set(): try: events = epoll.poll(1) - + for fileno, event in events: if fileno == server_sock.fileno(): connection, client_address = server_sock.accept() # if reach max connection, cleanup closed connections if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM: - cleanup_closed_connections(server_sock, epoll, fd_to_socket) + cleanup_closed_connections(server_sock, epoll, fd_to_socket, fd_to_socket_lock) # if connections still reach max num, close this connection automatically if len(fd_to_socket) - 1 >= MAX_CONNECTION_NUM: logging.info(f"connection reach max num of {MAX_CONNECTION_NUM}, closed current connection!") connection.close() continue - with LOCK: + with fd_to_socket_lock: fd_to_socket[connection.fileno()] = connection logging.info("connection fd %d registered event.", connection.fileno()) except socket.error as e: logging.debug(f"socket error, reason is {e}") - break except (KeyError, OSError, ValueError) as e: logging.debug(f"wait for connection failed {e}") -def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str): +def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str, fd_to_socket_lock): """ this function is to broadcast alarm data to client, if fail to send data, remove connections held by fd_to_socket :param server_sock: server socket instance of alarm @@ -117,8 +111,9 @@ def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str): :param bin_data: binary instance, alarm info data in C-style struct format defined in xalarm_api.py """ to_remove = [] + to_retry = [] - with LOCK: + with fd_to_socket_lock: for fileno, connection in fd_to_socket.items(): if connection is not server_sock: try: @@ -127,13 +122,29 @@ def transmit_alarm(server_sock, epoll, fd_to_socket, bin_data, alarm_str): fileno, alarm_str) except (BrokenPipeError, ConnectionResetError): to_remove.append(fileno) + except socket.error as e: + if e.errno == errno.EAGAIN: + to_retry.append(connection) + else: + logging.info("Sending msg failed, fd is %d, alarm msg is %s, reason is: %s", + fileno, alarm_str, str(e)) except Exception as e: logging.info("Sending msg failed, fd is %d, alarm msg is %s, reason is: %s", fileno, alarm_str, str(e)) + for connection in to_retry: + for i in range(MAX_RETRY_TIMES): + try: + connection.sendall(bin_data) + break + except Exception as e: + sleep(0.1) + logging.info("Sending msg failed for %d times, fd is %d, alarm msg is %s, reason is: %s", + i, connection.fileno(), alarm_str, str(e)) for fileno in to_remove: fd_to_socket[fileno].close() del fd_to_socket[fileno] logging.info(f"cleaned up connection {fileno} for client lost connection.") + -- Gitee