diff --git a/qtfs/include/comm.h b/qtfs/include/comm.h index b275a1df53d23bffce2cd7399076e784042d99fd..faa8ce3f7da4406d91bc10bfa1f501a6953a945a 100644 --- a/qtfs/include/comm.h +++ b/qtfs/include/comm.h @@ -201,6 +201,7 @@ struct qtinfo { int epoll_state; int pvar_vld; // valid param's number int pvar_busy; // busy param's number + unsigned int port; // qtfs port }; #define QTINFO_STATE(state) ((state == QTCONN_INIT) ? "INIT" : \ diff --git a/qtfs/include/log.h b/qtfs/include/log.h index eaf351754f9e66e9658662c856dc0868d7f7c972..4df42b32346e83d86786c69530d134519b5e0f50 100644 --- a/qtfs/include/log.h +++ b/qtfs/include/log.h @@ -14,7 +14,6 @@ #ifndef __QTFS_LOG_H__ #define __QTFS_LOG_H__ -#include #include "comm.h" enum level { @@ -25,6 +24,46 @@ enum level { LOG_DEBUG }; +#ifndef __KERNEL__ +#include +#define true 1 +#define log_info(info, ...) \ + if (true) {\ + time_t t; \ + struct tm p; \ + time(&t); \ + localtime_r(&t, &p); \ + fprintf(stdout, "[%d/%02d/%02d %02d:%02d:%02d][LOG:%s:%3d]"info"\n", \ + p.tm_year + 1900, p.tm_mon+1, p.tm_mday, \ + p.tm_hour, p.tm_min, p.tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } + +#define log_warn(info, ...) \ + if (true) {\ + time_t t; \ + struct tm p; \ + time(&t); \ + localtime_r(&t, &p); \ + fprintf(stdout, "[%d/%02d/%02d %02d:%02d:%02d][WARN:%s:%3d]"info"\n", \ + p.tm_year + 1900, p.tm_mon+1, p.tm_mday, \ + p.tm_hour, p.tm_min, p.tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } + +#define log_err(info, ...) \ + if (true) {\ + time_t t; \ + struct tm p; \ + time(&t); \ + localtime_r(&t, &p); \ + fprintf(stderr, "[%d/%02d/%02d %02d:%02d:%02d][ERROR:%s:%3d]"info"\n", \ + p.tm_year + 1900, p.tm_mon+1, p.tm_mday, \ + p.tm_hour, p.tm_min, p.tm_sec, __func__, __LINE__, ##__VA_ARGS__); \ + } + +#endif + +#ifdef __KERNEL__ +#include extern int log_level; #define qtfs_crit(fmt, ...) \ @@ -121,6 +160,6 @@ static inline int qtfs_log_init(char *level, int len) { } \ } \ ) - +#endif #endif diff --git a/qtfs/include/qtfs_fifo.h b/qtfs/include/qtfs_fifo.h new file mode 100644 index 0000000000000000000000000000000000000000..f50a693680ced6a7966286bd6bc9c61335547fb5 --- /dev/null +++ b/qtfs/include/qtfs_fifo.h @@ -0,0 +1,14 @@ +#ifndef __QTFS_FIFO_H__ +#define __QTFS_FIFO_H__ + + +struct fifo_server_arg_t { + char *addr; + unsigned int port; + unsigned int cid; + unsigned int family; // AF_VSOCK or AF_INET +}; +void *fifo_server_main_thread(void *arg); + +#endif + diff --git a/qtfs/include/req.h b/qtfs/include/req.h index 70a6bf7fdde971cf288a3bf2535af65d9a09e1d3..3591bd6dd0d7cac70edcd6b917e155abf6a25717 100644 --- a/qtfs/include/req.h +++ b/qtfs/include/req.h @@ -14,12 +14,24 @@ #ifndef __QTFS_REQ_STRUCT_DEF_H__ #define __QTFS_REQ_STRUCT_DEF_H__ +#ifdef __KERNEL__ #include #include #include +#endif + +#ifndef __KERNEL__ +#define PATH_MAX 4096 +typedef unsigned int u32; +typedef int s32; +typedef unsigned long u64; +typedef long s64; +#endif + #include "comm.h" #include "log.h" + enum qtreq_type { QTFS_REQ_NULL, QTFS_REQ_MOUNT, @@ -100,8 +112,6 @@ struct qtfs_dirent64 { #define MAX_ELSE_LEN (1024 * 128) #define QTFS_REQ_MAX_LEN (MAX_PATH_LEN + MAX_ELSE_LEN) -#define MAX_BUF 4096 - // QTFS_TAIL_LEN解释: // 私有数据结构最大长度为QTFS_REQ_MAX_LEN,超出就越界了 // 一般有变长buf要求的,把变长buf放在末尾 @@ -136,6 +146,7 @@ struct qtreq { #define QTFS_FIFO_HEAD_LEN 32 // fifo只用很少的额外头,32应该足够了 #define QTFS_FIFO_REQ_LEN (QTFS_MSG_HEAD_LEN + QTFS_FIFO_HEAD_LEN) +#ifdef __KERNEL__ struct qtreq_ioctl { struct qtreq_ioctl_len { unsigned int cmd; @@ -560,3 +571,42 @@ struct qtrsp_sc_sched_affinity { unsigned long user_mask_ptr[0]; }; #endif + +// fifo +struct qtreq_fifo_open { + u32 flags; + u32 mode; + char path[MAX_PATH_LEN]; +}; + +struct qtrsp_fifo_open { + s32 err; +}; + +struct qtreq_fifo_read { + u64 len; +}; + +struct qtrsp_fifo_read { + s32 err; // same as kernel errcode, 0 is ok, < 0 is errcode + u64 len; +}; + +struct qtreq_fifo_write { + u64 len; +}; + +struct qtrsp_fifo_write { + s32 err; + u64 len; +}; + +struct qtreq_fifo_close { + // nothing +}; + +struct qtrsp_fifo_close { + // nothing +}; + +#endif diff --git a/qtfs/qtfs/fifo.c b/qtfs/qtfs/fifo.c index 2bf772b5c393bca88066933d74c81ae5160cadd0..ec68337bb590519a494bdd1ec9bca7f0cc1f2589 100644 --- a/qtfs/qtfs/fifo.c +++ b/qtfs/qtfs/fifo.c @@ -24,45 +24,6 @@ #include "req.h" #include "log.h" -// 对接rust,长度对齐1字节 -#pragma pack(1) -struct qtreq_fifo_open { - u32 flags; - u32 mode; - char path[MAX_PATH_LEN]; -}; - -struct qtrsp_fifo_open { - s32 errno; -}; - -struct qtreq_fifo_read { - u64 len; -}; - -struct qtrsp_fifo_read { - s32 errno; // same as kernel errcode, 0 is ok, < 0 is errcode - u64 len; -}; - -struct qtreq_fifo_write { - u64 len; -}; - -struct qtrsp_fifo_write { - s32 errno; - u64 len; -}; - -struct qtreq_fifo_close { - // nothing -}; - -struct qtrsp_fifo_close { - // nothing -}; -#pragma pack() - static void qtfs_fifo_put_file(struct file *file) { struct qtfs_conn_var_s *pvar = file->private_data; @@ -109,7 +70,7 @@ int qtfs_fifo_open(struct inode *inode, struct file *file) fiforeq->flags = file->f_flags; fiforeq->mode = file->f_mode; - qtfs_debug("fifo open path:%s size req:%lu size open:%lu, flags:%llu mode%u", + qtfs_debug("fifo open path:%s size req:%lu size open:%lu, flags:%u mode%u", fiforeq->path, sizeof(struct qtreq), QTFS_SEND_SIZE(struct qtreq_fifo_open, fiforeq->path), fiforeq->flags, fiforeq->mode); vec_save = pvar->vec_send; @@ -120,10 +81,10 @@ int qtfs_fifo_open(struct inode *inode, struct file *file) pvar->vec_send = vec_save; pvar->send_max = sendmax_save; - if (IS_ERR_OR_NULL(rsp) || rsp->errno != 0) { - ret = IS_ERR_OR_NULL(rsp) ? -EFAULT : -rsp->errno; + if (IS_ERR_OR_NULL(rsp) || rsp->err != 0) { + ret = IS_ERR_OR_NULL(rsp) ? -EFAULT : -rsp->err; qtfs_fifo_put_param(pvar); - qtfs_err("qtfs fifo open :%s failed mode:%o flag:%llx", fiforeq->path, fiforeq->mode, fiforeq->flags); + qtfs_err("qtfs fifo open :%s failed mode:%o flag:%x", fiforeq->path, fiforeq->mode, fiforeq->flags); kfree(req); return ret; } @@ -148,20 +109,26 @@ ssize_t qtfs_fifo_readiter(struct kiocb *kio, struct iov_iter *iov) req = pvar->conn_ops->get_conn_msg_buf(pvar, QTFS_SEND); req->len = iov_iter_count(iov); pvar->vec_recv.iov_len = QTFS_MSG_HEAD_LEN + sizeof(struct qtrsp_fifo_read); + qtfs_info("fifo readiter len:%llu", req->len); rsp = qtfs_remote_run(pvar, QTFS_REQ_READITER, sizeof(struct qtreq_fifo_read)); - if (IS_ERR_OR_NULL(rsp) || rsp->errno != 0) { - qtfs_err("remote run failed. or errno:%d", (rsp == NULL) ? -1 : rsp->errno); + if (IS_ERR_OR_NULL(rsp) || rsp->err != 0) { + qtfs_err("remote run failed. or errno:%d", (rsp == NULL) ? -1 : rsp->err); return -EFAULT; } while (total < rsp->len) { ret = pvar->conn_ops->conn_recv_iter(&pvar->conn_var, iov, false); + if (ret == -EAGAIN) + continue; + if (ret <= 0) { qtfs_err("recv iter from conn module ret:%d", ret); break; } + iov->iov_offset += ret; total += ret; } + qtfs_info("fifo readiter over, total:%d, rsplen:%llu", total, rsp->len); return total; } @@ -180,8 +147,8 @@ ssize_t qtfs_fifo_writeiter(struct kiocb *kio, struct iov_iter *iov) pvar->vec_recv.iov_len = QTFS_MSG_HEAD_LEN + sizeof(struct qtrsp_fifo_write); pvar->iov_send = iov; rsp = qtfs_remote_run(pvar, QTFS_REQ_WRITE, sizeof(struct qtreq_fifo_write)); - if (IS_ERR_OR_NULL(rsp) || rsp->errno != 0) { - qtfs_err("fifo write remote run failed, or errno:%d", (rsp == NULL) ? -1 : rsp->errno); + if (IS_ERR_OR_NULL(rsp) || rsp->err != 0) { + qtfs_err("fifo write remote run failed, or errno:%d", (rsp == NULL) ? -1 : rsp->err); return -EFAULT; } return rsp->len; diff --git a/qtfs/qtfs/syscall.c b/qtfs/qtfs/syscall.c index adebeaf209f8722b9d4d956a6db7c550b39a7bfa..61a0de06c57338604e1a4a9acd039d33d9361204 100644 --- a/qtfs/qtfs/syscall.c +++ b/qtfs/qtfs/syscall.c @@ -56,7 +56,7 @@ static inline int qtfs_fstype_judgment(char __user *dir) return 1; } path_put(&path); - qtfs_debug("qtfs fstype judge <%s> is not qtfs.\n", path.dentry->d_iname); + qtfs_info("qtfs fstype judge <%s> is not qtfs.\n", path.dentry->d_iname); return 0; } diff --git a/qtfs/qtfs_common/conn.c b/qtfs/qtfs_common/conn.c index ca9104d796608deb900c472c977a963d218ea047..99809d229a70281c3041d758272283decb3dc5b2 100644 --- a/qtfs/qtfs_common/conn.c +++ b/qtfs/qtfs_common/conn.c @@ -894,8 +894,6 @@ struct qtfs_conn_var_s *qtfs_fifo_get_param(void) // initialize conn_pvar here pvar->recv_max = QTFS_FIFO_REQ_LEN; pvar->send_max = QTFS_FIFO_REQ_LEN; - pvar->magic_send = QTFS_FIFO_MAGIC_SEND; - pvar->magic_recv = QTFS_FIFO_MAGIC_RECV; pvar->user_type = QTFS_CONN_TYPE_FIFO; g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type); if (QTFS_OK != pvar->conn_ops->conn_var_init(pvar)) { diff --git a/qtfs/qtfs_common/libsocket.c b/qtfs/qtfs_common/libsocket.c new file mode 100644 index 0000000000000000000000000000000000000000..72151f1d18450ca15b3d6cd619fc94e28dc9ac6d --- /dev/null +++ b/qtfs/qtfs_common/libsocket.c @@ -0,0 +1,175 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * qtfs licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: Liqiang + * Create: 2023-11-23 + * Description: socket api in user-mode + *******************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "log.h" +#include "libsocket.h" + +struct lib_sock_arg { + int cs; // client(1) or server(2) + int sfamily; // vsock or tcp or uds + int stype; // SOCK_DGRAM or SOCK_STREAM + + struct sockaddr_storage saddr; +}; + +static inline int check_sock_arg(struct lib_sock_arg *arg) +{ + if (arg->cs != LIBSOCK_CLIENT && arg->cs != LIBSOCK_SERVER) { + log_err("build new connection role invalid(%d) must be CLIENT(%d) or SERVER(%d)", arg->cs, LIBSOCK_CLIENT, LIBSOCK_SERVER); + return -1; + } + if (arg->sfamily != AF_VSOCK && arg->sfamily != AF_INET && arg->sfamily != AF_UNIX) { + log_err("build new connection family invalid(%d), just support AF_UNIX(%d)/AF_INET(%d)/AF_VSOCK(%d).", + arg->sfamily, AF_UNIX, AF_INET, AF_VSOCK); + return -1; + } + if (arg->stype != SOCK_DGRAM && arg->stype != SOCK_STREAM) { + log_err("build new connection type invalid(%d), just support SOCK_DGRAM or SOCK_STREAM.", + arg->stype, SOCK_DGRAM, SOCK_STREAM); + return -1; + } + return 0; +} + +static inline int get_sock_len(int family) +{ + switch (family) { + case AF_VSOCK: + return sizeof(struct sockaddr_vm); + case AF_INET: + return sizeof(struct sockaddr_in); + case AF_UNIX: + return sizeof(struct sockaddr_un); + default: + break; + } + log_err("invalid family:%d", family); + return -1; +} + +static int libsock_build_connection(struct lib_sock_arg *arg) +{ + int ret; +#define MAX_LISTEN_NUM 64 + if (check_sock_arg(arg) != 0) { + log_err("Arg error, please check!"); + return -1; + } + + int sockfd = socket(arg->sfamily, arg->stype, 0); + if (sockfd < 0) { + log_err("As %s failed, socket fd:%d, errno:%d.", + (arg->cs == LIBSOCK_CLIENT) ? "client" : "server", sockfd, errno); + return -1; + } + + if (arg->cs == LIBSOCK_SERVER) { + if ((ret = bind(sockfd, (struct sockaddr *)&arg->saddr, get_sock_len(arg->sfamily))) < 0) { + log_err("As server failed socklen:%d, bind ret:%d error:%d", get_sock_len(arg->sfamily), ret, errno); + goto err_ret; + } + if ((ret = listen(sockfd, MAX_LISTEN_NUM)) < 0) { + log_err("As server listen failed ret:%d errno:%d", ret, errno); + goto err_ret; + } + } else { + if ((ret = connect(sockfd, (struct sockaddr *)&arg->saddr, get_sock_len(arg->sfamily))) < 0) { + log_err("As client failed socklen:%d, connect ret:%d errno:%d", get_sock_len(arg->sfamily), ret, errno); + goto err_ret; + } + } + return sockfd; + +err_ret: + close(sockfd); + return -1; +} + +int libsock_accept(int sockfd, int family) +{ + struct sockaddr_storage saddr; + socklen_t len = get_sock_len(family); + int connfd = accept(sockfd, (struct sockaddr *)&saddr, &len); + if (connfd <= 0) { + log_err("Accept failed sockfd:%d family:%d ret:%d errno:%d", sockfd, family, connfd, errno); + return -1; + } + return connfd; +} + +int libsock_build_inet_connection(char *ip, unsigned short port, enum libsock_cs_e cs) +{ + struct lib_sock_arg arg; + struct sockaddr_in *in; + in = (struct sockaddr_in *)&arg.saddr; + + memset(&arg, 0, sizeof(struct lib_sock_arg)); + in->sin_family = AF_INET; + in->sin_port = htons(port); + in->sin_addr.s_addr = inet_addr(ip); + arg.cs = cs; + arg.sfamily = AF_INET; + arg.stype = SOCK_STREAM; + + int sockfd = libsock_build_connection(&arg); + if (sockfd < 0) { + log_err("build inet connection failed, ip:%s port:%u", ip, port); + return -1; + } + return sockfd; +} + +int libsock_build_vsock_connection(unsigned int cid, unsigned int port, enum libsock_cs_e cs) +{ + struct lib_sock_arg arg; + struct sockaddr_vm *vm; + vm = (struct sockaddr_vm *)&arg.saddr; + + memset(&arg, 0, sizeof(struct lib_sock_arg)); + vm->svm_family = AF_VSOCK; + vm->svm_port = port; + vm->svm_cid = cid; + arg.cs = cs; + arg.sfamily = AF_VSOCK; + arg.stype = SOCK_STREAM; + + int sockfd = libsock_build_connection(&arg); + if (sockfd < 0) { + log_err("build vsock connection failed, cid:%u port:%u", cid, port); + return -1; + } + return sockfd; +} + diff --git a/qtfs/qtfs_common/libsocket.h b/qtfs/qtfs_common/libsocket.h new file mode 100644 index 0000000000000000000000000000000000000000..b3f407421e7510ad8be0d011992bca851137bf1e --- /dev/null +++ b/qtfs/qtfs_common/libsocket.h @@ -0,0 +1,29 @@ +/****************************************************************************** + * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. + * qtfs licensed under the Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR + * PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: Liqiang + * Create: 2023-11-23 + * Description: socket api in user-mode + *******************************************************************************/ + +#ifndef __LIB_SOCKET_H__ +#define __LIB_SOCKET_H__ + +enum libsock_cs_e { + LIBSOCK_CLIENT = 1, + LIBSOCK_SERVER, +}; + +int libsock_accept(int sockfd, int family); +int libsock_build_inet_connection(char *ip, unsigned short port, enum libsock_cs_e cs); +int libsock_build_vsock_connection(unsigned int cid, unsigned int port, enum libsock_cs_e cs); + +#endif + diff --git a/qtfs/qtfs_common/misc.c b/qtfs/qtfs_common/misc.c index 6f84b9506c4ceb48fa921c636aaf54e9a9ef27e9..e1c3520da20860e7f848196a8078a1ea875a9037 100644 --- a/qtfs/qtfs_common/misc.c +++ b/qtfs/qtfs_common/misc.c @@ -277,6 +277,11 @@ long qtfs_misc_ioctl(struct file *file, unsigned int cmd, unsigned long arg) } qtfs_req_size(); qtfs_diag_info->log_level = log_level; +#ifdef QTFS_TEST_MODE + qtfs_diag_info->port = qtfs_server_port; +#else + qtfs_diag_info->port = qtfs_server_vsock_port; +#endif qtfs_misc_flush_threadstate(); qtfs_conn_list_cnt(); if (copy_to_user((void *)arg, qtfs_diag_info, sizeof(struct qtinfo))) { diff --git a/qtfs/qtfs_common/user_engine.c b/qtfs/qtfs_common/user_engine.c index 4ce8b8eba93e953b0f6c0c7d757956704d920bc0..90707c02301ca0e4195d31a7fec7aea6ac644d2e 100644 --- a/qtfs/qtfs_common/user_engine.c +++ b/qtfs/qtfs_common/user_engine.c @@ -39,6 +39,7 @@ #include "comm.h" #include "ipc/uds_main.h" +#include "qtfs_fifo.h" char wl_type_str[QTFS_WHITELIST_MAX][16] = { "Open", @@ -434,6 +435,26 @@ err: return -1; } +static unsigned int engine_get_kernel_port(int fd) +{ + unsigned int port; + struct qtinfo *diag = (struct qtinfo *)malloc(sizeof(struct qtinfo)); + if (diag == NULL) { + engine_err("malloc failed."); + return -1; + } + memset(diag, 0, sizeof(struct qtinfo)); + int ret = ioctl(fd, QTFS_IOCTL_ALLINFO, diag); + if (ret != QTOK) { + engine_err("ioctl failed, ret:%d.", ret); + free(diag); + return -1; + } + port = diag->port; + free(diag); + return port; +} + #define QTFS_ENGINE_FD_LIMIT 65536 static void engine_rlimit() { @@ -490,6 +511,7 @@ int main(int argc, char *argv[]) pthread_t texec[QTFS_MAX_THREADS]; pthread_t tepoll; + pthread_t tfifo; if (thread_nums < 0 || thread_nums > QTFS_MAX_THREADS) { engine_err("qtfs engine parm invalid, thread_nums:%d(must <= %d).", thread_nums, QTFS_MAX_THREADS); @@ -506,6 +528,7 @@ int main(int argc, char *argv[]) goto end; } struct engine_arg arg[QTFS_MAX_THREADS]; + struct fifo_server_arg_t fifo_arg; for (int i = 0; i < thread_nums; i++) { arg[i].psize = QTFS_USERP_SIZE; arg[i].fd = fd; @@ -513,6 +536,22 @@ int main(int argc, char *argv[]) (void)pthread_create(&texec[i], NULL, qtfs_engine_kthread, &arg[i]); } (void)pthread_create(&tepoll, NULL, qtfs_engine_epoll_thread, &arg[0]); + +#ifdef QTFS_TEST_MODE + fifo_arg.addr = argv[3]; + fifo_arg.family = AF_INET; +#else + fifo_arg.cid = atoi(argv[3]); + fifo_arg.family = AF_VSOCK; +#endif + fifo_arg.port = engine_get_kernel_port(fd); + if (fifo_arg.port < 0) { + engine_err("failed to get qtfs port."); + goto end; + } + engine_out("qtfs server port:%d set to fifo is:%d", fifo_arg.port, fifo_arg.port + 2); + fifo_arg.port += 2; // 默认约定epoll+1,fifo+2 + (void)pthread_create(&tfifo, NULL, fifo_server_main_thread, &fifo_arg); // 必须放在这个位置,uds main里面最终也有join if (uds_proxy_main(6, &argv[1]) != 0) { engine_out("uds proxy start failed."); @@ -524,6 +563,7 @@ int main(int argc, char *argv[]) engine_out("qtfs engine join thread %d.", i); } pthread_join(tepoll, NULL); + pthread_join(tfifo, NULL); engine_free: qtfs_engine_userp_free(userp, thread_nums); engine_out("qtfs engine join epoll thread."); diff --git a/qtfs/qtfs_server/Makefile b/qtfs/qtfs_server/Makefile index 08b70f52093b64837b745be28be0cae9aba1327d..4608365f1ca60e08ec60fd9abcbeb4e8941f61d8 100644 --- a/qtfs/qtfs_server/Makefile +++ b/qtfs/qtfs_server/Makefile @@ -1,50 +1,56 @@ -ifdef QTFS_TEST_MODE -ccflags-y += -I$(src)/../ -I$(src) -I$(src)/../ipc/ -I$(src)/../include/ -DQTFS_SERVER -DQTFS_TEST_MODE -CFLAGS += -DUDS_TEST_MODE -else -ccflags-y += -I$(src)/../ -I$(src) -I$(src)/../ipc/ -I$(src)/../include/ -DQTFS_SERVER -endif - -CFLAGS += -g -O2 -CFLAGS += -fstack-protector-strong -CFLAGS += -fPIE -pie -fPIC -CFLAGS += -D_FORTIFY_SOURCE=2 -LDFLAGS += -Wl,-z,now -LDFLAGS += -Wl,-z,noexecstack -LDFLAGS += -fPIE -pie - -KBUILD=/lib/modules/$(shell uname -r)/build/ -COMM=../qtfs_common/ -COMMO=$(COMM)/conn.o $(COMM)/misc.o $(COMM)/symbol_wrapper.o $(COMM)/socket.o $(COMM)/qtfs_check.o - -obj-m:=qtfs_server.o -qtfs_server-objs:=fsops.o qtfs-server.o $(COMMO) - -DEPGLIB=-lglib-2.0 -I../ -I../include/ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include - -all: qtfs_server engine - -qtfs_server: - make -C $(KBUILD) M=$(PWD) modules - @test -z $(QTFS_TEST_MODE) || echo "Important risk warning: The test mode is turned on,\ - and qtfs will expose the network port, which will bring security risks and is only for\ - testing! If you do not understand the risks, please don't use or compile again without\ - QTFS_TEST_MODE." - -engine: uds_event.o uds_main.o user_engine.o - gcc $(LDFLAGS) -o engine $^ -lpthread $(DEPGLIB) -I../ -I../ipc/ -DQTFS_SERVER - -user_engine.o: - cc $(CFLAGS) -c -o user_engine.o ../qtfs_common/user_engine.c $(DEPGLIB) -I../ -DQTFS_SERVER - -uds_event.o: - cc $(CFLAGS) -c -o uds_event.o ../ipc/uds_event.c -DQTFS_SERVER $(DEPGLIB) - -uds_main.o: - cc $(CFLAGS) -c -o uds_main.o ../ipc/uds_main.c -DQTFS_SERVER $(DEPGLIB) - -clean: - make -C $(KBUILD) M=$(PWD) clean - rm -rf engine - rm -rf ../*.o - rm -rf $(COMMO) $(COMM).*.o.cmd +ifdef QTFS_TEST_MODE +ccflags-y += -I$(src)/../ -I$(src) -I$(src)/../ipc/ -I$(src)/../include/ -DQTFS_SERVER -DQTFS_TEST_MODE +CFLAGS += -DUDS_TEST_MODE -DQTFS_TEST_MODE +else +ccflags-y += -I$(src)/../ -I$(src) -I$(src)/../ipc/ -I$(src)/../include/ -DQTFS_SERVER +endif + +CFLAGS += -g -O2 +CFLAGS += -fstack-protector-strong +CFLAGS += -fPIE -pie -fPIC +CFLAGS += -D_FORTIFY_SOURCE=2 +LDFLAGS += -Wl,-z,now +LDFLAGS += -Wl,-z,noexecstack +LDFLAGS += -fPIE -pie + +KBUILD=/lib/modules/$(shell uname -r)/build/ +COMM=../qtfs_common/ +COMMO=$(COMM)/conn.o $(COMM)/misc.o $(COMM)/symbol_wrapper.o $(COMM)/socket.o $(COMM)/qtfs_check.o + +obj-m:=qtfs_server.o +qtfs_server-objs:=fsops.o qtfs-server.o $(COMMO) + +DEPGLIB=-lglib-2.0 -I../ -I../include/ -I/usr/include/glib-2.0 -I/usr/lib64/glib-2.0/include + +all: qtfs_server engine + +qtfs_server: + make -C $(KBUILD) M=$(PWD) modules + @test -z $(QTFS_TEST_MODE) || echo "Important risk warning: The test mode is turned on,\ + and qtfs will expose the network port, which will bring security risks and is only for\ + testing! If you do not understand the risks, please don't use or compile again without\ + QTFS_TEST_MODE." + +engine: uds_event.o uds_main.o user_engine.o server_fifo.o libsocket.o + gcc $(LDFLAGS) -o engine $^ -lpthread $(DEPGLIB) -I../ -I../ipc/ -DQTFS_SERVER + +libsocket.o: + cc $(CFLAGS) -c -o libsocket.o ../qtfs_common/libsocket.c -I../qtfs_common/ -I../include + +server_fifo.o: + cc $(CFLAGS) -c -o server_fifo.o server_fifo.c -I../include/ -I../qtfs_common/ + +user_engine.o: + cc $(CFLAGS) -c -o user_engine.o ../qtfs_common/user_engine.c $(DEPGLIB) -I../ -DQTFS_SERVER + +uds_event.o: + cc $(CFLAGS) -c -o uds_event.o ../ipc/uds_event.c -DQTFS_SERVER $(DEPGLIB) + +uds_main.o: + cc $(CFLAGS) -c -o uds_main.o ../ipc/uds_main.c -DQTFS_SERVER $(DEPGLIB) + +clean: + make -C $(KBUILD) M=$(PWD) clean + rm -rf engine fifo_server + rm -rf ../*.o + rm -rf $(COMMO) $(COMM).*.o.cmd diff --git a/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml b/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml deleted file mode 100644 index 803b5edef0a110d1bcf1516728eef1970f7db7ad..0000000000000000000000000000000000000000 --- a/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "qtfs_fifo_server" -version = "1.0.0" -edition = "2021" - -[dependencies] -tokio = { version = "1.29.1", features = ["full"]} -libc = "0.2" -rlimit = "0.10.1" \ No newline at end of file diff --git a/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs b/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs deleted file mode 100644 index db6805e7ac7ac3f684bdcd77113e9da59cd1daf9..0000000000000000000000000000000000000000 --- a/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs +++ /dev/null @@ -1,338 +0,0 @@ -/****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. - * qtfs licensed under the Mulan PSL v2. - * You can use this software according to the terms and conditions of the Mulan PSL v2. - * You may obtain a copy of Mulan PSL v2 at: - * http://license.coscl.org.cn/MulanPSL2 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR - * PURPOSE. - * See the Mulan PSL v2 for more details. - * Author: Liqiang - * Create: 2023-07-26 - * Description: - *******************************************************************************/ - -use tokio::net::TcpStream; -use std::mem; -use tokio::fs::File; -use tokio::fs; -use std::os::unix::fs::FileTypeExt; -use tokio::fs::OpenOptions; -use libc::{O_RDONLY, O_RDWR, O_WRONLY, O_ACCMODE}; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -#[derive(Debug, Clone, Copy)] -#[repr(C, packed)] -struct Qtreq { - // magic: [u8; 4], //magic: 0x5aa55aa5 - msgtype: u32, - error: u32, - seq_num: u64, - len: usize, -} - -const QTFS_REQ_OPEN: u32 = 2; -const QTFS_REQ_CLOSE: u32 = 3; -const QTFS_REQ_READ: u32 = 5; -const QTFS_REQ_WRITE: u32 = 6; -pub async fn qtfs_fifo_server(stream: TcpStream, idx: usize) { - let mut conn = Conn {stream}; - let mut head: Qtreq; - - match conn.qtfs_req_head(idx.clone()).await { - Ok(h) => head = h, - Err(e) => { - println!("Recv invalid head exit this proc :{}.", e); - return; - } - } - if head.msgtype != QTFS_REQ_OPEN { - println!("first msg type is invalid"); - return; - } - let file = match conn.qtfs_fifo_open(head.clone()).await { - Ok(f) => { - head.len = mem::size_of::(); - conn.req_head_ack(head).await; - conn.open_ack(0).await; - f - } - Err(e) => { - head.len = mem::size_of::(); - println!("Open fifo error:{}", e); - conn.req_head_ack(head).await; - conn.open_ack(1).await; - return; - } - }; - - 'main: loop { - let mut head: Qtreq; - match conn.qtfs_req_head(idx.clone()).await { - Ok(h) => head = h, - Err(e) => { - println!("head recv failed, {}.", e); - return; - } - } - match head.msgtype { - QTFS_REQ_OPEN => { - println!("Fifo is opened and recv open request again!"); - head.len = mem::size_of::(); - conn.req_head_ack(head).await; - conn.open_ack(1).await; - } - QTFS_REQ_CLOSE => { - println!("Close req idx:{}", idx.clone()); - head.len = 0; - conn.req_head_ack(head).await; - break 'main; - } - QTFS_REQ_READ => { - println!("Read req idx:{}", idx.clone()); - conn.qtfs_fifo_read(file.try_clone().await.unwrap(), head).await; - } - QTFS_REQ_WRITE => { - println!("Write req idx:{}", idx.clone()); - conn.qtfs_fifo_write(file.try_clone().await.unwrap(), head).await; - } - _ => { - println!("Recv invalid msg type"); - } - } - } - println!("Fifo server idx:{} is closed.", idx); -} - -#[derive(Debug, Clone, Copy)] -#[repr(C, packed)] -struct Qtreqopen { - flags: u32, - mode: u32, -} -#[repr(C, packed)] -struct Qtrspopen { - ret: i32, -} - -#[repr(C, packed)] -struct Qtreqread { - len: u64, -} - -#[repr(C, packed)] -struct Qtrspread { - errno: i32, - len: u64, -} - - - -#[repr(C, packed)] -struct Qtreqwrite { - len: u64, -} -#[repr(C, packed)] -struct Qtrspwrite { - errno: i32, - len: u64, -} - -struct Conn { - stream: TcpStream, -} - -impl Conn { - // sync head magic bytes sequence: 0x5a 0xa5 0x5a 0xa5 - // 逐字节读取magic,连续匹配的四个字节即视为同步包头 - async fn package_sync(&mut self) { - let mut byte: [u8; 1] = [0; 1]; - loop { - self.stream.read_exact(&mut byte).await.unwrap(); - if byte[0] != 0x5a {continue;} - self.stream.read_exact(&mut byte).await.unwrap(); - if byte[0] != 0xa5 {continue;} - self.stream.read_exact(&mut byte).await.unwrap(); - if byte[0] != 0x5a {continue;} - self.stream.read_exact(&mut byte).await.unwrap(); - if byte[0] != 0xa5 {continue;} - break; - } - } - - async fn send_magic_head(&mut self) { - const MAGIC: [u8; 4] = [0x5a, 0xa5, 0x5a, 0xa5]; - let _ = self.stream.write_all(&MAGIC[0..4]).await; - } - - async fn qtfs_req_head(&mut self, _idx: usize) -> Result { - const HEADSIZE: usize = mem::size_of::(); - self.package_sync().await; - let mut msghead = [0; HEADSIZE]; - self.stream.read_exact(&mut msghead).await?; - let head = Qtreq { - msgtype: u32::from_le_bytes(msghead[0..4].try_into().unwrap()), - error: u32::from_le_bytes(msghead[4..8].try_into().unwrap()), - seq_num: u64::from_le_bytes(msghead[8..16].try_into().unwrap()), - len: usize::from_le_bytes(msghead[16..16+mem::size_of::()].try_into().unwrap()), - }; - let reqtype: String = match head.msgtype { - QTFS_REQ_OPEN => String::from("Open"), - QTFS_REQ_CLOSE => String::from("Close"), - QTFS_REQ_READ => String::from("Read"), - QTFS_REQ_WRITE => String::from("Write"), - _ => String::from("Unknown"), - }; - println!("Recv new head type:{} msg:{:?}", reqtype, head); - Ok(head) - } - - async fn qtfs_fifo_open(&mut self, head: Qtreq) -> Result { - const HEADSIZE: usize = mem::size_of::(); - let mut openhead = [0; HEADSIZE]; - - if head.len >= 4096 + HEADSIZE { - println!("qtfs fifo len invalid"); - return Err(1); - } - self.stream.read_exact(&mut openhead).await.unwrap(); - let openhead1 = Qtreqopen { - flags: u32::from_le_bytes(openhead[0..4].try_into().unwrap()), - mode: u32::from_le_bytes(openhead[4..8].try_into().unwrap()), - }; - println!("open head:{:?}", openhead1); - let mut path = Vec::with_capacity(head.len - HEADSIZE); - path.resize(head.len - HEADSIZE, 0); - self.stream.read_exact(&mut path).await.unwrap(); - - let getstr = String::from_utf8(path).unwrap(); - let pathstr = getstr.trim_end_matches('\0').trim(); - match fs::metadata(pathstr.clone()).await { - Ok(meta) => { - if meta.file_type().is_fifo() == false { - println!("Requst path:{} not fifo!", pathstr); - return Err(1); - } - } - Err(_) => { - println!("path:{} check failed.", pathstr); - return Err(1); - } - }; - println!("Recv open path:{}", pathstr); - let file = OpenOptions::new() - .read((openhead1.flags as i32 & O_ACCMODE == O_RDONLY) || (openhead1.flags as i32 & O_ACCMODE == O_RDWR)) - .write((openhead1.flags as i32 & O_ACCMODE == O_WRONLY) || (openhead1.flags as i32 & O_ACCMODE == O_RDWR)) - .custom_flags(openhead1.flags as i32) - .open(pathstr).await.unwrap(); - - Ok(file) - } - - async fn qtfs_fifo_read(&mut self, mut file: File, mut reqhead: Qtreq) { - let mut head = [0; mem::size_of::()]; - self.stream.read_exact(&mut head).await.unwrap(); - let req = Qtreqread { - len: u64::from_le_bytes(head[0..8].try_into().unwrap()), - }; - let len = std::cmp::min(req.len, 4096); - - let mut rsp = Qtrspread { - errno: 0, - len: 0, - }; - - let mut buf = Vec::with_capacity(len.try_into().unwrap()); - buf.resize(len.try_into().unwrap(), 0); - - match file.read(&mut buf).await { - Ok(n) => { - rsp.len = n as u64; - let send = unsafe { - let ptr = &rsp as *const Qtrspread as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - println!("Read {} bytes from fifo", n.clone()); - reqhead.len = mem::size_of::(); - self.req_head_ack(reqhead).await; - self.stream.write_all(&send[..mem::size_of::()]).await.unwrap(); - let _ = self.stream.write_all(&buf[..n]).await.unwrap(); - } - Err(e) => { - rsp.errno = -1; - rsp.len = 0; - let send = unsafe { - let ptr = &rsp as *const Qtrspread as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - reqhead.len = mem::size_of::(); - self.req_head_ack(reqhead).await; - self.stream.write_all(&send[..mem::size_of::()]).await.unwrap(); - println!("Read from fifo error:{}", e); - } - } - } - - async fn qtfs_fifo_write(&mut self, mut file: File, mut reqhead: Qtreq) { - let mut whead = [0; mem::size_of::()]; - self.stream.read_exact(&mut whead).await.unwrap(); - let len = u64::from_le_bytes(whead[0..8].try_into().unwrap()); - - // 最大接收一次性写入4k - let len = std::cmp::min(len, 4096); - - let mut rsp = Qtrspwrite { - errno: 0, - len: 0, - }; - - let mut buf = Vec::with_capacity(len.try_into().unwrap()); - buf.resize(len.try_into().unwrap(), 0); - self.stream.read_exact(&mut buf).await.unwrap(); - - match file.write_all(&mut buf[..len as usize]).await { - Ok(_) => { - rsp.len = len as u64; - reqhead.len = mem::size_of::(); - self.req_head_ack(reqhead).await; - self.write_ack(rsp).await; - println!("Write fifo ok, send ack."); - } - Err(e) => { - rsp.len = 0; - reqhead.len = mem::size_of::(); - self.req_head_ack(reqhead).await; - self.write_ack(rsp).await; - println!("Write failed {}.", e); - } - } - } - - async fn req_head_ack(&mut self, head: Qtreq) { - let send = unsafe { - let ptr = &head as *const Qtreq as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - self.send_magic_head().await; - self.stream.write_all(&send[..mem::size_of::()]).await.expect("req head ack failed"); - } - - async fn open_ack(&mut self, retcode: i32) { - let rsp = Qtrspopen {ret: retcode,}; - let send = unsafe { - let ptr = &rsp as *const Qtrspopen as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - self.stream.write_all(&send[..mem::size_of::()]).await.expect("Response open failed"); - } - - async fn write_ack(&mut self, rsp: Qtrspwrite) { - let send = unsafe { - let ptr = &rsp as *const Qtrspwrite as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - self.stream.write_all(&send[..mem::size_of::()]).await.expect("Response write failed"); - } -} \ No newline at end of file diff --git a/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs b/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs deleted file mode 100644 index 4f8c0c4050d99ffbc81027dd8366d9f545d21031..0000000000000000000000000000000000000000 --- a/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs +++ /dev/null @@ -1,72 +0,0 @@ -/****************************************************************************** - * Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. - * qtfs licensed under the Mulan PSL v2. - * You can use this software according to the terms and conditions of the Mulan PSL v2. - * You may obtain a copy of Mulan PSL v2 at: - * http://license.coscl.org.cn/MulanPSL2 - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR - * PURPOSE. - * See the Mulan PSL v2 for more details. - * Author: Liqiang - * Create: 2023-07-26 - * Description: - *******************************************************************************/ - -use std::env; -use std::net::TcpListener; -use tokio::net::TcpListener as AsyncTcpListener; -use tokio::runtime::Builder; - -extern crate rlimit; -use rlimit::Resource; -use rlimit::setrlimit; -mod cofifo; - -async fn set_rlimit_fd(){ - let rlimit = Resource::NOFILE; - let fd_limit = 65536; - match setrlimit(rlimit, fd_limit, fd_limit) { - Ok(_) => {}, - Err(e) => println!("Set file rlimit to {} failed {}.", fd_limit, e), - } -} - -#[tokio::main] -async fn main() { - let args: Vec = env::args().collect(); - if args.len() != 3 { - let bin: String = args[0].trim().parse().expect("Binary name error"); - println!("Usage example:"); - println!(" {} 192.168.1.10:12310 10", bin); - return; - } - set_rlimit_fd().await; - let addr: String = args[1].trim().parse().expect("Input address: '192.168.1.10:12310'"); - let max_block_threads: usize = args[2].trim().parse().expect("Input max blocking threads number in arg 2: like '10'"); - let listener = TcpListener::bind(addr.clone()).unwrap(); - let async_listener = AsyncTcpListener::from_std(listener).unwrap(); - let runtime = Builder::new_multi_thread() - .max_blocking_threads(max_block_threads) - .enable_all() - .build() - .unwrap(); - - println!("Ready to listen addr:{}, max blocking threads:{}", addr, max_block_threads); - - let mut coroutine_idx: usize = 1; - loop { - let (s, _) = async_listener.accept().await.unwrap(); - let cur_idx = coroutine_idx.clone(); - coroutine_idx += 1; - match Some(s) { - Some(stream) => { - // 收到一个新的fifo连接请求,拉起新的协程处理函数 - runtime.spawn(cofifo::qtfs_fifo_server(stream, cur_idx)); - } - _ => { - eprintln!("Accept error!"); - } - } - } -} \ No newline at end of file diff --git a/qtfs/qtfs_server/server_fifo.c b/qtfs/qtfs_server/server_fifo.c new file mode 100644 index 0000000000000000000000000000000000000000..5a5b054d3be8cb3f01682bdd8c496d4891a498f8 --- /dev/null +++ b/qtfs/qtfs_server/server_fifo.c @@ -0,0 +1,711 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "req.h" +#include "log.h" +#include "libsocket.h" +#include "qtfs_fifo.h" + + +/* + 总体架构 + 主线程:epoll所有fifo的读或写, + open线程:被epoll线程创建,阻塞打开fifo,打开后将fd加入epoll监听列表后退出。 + + 关于线程资源回收:open线程拉起时被设置为PTHREAD_CREATE_DETACHED属性,不进行主动回收。 + + open线程正常打开fd后,自己退出;还未打开fd,主线程收到对端关闭管道消息时会直接调用pthread_cancel杀死。 + 此处有可能有低概率并发资源泄漏问题:open线程刚好打开了fd,还没加入主线程时被主线程杀死。 +*/ + +static int epollfd = -1; +static int sockfamily = AF_VSOCK; // 默认vsock +static pthread_mutex_t fifomutex = PTHREAD_MUTEX_INITIALIZER; +#define EPOLL_MAX_EVENT_NUMS 64 + +enum { + FIFO_RET_OK, + FIFO_RET_ERR, + FIFO_RET_DEL, // only delete myself + FIFO_RET_DEL_BOTH, // delete myself and peer + FIFO_RET_SUSPEND, // 将此fd的事件挂起,不删除fd,只从epoll中去掉监听 +}; + +enum { + FIFO_INV, // 初始无效 + FIFO_READ, + FIFO_WRITE, + FIFO_BLOCK, + FIFO_NONBLOCK, +}; + +enum { + FIFO_PEER_PRE, + FIFO_PEER_ADD, + FIFO_PEER_POST, +}; +// 主线程epoll关键数据结构 +struct fifo_event_t { + int fd; + struct fifo_event_t *peerevt; + + /* 触发时的操作函数 */ + int (*handler)(struct fifo_event_t *event); + // 仅在open阻塞状态有效,open完成后应该置空 + union { + void *priv; + int len; // valid read or write len + int peerfd; // priv fd + }; + unsigned long seq_num; + int block; // block fifo or nonblock +}; + +struct open_arg_t { + struct qtreq_fifo_open *req; + /* 此fifo对应在epoll主线程中的event结构, + 用于open成功后将fd加入main_event */ + struct fifo_event_t *main_evt; + /* open与主线程有竞争资源,因为open + 是临时的少量线程,放在open结构里少占用资源, + 只有在open线程中需要用锁,以及epoll线程中 + 在open状态的fd需要用锁,epoll线程非open状态 + 不需要加锁 */ + pthread_mutex_t mutex; + pthread_t *t; +}; + +static int fifo_rw_flags(unsigned int flags) +{ + if (flags & O_WRONLY) + return FIFO_WRITE; + return FIFO_READ; +} + +static int fifo_block_flags(unsigned int flags) +{ + if (flags & O_NONBLOCK) + return FIFO_NONBLOCK; + return FIFO_BLOCK; +} + +static int fifo_recv_with_timeout(int fd, char *msg, int len) +{ +#define TMOUT_BLOCK_SIZE 1024 +#define TMOUT_UNIT_MS 20 +#define TMOUT_INTERVAL 1 +#define TMOUT_MAX_MS 1000 + int total_recv = 0; + int ret; + int tmout_ms = ((len / TMOUT_BLOCK_SIZE) + 1) * TMOUT_UNIT_MS; + if (len <= 0 || msg == NULL || fd < 0) { + log_err("invalid param fd:%d len:%d or %s", fd, len, (msg == NULL) ? "msg is NULL" : "msg is not NULL"); + return 0; + } + if (tmout_ms > TMOUT_MAX_MS) + tmout_ms = TMOUT_MAX_MS; + do { + ret = recv(fd, &msg[total_recv], len - total_recv, 0); + if (ret < 0) { + log_err("recv failed ret:%d errno:%d", ret, errno); + return ret; + } + total_recv += ret; + if (total_recv > len) { + log_err("fatal error total recv:%d longger than target len:%d", total_recv, len); + return 0; + } + if (total_recv == len) { + return total_recv; + } + usleep(TMOUT_INTERVAL * 1000); + tmout_ms -= TMOUT_INTERVAL; + } while (tmout_ms > 0); + log_err("Fatal error, the target recv len:%d and only %d length is received when it time out", len, total_recv); + return 0; +} + +struct fifo_event_t *fifo_add_event(int fd, struct fifo_event_t *peerevt, int (*handler)(struct fifo_event_t *), void *priv, unsigned int events) +{ + struct epoll_event evt; + struct fifo_event_t *fifoevt = (struct fifo_event_t *)malloc(sizeof(struct fifo_event_t)); + if (fifoevt == NULL) { + log_err("failed to malloc event, fd:%d peer:%d errno:%d", fd, peerevt->fd, errno); + return NULL; + } + memset(fifoevt, 0, sizeof(struct fifo_event_t)); + fifoevt->fd = fd; + fifoevt->peerevt = peerevt; + fifoevt->handler = handler; + fifoevt->priv = priv; + evt.data.ptr = (void *)fifoevt; + evt.events = events; + if (-1 == epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &evt)) { + log_err("epoll add fd:%d peer:%d failed, errno:%d", fd, peerevt->fd, errno); + free(fifoevt); + return NULL; + } + return fifoevt; +} + +void fifo_del_event(struct fifo_event_t *evt) +{ + // close fd, 内核会回收epoll资源 + close(evt->fd); + free(evt); + return; +} + +void fifo_suspend_event(struct fifo_event_t *evt) +{ + struct epoll_event event; + event.data.ptr = (void *)evt; + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, evt->fd, &event) == -1) { + log_err("suspend event fd:%d failed, errno:%d", evt->fd, errno); + } + free(evt); + return; +} + +static int fifo_peer_index; +static struct fifo_event_t *fifo_peer_evt[EPOLL_MAX_EVENT_NUMS]; +static int fifo_del_peer(int flag, struct fifo_event_t *peer) +{ + switch (flag) { + case FIFO_PEER_PRE: + fifo_peer_index = 0; + memset(fifo_peer_evt, 0, sizeof(struct fifo_event_t *) * EPOLL_MAX_EVENT_NUMS); + break; + case FIFO_PEER_ADD: + fifo_peer_evt[fifo_peer_index] = peer; + break; + case FIFO_PEER_POST: + for (int i = 0; i < fifo_peer_index; i++) { + fifo_del_event(fifo_peer_evt[i]); + } + break; + default: + log_err("invalid flag:%d", flag); + break; + } + return 0; +} + +int fifo_mod_event(struct fifo_event_t *evt, unsigned int events) +{ + struct epoll_event event; + event.data.ptr = (void *)evt; + event.events = events; + if (-1 == epoll_ctl(epollfd, EPOLL_CTL_MOD, evt->fd, &event)) { + log_err("modify event fd:%d failed, errno:%d", evt->fd, errno); + return -1; + } + return 0; +} + +static void fifo_proc_ack(struct fifo_event_t *evt, int type, int sockfd, char *arg, int arglen) +{ + int ret; + struct qtreq rsp; + + rsp.type = type; + rsp.err = 0; + rsp.seq_num = evt->seq_num; + rsp.len = arglen; + + ret = write(sockfd, &rsp, sizeof(struct qtreq)); + if (ret < 0) { + log_err("fifo ack type:%d failed, sockfd:%d err:%d", type, sockfd, errno); + return; + } + ret = write(sockfd, arg, arglen); + if (ret < 0) { + log_err("fifo ack arg type:%d failed, sockfd:%d err:%d", type, sockfd, errno); + return; + } + log_info("Type:%d ack successed, sockfd:%d.", type, sockfd); + return; +} + + +int fifo_proc_unknown(struct fifo_event_t *evt) +{ + struct open_arg_t *oarg; + log_info("unknown read/write event fd:%d happend, open event not complete!", evt->fd); + // 这不是预期的事件,直接删除此事件,且关联删除open线程 + pthread_mutex_lock(&fifomutex); + if (evt->priv) { + oarg = (struct open_arg_t *)evt->priv; + pthread_cancel(*oarg->t); + free(oarg->t); + free(oarg->req); + free(oarg); + evt->priv = NULL; + } + pthread_mutex_unlock(&fifomutex); + return FIFO_RET_DEL; +} + +// reverse是处理异常事件,正常情况下不会接受反向事件, +// 如果有反向事件则说明是断连事件 +int fifo_proc_reverse(struct fifo_event_t *evt) +{ + log_info("reverse event happend."); + return FIFO_RET_OK; +} + +// 当读请求发生时,有可能阻塞,此时将fifo端加入监听,等到可读时 +// 再触发处理,不在主线程中阻塞读 +int fifo_proc_readable(struct fifo_event_t *evt) +{ + // 读完立即将自己置为EPOLLHUP,不连续读取 + int ret; + char *msg; + struct qtrsp_fifo_read *rsp; + int readlen = evt->len; + if (readlen > QTFS_REQ_MAX_LEN) { + log_err("Read rsp len:%d too large!", readlen); + ret = EINVAL; + goto err_ack; + } + + msg = (char *)malloc(readlen + sizeof(struct qtrsp_fifo_read)); + if (msg == NULL) { + log_err("malloc memory failed, errno:%d", errno); + ret = ENOMEM; + goto err_ack; + } + + rsp = (struct qtrsp_fifo_read *)msg; + ret = read(evt->fd, &msg[sizeof(struct qtrsp_fifo_read)], readlen); + if (ret <= 0) { + log_err("read from fifo:%d failed, readlen:%d, errno:%d", evt->fd, readlen, errno); + ret = errno; + free(msg); + goto err_ack; + } + rsp->err = 0; + rsp->len = ret; + fifo_proc_ack(evt, QTFS_REQ_READITER, evt->peerevt->fd, msg, ret + sizeof(struct qtrsp_fifo_read)); + + log_info("readable event fd:%d peerfd:%d, readfromfd:%s, errno:%d", evt->fd, evt->peerevt->fd, &msg[sizeof(struct qtrsp_fifo_read)], errno); + free(msg); + evt->peerevt->peerevt = NULL; + // 读完立即删除本监听,如果继续读后面再添加进来 + return FIFO_RET_SUSPEND; + +err_ack: + do { + struct qtrsp_fifo_read errrsp; + errrsp.err = ret; + errrsp.len = 0; + fifo_proc_ack(evt, QTFS_REQ_READITER, evt->peerevt->fd, (char *)&errrsp, sizeof(errrsp)); + } while (0); + evt->peerevt->peerevt = NULL; + return FIFO_RET_SUSPEND; +} + +int fifo_proc_writeable(struct fifo_event_t *evt) +{ + // 写完立即将自己置为EPOLLHUP,不连续写 + int ret; + char *msg; + struct qtrsp_fifo_write rsp; + int writelen = evt->len; + if (writelen > QTFS_REQ_MAX_LEN) { + log_err("Read rsp len:%d too large!", writelen); + ret = EINVAL; + goto err_ack; + } + msg = (char *)malloc(writelen + sizeof(struct qtrsp_fifo_write)); + if (msg == NULL) { + log_err("malloc memory failed, errno:%d", errno); + ret = errno; + goto err_ack; + } + ret = fifo_recv_with_timeout(evt->peerevt->fd, msg, evt->len); + if (ret <= 0) { + log_err("recv socket write failed, fd:%d peer:%d, errno:%d.", evt->peerevt->fd, evt->fd, errno); + // 主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 + ret = errno; + free(msg); + goto err_ack; + } + ret = write(evt->fd, msg, ret); + if (ret <= 0) { + log_err("write to fifo failed, ret:%d errno:%d", ret, errno); + ret = errno; + free(msg); + goto err_ack; + } + rsp.err = 0; + rsp.len = ret; + fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->peerevt->fd, (char *)&rsp, sizeof(struct qtrsp_fifo_write)); + + log_info("writeable event fd:%d peerfd:%d, writelen:%lu, errno:%d", evt->fd, evt->peerevt->fd, rsp.len, errno); + free(msg); + evt->peerevt->peerevt = NULL; + return FIFO_RET_SUSPEND; + +err_ack: + do { + struct qtrsp_fifo_write errrsp; + errrsp.err = ret; + errrsp.len = 0; + fifo_proc_ack(evt, QTFS_REQ_WRITE, evt->peerevt->fd, (char *)&errrsp, sizeof(errrsp)); + } while (0); + return FIFO_RET_SUSPEND; +} + +// 处理读请求,读可能阻塞,因为打开时已经确定是否阻塞型, +// 这里直接将peer改成监听状态去等待触发 +int fifo_proc_read_req(struct fifo_event_t *evt) +{ + struct qtreq_fifo_read req; + int ret; + ret = fifo_recv_with_timeout(evt->fd, (char *)&req, sizeof(req)); + if (ret <= 0) { + log_err("recv fifo read head failed, errno:%d.", errno); + // 主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 + if (evt->peerevt == NULL) { + close(evt->peerfd); + return FIFO_RET_DEL; + } + // 如果peerevt非空则要同时删除peer事件 + return FIFO_RET_DEL_BOTH; + } + log_info("recv read req len:%d", req.len); + if (evt->block == FIFO_NONBLOCK) { + struct fifo_event_t rd; + rd.fd = evt->peerfd; + rd.peerevt = evt; + rd.len = req.len; + rd.seq_num = evt->seq_num; + fifo_proc_readable(&rd); + return FIFO_RET_OK; + } + + // if fifo is block, dont block on main thread + struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_readable, NULL, EPOLLIN); + if (newevt == NULL) { + log_err("add readable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd); + return FIFO_RET_ERR; + } + evt->peerevt = newevt; + newevt->len = req.len; + newevt->seq_num = evt->seq_num; + + return FIFO_RET_OK; +} + +// 写 +int fifo_proc_write_req(struct fifo_event_t *evt) +{ + struct qtreq_fifo_write req; + int ret; + ret = fifo_recv_with_timeout(evt->fd, (char *)&req, sizeof(req)); + if (ret <= 0) { + log_err("recv fifo write head failed, errno:%d.", errno); + // 主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 + if (evt->peerevt == NULL) { + close(evt->peerfd); + return FIFO_RET_DEL; + } + // 如果peerevt非空则要同时删除peer事件 + return FIFO_RET_DEL_BOTH; + } + log_info("recv write req len:%d", req.len); + if (evt->block == FIFO_NONBLOCK) { + struct fifo_event_t wr; + wr.fd = evt->peerfd; + wr.peerevt = evt; + wr.len = req.len; + wr.seq_num = evt->seq_num; + fifo_proc_writeable(&wr); + return FIFO_RET_OK; + } + // if fifo is block, dont block on main thread + struct fifo_event_t *newevt = fifo_add_event(evt->peerfd, evt, fifo_proc_writeable, NULL, EPOLLOUT); + if (newevt == NULL) { + log_err("add writeable event failed, fd:%d socketfd:%d", evt->peerfd, evt->fd); + return FIFO_RET_ERR; + } + evt->peerevt = newevt; + newevt->len = req.len; + newevt->seq_num = evt->seq_num; + + return FIFO_RET_OK; +} + +// read/write/close req +int fifo_proc_new_req(struct fifo_event_t *evt) +{ + struct qtreq head; + int ret; + ret = fifo_recv_with_timeout(evt->fd, (char *)&head, sizeof(struct qtreq)); + if (ret <= 0) { + log_err("recv qtreq head failed, errno:%d.", errno); + // 主线程是串行的,peerevt如果是空,则没有readable监听,直接close peerfd即可 + if (evt->peerevt == NULL) { + close(evt->peerfd); + return FIFO_RET_DEL; + } + // 如果peerevt非空则要同时删除peer事件 + return FIFO_RET_DEL_BOTH; + } + switch (head.type) { + case QTFS_REQ_CLOSE: + log_info("recv close req, close tcp fd:%d fifofd:%d", evt->fd, evt->peerfd); + evt->seq_num = head.seq_num; + fifo_proc_ack(evt, QTFS_REQ_CLOSE, evt->fd, NULL, 0); + if (evt->peerevt == NULL) { + close(evt->peerfd); + return FIFO_RET_DEL; + } + // 如果peerevt非空则要同时删除peer事件 + return FIFO_RET_DEL_BOTH; + + case QTFS_REQ_READITER: + log_info("recv readiter req, fd:%d peerfd:%d", evt->fd, evt->peerfd); + evt->seq_num = head.seq_num; + return fifo_proc_read_req(evt); + + case QTFS_REQ_WRITE: + log_info("recv write req, fd:%d peerfd:%d", evt->fd, evt->peerfd); + evt->seq_num = head.seq_num; + return fifo_proc_write_req(evt); + + default: + log_info("recv invalid req:%u fd:%d peerfd:%d", head.type, evt->fd, evt->peerfd); + break; + } + + return FIFO_RET_ERR; +} + +void *fifo_open_thread(void *arg) +{ + int fd; + struct open_arg_t *oarg = (struct open_arg_t *)arg; + int rw; + int err = 0; + struct fifo_event_t *newevt; + struct qtrsp_fifo_open rsp = {.err = 0}; + + fd = open(oarg->req->path, oarg->req->flags, oarg->req->mode); + if (fd < 0) { + log_err("open fifo:%s failed, fd:%d errno:%d", oarg->req->path, fd, errno); + goto err_end; + } + rw = fifo_rw_flags(oarg->req->flags); + log_info("Recv open request fifo:%s flags:%x mode:%x rw:%d", oarg->req->path, oarg->req->flags, oarg->req->mode, rw); + + // read和write,代表的是向server端fifofd的操作方向,在初始状态,本 + // 代理不应该主动,只监听挂断事件,在通信对端发来read/write消息才 + // 改为监听可读/可写状态并进行实际读写。 + pthread_mutex_lock(&fifomutex); + if (rw == FIFO_READ) { + oarg->main_evt->peerevt = NULL; + oarg->main_evt->peerfd = fd; + oarg->main_evt->handler = fifo_proc_new_req; + } else { + oarg->main_evt->peerevt = NULL; + oarg->main_evt->handler = fifo_proc_new_req; + oarg->main_evt->peerfd = fd; + } + oarg->main_evt->block = fifo_block_flags(oarg->req->flags); + + // 按理说每个fifo的链接只有自己串行使用,不需要考虑两个线程竞争 + fifo_proc_ack(oarg->main_evt, QTFS_REQ_OPEN, oarg->main_evt->fd, (char *)&rsp, sizeof(rsp)); + + goto end; + +err_end: + rsp.err = errno; + fifo_proc_ack(oarg->main_evt, QTFS_REQ_OPEN, oarg->main_evt->fd, (char *)&rsp, sizeof(rsp)); + +end: + free(oarg->t); + free(oarg->req); + free(oarg); + pthread_mutex_unlock(&fifomutex); + return NULL; +} + +int fifo_proc_open_req(struct fifo_event_t *evt) +{ + struct open_arg_t *oarg; + struct qtreq_fifo_open *req; + struct qtreq head; + pthread_t *t; + pthread_attr_t attr; + int ret; + ret = fifo_recv_with_timeout(evt->fd, (char *)&head, sizeof(head)); + if (ret <= 0) { + log_err("recv open head failed."); + return FIFO_RET_ERR; + } + log_info("recv head type:%u seq:%lu len:%d", head.type, head.seq_num, head.len); + if (head.len > sizeof(struct qtreq_fifo_open)) { + log_err("open head len:%d is too big", head.len); + return FIFO_RET_ERR; + } + // 按需申请path长度 + req = (struct qtreq_fifo_open *)malloc(head.len + 1); + if (req == NULL) { + // todo: 既然没有成功,要清理掉后面的消息体 + log_err("alloc memory failed, errno:%d", errno); + return FIFO_RET_ERR; + } + oarg = (struct open_arg_t *)malloc(sizeof(struct open_arg_t)); + if (oarg == NULL) { + log_err("alloc open arg memory failed, errno:%d", errno); + free(req); + return FIFO_RET_ERR; + } + memset(req, 0, head.len + 1); + ret = fifo_recv_with_timeout(evt->fd, (char *)req, head.len); + if (ret <= 0) { + log_err("recv req failed."); + free(req); + free(oarg); + return FIFO_RET_ERR; + } + pthread_mutex_init(&oarg->mutex, NULL); + oarg->main_evt = evt; + oarg->req = req; + evt->seq_num = head.seq_num; + + // create new open thread + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + t = (pthread_t *)malloc(sizeof(pthread_t)); + if (t == NULL) { + log_err("alloc memory failed, errno:%d", errno); + free(req); + free(oarg); + return FIFO_RET_ERR; + } + pthread_create(t, &attr, fifo_open_thread, oarg); + oarg->t = t; + + // 临时状态机,暂时不知道是读是写 + evt->priv = oarg; + evt->handler = fifo_proc_unknown; + + log_info("Start new fifo open thread head:%u, len:%d", head.type, head.len); + return FIFO_RET_OK; +} + +// mainsock 仅处理新连接建联,添加事件, +// 单线程不能阻塞等消息,必须尽快让出线程使用权 +int fifo_proc_main_sock(struct fifo_event_t *evt) +{ + int ret; + struct qtreq headreq; + int connfd = libsock_accept(evt->fd, sockfamily); + if (connfd < 0) { + log_err("accept new connection failed, ret:%d errno:%d", connfd, errno); + return FIFO_RET_ERR; + } + // 新建联肯定是open请求了 + if (fifo_add_event(connfd, NULL, fifo_proc_open_req, NULL, EPOLLIN|EPOLLHUP) < 0) { + log_err("add new connection event failed."); + return FIFO_RET_ERR; + } + + return FIFO_RET_OK; +} + +void *fifo_server_main_thread(void *arg) +{ + int indx = 0; + + int sockfd; + struct fifo_server_arg_t *parg = (struct fifo_server_arg_t *)arg; + struct epoll_event *evts = NULL; + + // init socket server + if (parg->family == AF_INET) { + sockfd = libsock_build_inet_connection(parg->addr, parg->port, LIBSOCK_SERVER); + } else { + sockfd = libsock_build_vsock_connection(parg->cid, parg->port, LIBSOCK_SERVER); + } + if (sockfd < 0) { + log_err("fifo server main thread start failed, please check input argument!"); + return NULL; + } + sockfamily = parg->family; + + // create epoll + epollfd = epoll_create1(0); + if (epollfd == -1) { + log_err("fifo server epoll create failed, errno:%d", errno); + goto epoll_create_err; + } + evts = calloc(EPOLL_MAX_EVENT_NUMS, sizeof(struct fifo_event_t)); + if (evts == NULL) { + log_err("fifo server calloc events failed, errno:%d", errno); + goto evts_calloc_err; + } + + fifo_add_event(sockfd, NULL, fifo_proc_main_sock, NULL, EPOLLIN); + + while (1) { + int ret; + struct fifo_event_t *event; + int n = epoll_wait(epollfd, evts, EPOLL_MAX_EVENT_NUMS, 1000); + if (n == 0) + continue; + if (n < 0) { + log_err("epoll wait err:%d, errno:%d", n, errno); + continue; + } + fifo_del_peer(FIFO_PEER_PRE, NULL); + for (int i = 0; i < n; i ++) { + event = (struct fifo_event_t *)evts[i].data.ptr; + log_info("new event fd:%d events:0x%x", event->fd, evts[i].events); + ret = event->handler(event); + if (ret == FIFO_RET_SUSPEND) { + fifo_suspend_event(event); + } else if (ret == FIFO_RET_DEL) { + fifo_del_event(event); + } else if (ret == FIFO_RET_DEL_BOTH) { + fifo_del_peer(FIFO_PEER_ADD, event->peerevt); + fifo_del_event(event); + } + } + fifo_del_peer(FIFO_PEER_POST, NULL); + } + + return NULL; + +evts_calloc_err: + close(epollfd); + epollfd = -1; +epoll_create_err: + close(sockfd); + log_err("fifo server error exit."); + return NULL; +}