From 37a94a5d52ff9c38ac85af6533c9e881b0313ece Mon Sep 17 00:00:00 2001 From: liqiang Date: Thu, 28 Dec 2023 14:35:42 +0800 Subject: [PATCH] fix uds block on connect and randomization proxy sun path Signed-off-by: liqiang --- qtfs/ipc/uds_event.c | 52 ++++++++++++++++++++++++++++++++++------- qtfs/ipc/uds_main.c | 49 ++++++++++++++++++++++++++++++++++---- qtfs/ipc/uds_main.h | 19 +++++++++++++-- qtfs/ipc/uds_module.h | 4 +++- qtfs/qtfs_common/conn.c | 12 +++++----- qtfs/qtinfo/qtinfo.c | 2 +- 6 files changed, 116 insertions(+), 22 deletions(-) diff --git a/qtfs/ipc/uds_event.c b/qtfs/ipc/uds_event.c index 78a2a97..9d3b016 100644 --- a/qtfs/ipc/uds_event.c +++ b/qtfs/ipc/uds_event.c @@ -73,6 +73,7 @@ int uds_event_module_init(struct uds_event_global_var *p) uds_err("malloc buf failed."); goto free4; } + memset(&p->diag, 0, sizeof(p->diag)); return EVENT_OK; free4: @@ -280,6 +281,7 @@ int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_ev struct uds_event *evt = (struct uds_event *)arg; struct uds_proxy_remote_conn_rsp msg; int len; + int ret; memset(&msg, 0, sizeof(struct uds_proxy_remote_conn_rsp)); len = read(evt->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp)); if (len <= 0) { @@ -305,6 +307,12 @@ int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_ev goto event_del; } strcat(uds.sun_path, UDS_PROXY_SUFFIX); + if ((ret = mkstemp(uds.sun_path)) < 0) { + uds_err("failed to create local proxy uds path, ret:%d errno:%d", ret, errno); + goto event_del; + } + // ret is temp fd, just close + close(ret); if (uds_build_unix_connection(&uds) < 0) { uds_err("failed to build uds server sunpath:%s", uds.sun_path); goto event_del; @@ -326,8 +334,10 @@ int uds_event_build_step3(void *arg, int epfd, struct uds_event_global_var *p_ev uds_err("add time out hash failed fd:%d %d", evt->fd, newevt->fd); } uds_log("Add hash key:%d-->value and key:%d-->value", evt->fd, newevt->fd); - + memcpy(evt->proxy_path, uds.sun_path, UDS_SUN_PATH_LEN); msg.ret = 1; + memset(msg.sun_path_suffix, 0, strlen(UDS_PROXY_SUFFIX) + 1); + memcpy(msg.sun_path_suffix, &uds.sun_path[strlen(udsmsg->sun_path)], strlen(UDS_PROXY_SUFFIX)); // return value of write can be ignored len = write(evt->peer->fd, &msg, sizeof(struct uds_proxy_remote_conn_rsp)); free(evt->priv); @@ -366,6 +376,10 @@ int uds_event_build_step4(void *arg, int epfd, struct uds_event_global_var *p_ev uds_event_add_to_free(p_event_var, peerevt); return EVENT_DEL; } + // 将sun path转移到新事件中,本建联事件即将删除 + // 新事件为业务消息转发事件,生命周期结束时负责unlink代理sun path文件 + memcpy(peerevt->peer->proxy_path, evt->proxy_path, UDS_SUN_PATH_LEN); + memset(evt->proxy_path, 0, UDS_SUN_PATH_LEN); uds_log("accept new connection fd:%d, peerfd:%d frontfd:%d peerfd:%d, peerevt(fd:%d) active now", connfd, evt->peer->fd, peerevt->fd, peerevt->peer->fd, peerevt->fd); @@ -803,12 +817,23 @@ int uds_event_pipe2tcp(void *arg, int epfd, struct uds_event_global_var *p_event } -int uds_msg_tcp_end_msg(int sock) +static void uds_diag_add_cnt(struct uds_diag_info *p, unsigned long type) +{ + if (type >= DIAG_MAX) { + uds_err("diag count add type invalid:%lu", type); + return; + } + p->diag_cnt[type]++; + return; +} + +int uds_msg_tcp_end_msg(int sock, struct uds_event_global_var *p) { struct uds_tcp2tcp end = {.msgtype = MSG_END, .msglen = 0,}; int ret = write(sock, &end, sizeof(struct uds_tcp2tcp)); if (ret <= 0) { uds_err("write end msg failed, ret:%d fd:%d", ret, sock); + uds_diag_add_cnt(&p->diag, DIAG_U2T_ENDMSG_DEL); return EVENT_DEL; } return EVENT_OK; @@ -850,8 +875,9 @@ int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_ len = recvmsg(evt->fd, &msg, 0); if (len == 0) { - uds_err("recvmsg error, return:%d", len); + uds_err("recvmsg error, return:%d events:%lx", len, p_event_var->events); uds_event_add_to_free(p_event_var, evt); + uds_diag_add_cnt(&p_event_var->diag, DIAG_U2T_RCV_DEL); return EVENT_DEL; } if (len < 0) { @@ -879,7 +905,7 @@ int uds_event_uds2tcp(void *arg, int epfd, struct uds_event_global_var *p_event_ uds_log("write iov msg to tcp success, msgtype:%d ret:%d iovlen:%d recvlen:%d udsheadlen:%d msglen:%d msg:\n>>>>>>>\n%.*s\n<<<<<<<\n", p_msg->msgtype, ret, iov.iov_len, len, sizeof(struct uds_tcp2tcp), p_msg->msglen, p_msg->msglen, p_msg->data); endmsg: - return uds_msg_tcp_end_msg(evt->peer->fd); + return uds_msg_tcp_end_msg(evt->peer->fd, p_event_var); } static inline void uds_close_fds(int *fds, int fdnum) @@ -917,7 +943,9 @@ int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_ while (1) { int len = uds_recv_with_timeout(evt->fd, (char *)p_msg, sizeof(struct uds_tcp2tcp)); if (len <= 0) { - uds_err("recv no msg maybe sock is closed, delete this tcp2uds event, len:%d.", len); + uds_err("recv no msg maybe sock is closed, delete this tcp2uds event, len:%d, events:%lx.", len, + p_event_var->events); + uds_diag_add_cnt(&p_event_var->diag, DIAG_T2U_RCVHEAD_DEL); goto close_event; } uds_log(" type:%d len:%d len:%d", p_msg->msgtype, p_msg->msglen, len); @@ -937,6 +965,7 @@ int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_ normal_msg_len = uds_recv_with_timeout(evt->fd, p_event_var->iov_base_send, p_msg->msglen); if (normal_msg_len <= 0) { uds_err("recv msg error:%d fd:%d", len, evt->fd); + uds_diag_add_cnt(&p_event_var->diag, DIAG_T2U_NORM_DEL); goto close_event; } iov.iov_len = normal_msg_len; @@ -955,7 +984,7 @@ int uds_event_tcp2uds(void *arg, int epfd, struct uds_event_global_var *p_event_ len = uds_recv_with_timeout(evt->fd, (char *)p_msg->data, p_msg->msglen); if (len <= 0) { uds_err("recv data failed len:%d", p_msg->msglen); - return EVENT_DEL; + goto close_event; } if (fdnum >= MAX_FDS) { uds_err("Too many fds scm."); @@ -1016,7 +1045,7 @@ int uds_diag_is_epoll_fd(int fd) return 0; } -int uds_diag_string(char *buf, int len) +int uds_diag_string(char *buf, int len, struct uds_diag_info *p) { int pos = 0; memset(buf, 0, len); @@ -1026,6 +1055,13 @@ int uds_diag_string(char *buf, int len) for (int i = 0; i < p_uds_var->work_thread_num; i++) { pos += snprintf(&buf[pos], len - pos, "+ Thread %d events count:%d\n", i+1, p_uds_var->work_thread[i].info.events); } + pos += snprintf(&buf[pos], len - pos, "+ U2T RECV DEL: %lu U2T END MSG DEL: %lu \n" + "+ T2U RECVHEAD DEL: %lu T2U NORM DEL: %lu\n" + "+ HUP NOT DEL: %lu DEL NOT HUP: %lu\n", + p->diag_cnt[DIAG_U2T_RCV_DEL], p->diag_cnt[DIAG_U2T_ENDMSG_DEL], + p->diag_cnt[DIAG_T2U_RCVHEAD_DEL], p->diag_cnt[DIAG_T2U_NORM_DEL], + p->diag_cnt[DIAG_HUP_NOT_DEL], p->diag_cnt[DIAG_DEL_NOT_HUP]); + pos += snprintf(&buf[pos], len - pos, "+ Log level:%s\n", p_uds_var->logstr[p_uds_var->loglevel]); pos += snprintf(&buf[pos], len - pos, "+---------------------------------------------------------------------------------------+\n"); return strlen(buf); @@ -1049,7 +1085,7 @@ int uds_event_diag_info(void *arg, int epfd, struct uds_event_global_var *p_even } uds_log("diag accept an new connection to send diag info, fd:%d", connfd); - len = uds_diag_string(p_event_var->iov_base, p_event_var->iov_len); + len = uds_diag_string(p_event_var->iov_base, p_event_var->iov_len, &p_event_var->diag); ret = send(connfd, p_event_var->iov_base, len, 0); if (ret <= 0) { uds_err("send diag info error, ret:%d len:%d", ret, len); diff --git a/qtfs/ipc/uds_main.c b/qtfs/ipc/uds_main.c index 2775087..65468b8 100644 --- a/qtfs/ipc/uds_main.c +++ b/qtfs/ipc/uds_main.c @@ -38,6 +38,7 @@ #include #include #include +#include #include "comm.h" #include "uds_main.h" @@ -91,7 +92,7 @@ int uds_event_delete(int efd, int fd) int uds_recv_with_timeout(int fd, char *msg, int len) { #define TMOUT_BLOCK_SIZE 1024 -#define TMOUT_UNIT_MS 20 +#define TMOUT_UNIT_MS 100 #define TMOUT_INTERVAL 1 #define TMOUT_MAX_MS 1000 int total_recv = 0; @@ -149,10 +150,22 @@ void uds_event_timeout_proc() g_hash_table_foreach_remove(event_tmout_hash, uds_event_tmout_item, NULL); } +static unsigned long uds_gettime_us(void) +{ + struct timespec ts; + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) { + uds_err("get time of us failed, errno:%d", errno); + return (unsigned long)-1; + } + return (unsigned long)(ts.tv_sec * 1000000 + ts.tv_nsec / 1000); +} + void uds_main_loop(int efd, struct uds_thread_arg *arg) { int n = 0; int ret; + unsigned long us_old; + unsigned long us_cur; struct uds_event *udsevt; struct epoll_event *evts = NULL; struct uds_event_global_var *p_event_var = arg->p_event_var; @@ -171,17 +184,23 @@ void uds_main_loop(int efd, struct uds_thread_arg *arg) free(evts); return; } + us_old = uds_gettime_us(); #ifdef QTFS_SERVER extern int engine_run; while (engine_run) { #else while (1) { #endif - n = epoll_wait(efd, evts, UDS_EPOLL_MAX_EVENTS, 1000); - if (n == 0) { + n = epoll_wait(efd, evts, UDS_EPOLL_MAX_EVENTS, 100); + us_cur = uds_gettime_us(); + // 每1秒超时一次,需要在较短的时间不断采样处理 + if (us_cur < us_old || us_cur - us_old > 1000000) { uds_event_timeout_proc(); - continue; + us_old = us_cur; } + // 正常超时,无事件可处理,不要往下走 + if (n == 0) + continue; if (n < 0) { uds_err("epoll wait return errcode:%d", n); continue; @@ -199,11 +218,16 @@ void uds_main_loop(int efd, struct uds_thread_arg *arg) if (uds_event_pre_handler(udsevt) == EVENT_ERR) { continue; } + p_event_var->events = evts[i].events; ret = udsevt->handler(udsevt, efd, p_event_var); // 此处释放当前事件,peer事件需要handler里面释放 if (ret == EVENT_DEL) { uds_del_event(udsevt); } + if ((evts[i].events & EPOLLHUP) != 0 && ret != EVENT_DEL) + p_event_var->diag.diag_cnt[DIAG_HUP_NOT_DEL] ++; + if ((evts[i].events & EPOLLHUP) == 0 && ret == EVENT_DEL) + p_event_var->diag.diag_cnt[DIAG_DEL_NOT_HUP] ++; } uds_event_post_hook(p_event_var); } @@ -213,6 +237,20 @@ void uds_main_loop(int efd, struct uds_thread_arg *arg) return; } +int uds_set_nonblock(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + uds_err("client: get fflag failed, flags:0x%x", flags); + return -1; + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + uds_err("client: set fflag failed:0x%x", flags); + return -1; + } + return 0; +} + #define UDS_MAX_LISTEN_NUM 64 int uds_build_tcp_connection(struct uds_conn_arg *arg) { @@ -316,6 +354,7 @@ int uds_build_unix_connection(struct uds_conn_arg *arg) goto close_and_return; } } else { + (void)uds_set_nonblock(sock_fd); if (connect(arg->sockfd, (struct sockaddr *)&sock_addr, sizeof(struct sockaddr_un)) < 0) { goto close_and_return; } @@ -412,6 +451,8 @@ void uds_del_event(struct uds_event *evt) close(evt->peerfd); evt->peerfd = -1; } + if (evt->proxy_path[0]) + unlink(evt->proxy_path); uds_event_delete(p_uds_var->efd[hash], evt->fd); free(evt); return; diff --git a/qtfs/ipc/uds_main.h b/qtfs/ipc/uds_main.h index f69e4b3..6fd77ec 100644 --- a/qtfs/ipc/uds_main.h +++ b/qtfs/ipc/uds_main.h @@ -67,6 +67,19 @@ struct uds_thread_info { int status; }; +enum uds_diag_cnt { + DIAG_U2T_RCV_DEL, + DIAG_U2T_ENDMSG_DEL, + DIAG_T2U_RCVHEAD_DEL, + DIAG_T2U_NORM_DEL, + DIAG_HUP_NOT_DEL, + DIAG_DEL_NOT_HUP, + DIAG_MAX, +}; +struct uds_diag_info { + unsigned long diag_cnt[DIAG_MAX]; +}; + struct uds_event_global_var { int cur; struct uds_event *tofree[UDS_EPOLL_MAX_EVENTS]; @@ -80,6 +93,8 @@ struct uds_event_global_var { int iov_sendlen; char *buf; int buflen; + struct uds_diag_info diag; + unsigned long events; }; struct uds_event { @@ -94,8 +109,8 @@ struct uds_event { }; int (*handler)(void *, int, struct uds_event_global_var *); /* event处理函数 */ void *priv; // private data - char cpath[UDS_SUN_PATH_LEN]; - char spath[UDS_SUN_PATH_LEN]; + // need to unlink if event deleted + char proxy_path[UDS_SUN_PATH_LEN]; }; diff --git a/qtfs/ipc/uds_module.h b/qtfs/ipc/uds_module.h index 4e199ac..305392f 100644 --- a/qtfs/ipc/uds_module.h +++ b/qtfs/ipc/uds_module.h @@ -22,7 +22,8 @@ #define UDS_LOCK_ADDR "/var/run/qtfs/uds.lock" #define UDS_BUILD_CONN_DIR "/var/run/qtfs/" -#define UDS_PROXY_SUFFIX ".proxy" +#define UDS_PROXY_SUFFIX "XXXXXX" +#define UDS_SUFFIX_LEN 6 #define UDS_SUN_PATH_LEN 108 // from glibc struct uds_proxy_remote_conn_req { @@ -32,6 +33,7 @@ struct uds_proxy_remote_conn_req { }; struct uds_proxy_remote_conn_rsp { int ret; + char sun_path_suffix[UDS_SUFFIX_LEN + 1]; }; #endif diff --git a/qtfs/qtfs_common/conn.c b/qtfs/qtfs_common/conn.c index 1974c06..132652c 100644 --- a/qtfs/qtfs_common/conn.c +++ b/qtfs/qtfs_common/conn.c @@ -51,7 +51,7 @@ struct kmem_cache *qtfs_fifo_pvar_cache; // try to connect remote uds server, only for unix domain socket #define QTFS_UDS_PROXY_SUFFIX ".proxy" -int qtfs_uds_proxy_build(struct socket *sock, struct sockaddr_un *addr, int len) +int qtfs_uds_proxy_build(struct socket *sock, struct sockaddr_un *addr, int len, struct sockaddr_un *addr_proxy) { int ret; struct uds_proxy_remote_conn_req req; @@ -96,8 +96,10 @@ int qtfs_uds_proxy_build(struct socket *sock, struct sockaddr_un *addr, int len) if (rsp.ret == 0) { goto err_end; } - qtfs_info("try to build uds proxy successed, sun path:%s", addr->sun_path); - + qtfs_info("try to build uds proxy successed, sun path:%s, sun suffix:%s", addr->sun_path, rsp.sun_path_suffix); + + memcpy(addr_proxy, addr, sizeof(struct sockaddr_un)); + strlcat(addr_proxy->sun_path, rsp.sun_path_suffix, sizeof(addr_proxy->sun_path)); sock_release(proxy_sock); return 0; err_end: @@ -195,10 +197,8 @@ try_conn_remote: goto end; } // try to connect remote uds's proxy - ret = qtfs_uds_proxy_build(sock, &addr_un, len); + ret = qtfs_uds_proxy_build(sock, &addr_un, len, &addr_proxy); if (ret == 0) { - memcpy(&addr_proxy, &addr_un, sizeof(struct sockaddr_un)); - strlcat(addr_proxy.sun_path, QTFS_UDS_PROXY_SUFFIX, sizeof(addr_proxy.sun_path)); sysret = sock->ops->connect(sock, (struct sockaddr *)&addr_proxy, sizeof(struct sockaddr_un), sock->file->f_flags); qtfs_info("try remote connect sunpath:%s ret:%d", addr_proxy.sun_path, sysret); } diff --git a/qtfs/qtinfo/qtinfo.c b/qtfs/qtinfo/qtinfo.c index 4c915ae..9915d10 100644 --- a/qtfs/qtinfo/qtinfo.c +++ b/qtfs/qtinfo/qtinfo.c @@ -372,7 +372,7 @@ static int qtinfo_opt_u() } while (1) { memset(buf, 0, RECV_BUFF_LEN); - ret = recv(sockfd, buf, RECV_BUFF_LEN, 0); + ret = recv(sockfd, buf, RECV_BUFF_LEN - 1, 0); if (ret <= 0) break; qtinfo_out2("%s", buf); -- Gitee