diff --git a/qtfs/rexec/rexec.c b/qtfs/rexec/rexec.c index 1858e0b0d48642fa8dea4139dc3226a2512e4389..5b90c3212574e10e2a081216a7e5bbcd8e1ea8ca 100644 --- a/qtfs/rexec/rexec.c +++ b/qtfs/rexec/rexec.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +47,12 @@ struct rexec_global_var { int rexec_hs_fd[2]; }; +struct rexec_thread_arg { + int efd; + int connfd; + char **argv; +}; + struct rexec_global_var g_rexec; @@ -192,8 +199,8 @@ static int rexec_conn_msg(struct rexec_client_event *evt) break; } - rexec_log("Rexec conn recv msgtype:%d argc:%d stdno:%d msglen:%d", - head.msgtype, head.argc, head.stdno, head.msglen); + rexec_log("Rexec conn recv msgtype:%d argc:%d pipefd:%d msglen:%d", + head.msgtype, head.argc, head.pipefd, head.msglen); return REXEC_EVENT_OK; } @@ -252,27 +259,13 @@ static int rexec_std_event(int efd, int rstdin, int rstdout, int rstderr) return 0; } -static int rexec_run(int efd, int connfd, char *argv[]) +static void rexec_event_run(int efd) { - int pidfd = -1; - int exit_status = EXIT_FAILURE; - - struct rexec_client_event *connevt = rexec_add_event(efd, connfd, -1, rexec_conn_msg); - if (NULL == connevt || rexec_set_nonblock(connfd, 1) != 0) { - // process will exit, fd or mem resource will free by kernel soon - rexec_err("rexec add connfd event failed"); - return exit_status; - } - // 这两个指针只能在当前函数上下文使用,是当前函数栈指针 - connevt->exit_status = &exit_status; - connevt->pidfd = &pidfd; - struct epoll_event *evts = calloc(REXEC_MAX_EVENTS, sizeof(struct epoll_event)); if (evts == NULL) { rexec_err("init calloc evts failed."); - goto end; + return; } - rexec_log("Rexec process start run, as proxy of remote %s", argv[1]); while (1) { int n = epoll_wait(efd, evts, REXEC_MAX_EVENTS, 1000); int process_exit = 0; @@ -294,10 +287,31 @@ static int rexec_run(int efd, int connfd, char *argv[]) } // process will exit, and free all resource and exit if (process_exit) { - rexec_log("Rexec process %s exit.", argv[1]); break; } } + free(evts); + return; +} + +static int rexec_run(int efd, int connfd, char *argv[]) +{ + int pidfd = -1; + int exit_status = EXIT_FAILURE; + + struct rexec_client_event *connevt = rexec_add_event(efd, connfd, -1, rexec_conn_msg); + if (NULL == connevt || rexec_set_nonblock(connfd, 1) != 0) { + // process will exit, fd or mem resource will free by kernel soon + rexec_err("rexec add connfd event failed"); + return exit_status; + } + // 这两个指针只能在当前函数上下文使用,是当前函数栈指针 + connevt->exit_status = &exit_status; + connevt->pidfd = &pidfd; + + rexec_log("Rexec process start run, as proxy of remote %s", argv[1]); + rexec_event_run(efd); + rexec_log("Rexec process %s exit.", argv[1]); // clear pidmap file if (pidfd > 0) { @@ -307,9 +321,6 @@ static int rexec_run(int efd, int connfd, char *argv[]) remove(path); } -free_end: - free(evts); - end: close(efd); return exit_status; @@ -374,17 +385,6 @@ struct rexec_fdinfo { int offset; }; -static inline unsigned int rexec_fd_mode(int fd) -{ - struct stat st; - char path[32] = {0}; - if (fstat(fd, &st) != 0) { - rexec_err("get fd:%d fstat failed, errno:%d", fd, errno); - return 0; - } - return st.st_mode; -} - static inline int rexec_is_reg_file(int fd) { if (S_ISREG(rexec_fd_mode(fd))) @@ -441,7 +441,7 @@ static char *rexec_get_fds_jsonstr() fddir = opendir("/proc/self/fd"); if (fddir == NULL) { free(fdinfo); - rexec_err("open path:%s failed", REXEC_PIDMAP_PATH); + rexec_err("open path:/proc/self/fd failed"); goto err_end; } @@ -488,6 +488,73 @@ err_end: return NULL; } +// 将rexec进程从parent继承到的匿名pipe继承给远端进程 +static int rexec_pipe_remote_inherit(int efd, int connfd) +{ +#define SELF_FD_PATH "/proc/self/fd" + DIR *fddir = NULL; + struct dirent *fdentry; + struct rexec_msg msg; + mode_t mode; + int pfd[2]; + + fddir = opendir(SELF_FD_PATH); + if (fddir == NULL) { + rexec_err("open path:%s failed", SELF_FD_PATH); + return -1; + } + memset(&msg, 0, sizeof(struct rexec_msg)); + msg.msglen = 0; + msg.pipefd = -1; + msg.msgtype = REXEC_PIPE; + while (fdentry = readdir(fddir)) { + int fd = atoi(fdentry->d_name); + if (fd <= STDERR_FILENO) + continue; + mode = rexec_fd_mode(fd); + if (!S_ISFIFO(mode)) + continue; + rexec_log("inherit pipe fd:%d mode:%o is %s pipe", fd, mode, (!!(mode & S_IRUSR)) ? "read" : "write"); + if (pipe(pfd) == -1) { + rexec_err("failed to create pipe for:%d", fd); + goto err_end; + } + msg.pipefd = fd; + if (!!(mode & S_IRUSR)) { + // inherit read pipe + if (rexec_sendmsg(connfd, (char *)&msg, sizeof(struct rexec_msg), pfd[PIPE_READ]) < 0) { + rexec_err("send read pipe failed, inherit fd:%d", fd); + goto pipe_end; + } + if (rexec_add_event(efd, fd, pfd[PIPE_WRITE], rexec_io) == NULL) { + rexec_err("add read pipe event failed:%d", fd); + goto pipe_end; + } + close(pfd[PIPE_READ]); + } else if (!!(mode & S_IWUSR)) { + if (rexec_sendmsg(connfd, (char *)&msg, sizeof(struct rexec_msg), pfd[PIPE_WRITE]) < 0) { + rexec_err("send write pipe failed, inherit fd:%d", fd); + goto pipe_end; + } + if (rexec_add_event(efd, pfd[PIPE_READ], fd, rexec_io) == NULL) { + rexec_err("add write pipe event failed:%d", fd); + goto pipe_end; + } + close(pfd[PIPE_WRITE]); + } + rexec_log("successed to add pipe fd:%d to remote inherit", fd); + } + closedir(fddir); + return 0; + +pipe_end: + close(pfd[0]); + close(pfd[1]); +err_end: + closedir(fddir); + return -1; +} + static int rexec_handshake_proc(struct rexec_client_event *evt) { char msg[sizeof(struct rexec_msg) + 1]; @@ -543,6 +610,86 @@ err_end: return -1; } +static int rexec_send_binary_msg(int efd, int argc, char *argv[], int arglen, char *fds_json, int connfd) +{ + struct rexec_msg *pmsg = (struct rexec_msg *)malloc(arglen); + if (pmsg == NULL) { + rexec_err("malloc failed"); + free(fds_json); + return -1; + } + char *bufmsg = pmsg->msg; + memset(pmsg, 0, arglen); + pmsg->msgtype = REXEC_EXEC; + pmsg->argc = argc - 1; // for remote binary's argc is argc-1 + // pmsg->msg is like: "binary"\0"argv[1]"\0"argv[2]"\0"..." + pmsg->msglen = rexec_msg_fill_argv(pmsg->argc, &argv[1], bufmsg); + strcpy(&bufmsg[pmsg->msglen], fds_json); + pmsg->msglen += strlen(fds_json); + free(fds_json); + + // pipefd[0] -- for read; pipefd[1] -- for write. + // rexec stdin --> rstdin[1] ------> rstdin[0] as stdin + // rexec stdout <-- rstdout[0] <------ rstdout[1] as stdout + // rexec stderr <-- rstderr[0] <------ rstderr[1] as stderr + int rstdin[2]; + int rstdout[2]; + int rstderr[2]; + + if (pipe(rstdin) == -1 || pipe(rstdout) == -1 || pipe(rstderr) == -1) { + rexec_err("Rexec create pipe failed."); + goto err_end; + } + pmsg->pipefd = REXEC_STDIN; + if (rexec_sendmsg(connfd, (char *)pmsg, sizeof(struct rexec_msg) + pmsg->msglen, rstdin[0]) < 0) { + rexec_err("Rexec send exec msg failed, errno:%d", errno); + goto err_end; + } + rexec_log("Normal msg send len:%d head:%d", sizeof(struct rexec_msg) + pmsg->msglen, sizeof(struct rexec_msg)); + pmsg->msgtype = REXEC_PIPE; + pmsg->argc = 0; + pmsg->msglen = 0; + pmsg->pipefd = REXEC_STDOUT; + if (rexec_sendmsg(connfd, (char *)pmsg, sizeof(struct rexec_msg), rstdout[1]) < 0) { + rexec_err("Rexec send exec msg failed, errno:%d", errno); + goto err_end; + } + pmsg->pipefd = REXEC_STDERR; + if (rexec_sendmsg(connfd, (char *)pmsg, sizeof(struct rexec_msg), rstderr[1]) < 0) { + rexec_err("Rexec send exec msg failed, errno:%d", errno); + goto err_end; + } + + if (rexec_std_event(efd, rstdin[1], rstdout[0], rstderr[0]) != 0) { + rexec_err("add std event failed"); + goto err_end; + } + free(pmsg); + close(rstdin[0]); + close(rstdout[1]); + close(rstderr[1]); + return 0; +err_end: + free(pmsg); + return -1; +} + +static void *rexec_pipe_proxy_thread(void *arg) +{ + struct rexec_thread_arg *parg = (struct rexec_thread_arg *)arg; + rexec_log("pipe proxy thread run."); + rexec_event_run(parg->efd); + rexec_log("pipe proxy thread run over"); + return NULL; +} + +static void *rexec_conn_thread(void *arg) +{ + struct rexec_thread_arg *parg = (struct rexec_thread_arg *)arg; + + return (void *)rexec_run(parg->efd, parg->connfd, parg->argv); +} + static void rexec_global_var_init() { memset(&g_rexec, 0, sizeof(g_rexec)); @@ -556,8 +703,9 @@ int main(int argc, char *argv[]) rexec_log_init(); rexec_clear_pids(); + int pipeefd = epoll_create1(0); int efd = epoll_create1(0); - if (efd == -1) { + if (efd == -1 || pipeefd == -1) { rexec_err("epoll create1 failed, errno:%d.", errno); return -1; } @@ -574,6 +722,10 @@ int main(int argc, char *argv[]) return -1; } rexec_log("Remote exec binary:%s", argv[1]); + if (rexec_pipe_remote_inherit(pipeefd, connfd) != 0) { + rexec_err("Rexec pipe remote inherit failed."); + goto err_end; + } int arglen = rexec_calc_argv_len(argc - 1, &argv[1]); if (arglen <= 0) { @@ -590,76 +742,32 @@ int main(int argc, char *argv[]) arglen = ((arglen / REXEC_MSG_LEN) + 1) * REXEC_MSG_LEN; if (arglen <= 0) { rexec_err("invalid arguments length:%d.", arglen); - free(fds_json); - return -1; - } - - struct rexec_msg *pmsg = (struct rexec_msg *)malloc(arglen); - if (pmsg == NULL) { - rexec_err("malloc failed"); free(fds_json); return -1; } - char *bufmsg = pmsg->msg; - memset(pmsg, 0, arglen); - pmsg->msgtype = REXEC_EXEC; - pmsg->argc = argc - 1; // for remote binary's argc is argc-1 - // pmsg->msg is like: "binary"\0"argv[1]"\0"argv[2]"\0"..." - pmsg->msglen = rexec_msg_fill_argv(pmsg->argc, &argv[1], bufmsg); - strcpy(&bufmsg[pmsg->msglen], fds_json); - pmsg->msglen += strlen(fds_json); - free(fds_json); - - // pipefd[0] -- for read; pipefd[1] -- for write. - // rexec stdin --> rstdin[1] ------> rstdin[0] as stdin - // rexec stdout <-- rstdout[0] <------ rstdout[1] as stdout - // rexec stderr <-- rstderr[0] <------ rstderr[1] as stderr - int rstdin[2]; - int rstdout[2]; - int rstderr[2]; - if (pipe(rstdin) == -1 || pipe(rstdout) == -1 || pipe(rstderr) == -1) { - rexec_err("Rexec create pipe failed."); - goto err_end; - } - pmsg->stdno = REXEC_STDIN; - if (rexec_sendmsg(connfd, (char *)pmsg, sizeof(struct rexec_msg) + pmsg->msglen, rstdin[0]) < 0) { - rexec_err("Rexec send exec msg failed, errno:%d", errno); - goto err_end; - } - rexec_log("Normal msg send len:%d head:%d", sizeof(struct rexec_msg) + pmsg->msglen, sizeof(struct rexec_msg)); - pmsg->msgtype = REXEC_PIPE; - pmsg->argc = 0; - pmsg->msglen = 0; - pmsg->stdno = REXEC_STDOUT; - if (rexec_sendmsg(connfd, (char *)pmsg, sizeof(struct rexec_msg), rstdout[1]) < 0) { - rexec_err("Rexec send exec msg failed, errno:%d", errno); - goto err_end; - } - pmsg->stdno = REXEC_STDERR; - if (rexec_sendmsg(connfd, (char *)pmsg, sizeof(struct rexec_msg), rstderr[1]) < 0) { - rexec_err("Rexec send exec msg failed, errno:%d", errno); + if (rexec_send_binary_msg(efd, argc, argv, arglen, fds_json, connfd) != 0) { + rexec_err("send binary information message failed."); goto err_end; } - free(pmsg); - int exit_status; - close(rstdin[0]); - close(rstdout[1]); - close(rstderr[1]); - if (rexec_std_event(efd, rstdin[1], rstdout[0], rstderr[0]) != 0) { - rexec_err("add std event failed"); - goto err_end; - } - exit_status = rexec_run(efd, connfd, argv); - close(rstdin[1]); - close(rstdout[0]); - close(rstderr[0]); - fclose(rexec_logfile); + pthread_t thrd; + pthread_t thrd_conn; + struct rexec_thread_arg targ; + struct rexec_thread_arg connarg; + void *exit_status; + targ.efd = pipeefd; + (void)pthread_create(&thrd, NULL, rexec_pipe_proxy_thread, &targ); - exit(exit_status); + connarg.efd = efd; + connarg.connfd = connfd; + connarg.argv = argv; + (void)pthread_create(&thrd_conn, NULL, rexec_conn_thread, &connarg); + pthread_join(thrd_conn, (void *)&exit_status); + fclose(rexec_logfile); + exit((int)exit_status); err_end: fclose(rexec_logfile); - free(pmsg); + rexec_logfile = NULL; return -1; } diff --git a/qtfs/rexec/rexec.h b/qtfs/rexec/rexec.h index e9182a29caeec09cf3c815532d2c6e63e3f1482e..acd6cca19df94b680ec90df2e881d43b9225102a 100644 --- a/qtfs/rexec/rexec.h +++ b/qtfs/rexec/rexec.h @@ -18,6 +18,7 @@ #include #include +#include enum { PIPE_READ = 0, @@ -32,7 +33,7 @@ enum { }; enum { - REXEC_STDIN = 0x5a, + REXEC_STDIN = 0, REXEC_STDOUT, REXEC_STDERR, REXEC_NONE, @@ -59,7 +60,7 @@ struct rexec_msg { int msgtype; // client to server int argc; - int stdno; + int pipefd; int msglen; // server to client int exit_status; @@ -155,5 +156,15 @@ static inline int rexec_set_inherit(int fd, bool inherit) p.tm_hour, p.tm_min, p.tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ } +static inline unsigned int rexec_fd_mode(int fd) +{ + struct stat st; + if (fstat(fd, &st) != 0) { + rexec_err("get fd:%d fstat failed, errno:%d", fd, errno); + return 0; + } + return st.st_mode; +} + #endif diff --git a/qtfs/rexec/rexec_server.c b/qtfs/rexec/rexec_server.c index c8b686b374eaad9867fcf0acb52943475616d4fc..00479b8c1e749fb32065acac6e367da257b732eb 100644 --- a/qtfs/rexec/rexec_server.c +++ b/qtfs/rexec/rexec_server.c @@ -109,8 +109,8 @@ static int rexec_event_process_manage(struct rexec_event *event) kill(event->pid, SIGKILL); return REXEC_EVENT_DEL; } - rexec_err("Recv msg from client, msgtype:%d msglen:%d argc:%d stdno:%d", - head.msgtype, head.msglen, head.argc, head.stdno); + rexec_err("Recv msg from client, msgtype:%d msglen:%d argc:%d pipefd:%d", + head.msgtype, head.msglen, head.argc, head.pipefd); return REXEC_EVENT_OK; } @@ -137,14 +137,18 @@ static int rexec_event_handshake(struct rexec_event *event) return REXEC_EVENT_DEL; } -static void rexec_dup_std(int fd, int stdno) +static int rexec_dup_pipefd(int fd, int pipefd) { - if (stdno < REXEC_STDIN || stdno > REXEC_STDERR) { - return; + int dupfd; + if (fd == -1) + return 0; + dupfd = dup2(fd, pipefd); + if (dupfd != pipefd) { + rexec_err("failed to dup pipefd:%d", pipefd); + return -1; } - dup2(fd, stdno - REXEC_STDIN); close(fd); - return; + return 0; } // argv list: [0]binary,[1]-f,[2]*json_str,[3]arg1,[4]arg2,... @@ -312,15 +316,19 @@ static int rexec_start_new_process(int newconnfd) int scmfd = -1; int len = sizeof(struct rexec_msg); memset(&head, 0, sizeof(struct rexec_msg)); + scmfd = -1; ret = rexec_recvmsg(newconnfd, (char *)&head, len, &scmfd, MSG_WAITALL); if (ret <= 0) { rexec_log("recvmsg ret:%d, errno:%d", ret, errno); goto err_to_parent; } // 将管道与自己的标准输入输出关联 - rexec_dup_std(scmfd, head.stdno); - if (head.stdno >= REXEC_STDIN && head.stdno <= REXEC_STDERR) { - msg_bit |= (1 << (head.stdno - REXEC_STDIN)); + if (rexec_dup_pipefd(scmfd, head.pipefd) != 0) { + rexec_err("dup scm:%d pipefd:%d failed", scmfd, head.pipefd); + goto err_to_parent; + } + if (head.pipefd >= REXEC_STDIN && head.pipefd <= REXEC_STDERR) { + msg_bit |= (1 << (head.pipefd - REXEC_STDIN)); } if (head.msglen == 0) continue; @@ -332,7 +340,7 @@ static int rexec_start_new_process(int newconnfd) msg_bit |= REXEC_MSG_NORMAL; // exec msg rexec_log("Exec msgtype:0x%x msglen:%d argc:%d stdno:%d", - head.msgtype, head.msglen, head.argc, head.stdno); + head.msgtype, head.msglen, head.argc, head.pipefd); argc = head.argc; if (head.msglen > REXEC_MSG_MAX || argc > REXEC_MSG_MAX / sizeof(uintptr_t) || head.msglen <= 0 || argc < 0) { @@ -512,7 +520,9 @@ static void rexec_server_mainloop() end: close(main_epoll_fd); + main_epoll_fd = -1; close(ser.sockfd); + ser.sockfd = -1; return; } diff --git a/qtfs/rexec/rexec_shim.c b/qtfs/rexec/rexec_shim.c index cc7fc9e10e8b78263035b130c7ded894149e3da3..5bd8a19980d9d9426c4f54329ed04ae21119ff82 100644 --- a/qtfs/rexec/rexec_shim.c +++ b/qtfs/rexec/rexec_shim.c @@ -41,7 +41,7 @@ void rshim_close_all_fd() } while (entry = readdir(dir)) { int fd = atoi(entry->d_name); - if (fd <= 2) + if (fd <= 2 || S_ISFIFO(rexec_fd_mode(fd))) continue; close(fd); } diff --git a/qtfs/test/rexec_test/test_rexec_pipe_inherit.c b/qtfs/test/rexec_test/test_rexec_pipe_inherit.c new file mode 100644 index 0000000000000000000000000000000000000000..de85fc25daa03b199eb9a1bd8446121f5228fd1a --- /dev/null +++ b/qtfs/test/rexec_test/test_rexec_pipe_inherit.c @@ -0,0 +1,23 @@ +#include +#include + +int main(int argc, char *argv[]) +{ + int fd[2]; + pipe(fd); + int pid = fork(); + if (pid == 0) { + // child + char fdstr[16] = {0}; + sprintf(fdstr, "%d", fd[0]); + char *argvchild[] = {"rexec", argv[1], fdstr, NULL}; + close(fd[1]); + execv("/usr/bin/rexec", argvchild); + perror("execv"); + } + close(fd[0]); + sleep(1); + write(fd[1], "hello", 5); + return 0; +} + diff --git a/qtfs/test/rexec_test/test_rexec_pipe_inherit_remote.c b/qtfs/test/rexec_test/test_rexec_pipe_inherit_remote.c new file mode 100644 index 0000000000000000000000000000000000000000..91e183b3c0c8689e355c9b88e2ec8193c65f04f8 --- /dev/null +++ b/qtfs/test/rexec_test/test_rexec_pipe_inherit_remote.c @@ -0,0 +1,13 @@ +#include +#include +#include + +int main(int argc, char *argv[]) +{ + char buf[16] = {0}; + int fd = atoi(argv[1]); + int ret = read(fd, buf, 16); + printf("read from pipe fd:%d string:%s ret:%d errno:%d\n", fd, buf, ret, errno); + return 0; +} +