diff --git a/qtfs/README.md b/qtfs/README.md index dbf400ebf7b5d6f895724d2a396e43e1d2beb46a..2e316bda45a29976ac410c200b81b0dd5fef3201 100644 --- a/qtfs/README.md +++ b/qtfs/README.md @@ -34,7 +34,8 @@ qtfs的特性: ### VSOCK通信模式 -选择host-vm或同一台host上的vm-vm作为qtfs的client与server进行测试,通信通道为vsock: +如有DPU硬件支持通过vsock与host通信,可选择此方法。 +如果没有硬件,也可以选择host-vm作为qtfs的client与server进行模拟测试,通信通道为vsock: 1. 启动vm时为vm配置vsock通道,vm可参考如下配置,将vsock段加在devices配置内: ``` diff --git a/qtfs/include/conn.h b/qtfs/include/conn.h index 43534ef1b4fd17676be2f9dfe09c58601d803f79..ff43086488523127e292161a967e224da65fb59f 100644 --- a/qtfs/include/conn.h +++ b/qtfs/include/conn.h @@ -114,8 +114,6 @@ struct qtfs_sock_var_s { unsigned int vm_port; unsigned int vm_cid; #endif - struct msghdr msg_recv; - struct msghdr msg_send; }; struct qtfs_conn_ops_s { @@ -131,6 +129,8 @@ struct qtfs_conn_ops_s { int (*conn_new_connection)(void *connvar, qtfs_conn_type_e type); int (*conn_send)(void *connvar, void *buf, size_t len); int (*conn_recv)(void *connvar, void *buf, size_t len, bool block); + int (*conn_send_iter)(void *connvar, struct iov_iter *iov); + int (*conn_recv_iter)(void *connvar, struct iov_iter *iov, bool block); bool (*conn_inited)(void *connvar, qtfs_conn_type_e type); bool (*conn_connected)(void *connvar); void (*conn_recv_buff_drop)(void *connvar); @@ -169,6 +169,10 @@ struct qtfs_conn_var_s { unsigned int send_max; struct kvec vec_recv; struct kvec vec_send; + struct iov_iter *iov_recv; // for non-copy + struct iov_iter *iov_send; // for non-copy + unsigned int magic_send; + unsigned int magic_recv; }; struct qtfs_wl_cap { @@ -192,13 +196,18 @@ void qtfs_conn_var_fini(struct qtfs_conn_var_s *pvar); void qtfs_conn_msg_clear(struct qtfs_conn_var_s *pvar); void *qtfs_conn_msg_buf(struct qtfs_conn_var_s *pvar, int dir); -void qtfs_conn_param_init(void); +int qtfs_conn_param_init(void); void qtfs_conn_param_fini(void); void qtfs_conn_put_param(struct qtfs_conn_var_s *pvar); struct qtfs_conn_var_s *qtfs_epoll_establish_conn(void); void qtfs_epoll_cut_conn(struct qtfs_conn_var_s *pvar); +#ifdef QTFS_CLIENT +struct qtfs_conn_var_s *qtfs_fifo_get_param(void); +void qtfs_fifo_put_param(struct qtfs_conn_var_s *pvar); +#endif + int qtfs_sm_active(struct qtfs_conn_var_s *pvar); int qtfs_sm_reconnect(struct qtfs_conn_var_s *pvar); int qtfs_sm_exit(struct qtfs_conn_var_s *pvar); diff --git a/qtfs/include/req.h b/qtfs/include/req.h index bbc7718e526570eb26c43537dbd2a79dea00a574..70a6bf7fdde971cf288a3bf2535af65d9a09e1d3 100644 --- a/qtfs/include/req.h +++ b/qtfs/include/req.h @@ -131,6 +131,11 @@ struct qtreq { #define QTFS_MSG_LEN sizeof(struct qtreq) + QTFS_REQ_MAX_LEN #define QTFS_MSG_HEAD_LEN sizeof(struct qtreq) +#define QTFS_REQ_MAGIC 0xa55aa55a + +#define QTFS_FIFO_HEAD_LEN 32 // fifo只用很少的额外头,32应该足够了 +#define QTFS_FIFO_REQ_LEN (QTFS_MSG_HEAD_LEN + QTFS_FIFO_HEAD_LEN) + struct qtreq_ioctl { struct qtreq_ioctl_len { unsigned int cmd; diff --git a/qtfs/qtfs/Makefile b/qtfs/qtfs/Makefile index ddb7d0ff9e07bf0655bb0c7744963a1636186462..9b3ebe5586b3ce1c28dcfc87225a792bdd16dd81 100644 --- a/qtfs/qtfs/Makefile +++ b/qtfs/qtfs/Makefile @@ -9,7 +9,7 @@ COMM=../qtfs_common/ COMMO=$(COMM)/conn.o $(COMM)/misc.o $(COMM)/symbol_wrapper.o $(COMM)/socket.o obj-m:=qtfs.o -qtfs-objs:=qtfs-mod.o sb.o syscall.o xattr.o proc.o miss.o $(COMMO) ../utils/utils.o +qtfs-objs:=qtfs-mod.o sb.o syscall.o xattr.o proc.o miss.o fifo.o $(COMMO) ../utils/utils.o all: qtfs diff --git a/qtfs/qtfs/fifo.c b/qtfs/qtfs/fifo.c new file mode 100644 index 0000000000000000000000000000000000000000..424bae964dbcb04005fa661b81eda56b1f0a19a8 --- /dev/null +++ b/qtfs/qtfs/fifo.c @@ -0,0 +1,221 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 and + * only version 2 as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "conn.h" +#include "qtfs-mod.h" +#include "req.h" +#include "log.h" + +// 对接rust,长度对齐1字节 +#pragma pack(1) +struct qtreq_fifo_open { + u64 flags; + u32 mode; + char path[MAX_PATH_LEN]; +}; + +struct qtrsp_fifo_open { + s32 errno; +}; + +struct qtreq_fifo_read { + u64 len; +}; + +struct qtrsp_fifo_read { + s32 errno; // same as kernel errcode, 0 is ok, < 0 is errcode + u64 len; +}; + +struct qtreq_fifo_write { + u64 len; +}; + +struct qtrsp_fifo_write { + s32 errno; + u64 len; +}; + +struct qtreq_fifo_close { + // nothing +}; + +struct qtrsp_fifo_close { + // nothing +}; +#pragma pack() + +static void qtfs_fifo_put_file(struct file *file) +{ + struct qtfs_conn_var_s *pvar = file->private_data; + if (pvar == NULL) { + qtfs_err("fifo private data invalid to put"); + return; + } + qtfs_fifo_put_param(pvar); + file->private_data = NULL; + return; +} + +int qtfs_fifo_open(struct inode *inode, struct file *file) +{ + struct kvec vec_save; + unsigned int sendmax_save; + struct qtreq *req; + struct qtreq_fifo_open *fiforeq; + struct qtrsp_fifo_open *rsp; + struct qtfs_conn_var_s *pvar = NULL; + int ret; + + req = (struct qtreq *)kmalloc(sizeof(struct qtreq) + sizeof(struct qtreq_fifo_open), GFP_KERNEL); + if (req == NULL) { + qtfs_err("get fifo open memory failed."); + return -ENOMEM; + } + memset(req, 0, sizeof(struct qtreq) + sizeof(struct qtreq_fifo_open)); + + pvar = qtfs_fifo_get_param(); + if (pvar == NULL) { + qtfs_err("fifo get param failed."); + kfree(req); + return -EINVAL; + } + fiforeq = (struct qtreq_fifo_open *)req->data; + + if (qtfs_fullname(fiforeq->path, file->f_path.dentry, sizeof(fiforeq->path)) < 0) { + qtfs_err("qtfs fifo fullname failed"); + kfree(req); + qtfs_fifo_put_param(pvar); + return -EINVAL; + } + + fiforeq->flags = file->f_flags; + fiforeq->mode = file->f_mode; + qtfs_err("fifo open path:%s size req:%u size open:%u, flags:%lu mode%u", + fiforeq->path, sizeof(struct qtreq), QTFS_SEND_SIZE(struct qtreq_fifo_open, fiforeq->path), + fiforeq->flags, fiforeq->mode); + vec_save = pvar->vec_send; + sendmax_save = pvar->send_max; + pvar->vec_send.iov_base = req; + pvar->send_max = sizeof(struct qtreq) + sizeof(struct qtreq_fifo_open); + rsp = qtfs_remote_run(pvar, QTFS_REQ_OPEN, QTFS_SEND_SIZE(struct qtreq_fifo_open, fiforeq->path)); + pvar->vec_send = vec_save; + pvar->send_max = sendmax_save; + + if (IS_ERR_OR_NULL(rsp) || rsp->errno != 0) { + ret = IS_ERR_OR_NULL(rsp) ? -EFAULT : -rsp->errno; + qtfs_fifo_put_param(pvar); + qtfs_err("qtfs fifo open :%s failed mode:%o flag:%x", fiforeq->path, fiforeq->mode, fiforeq->flags); + kfree(req); + return ret; + } + kfree(req); + WARN_ON(file->private_data); + file->private_data = pvar; + return 0; +} + +ssize_t qtfs_fifo_readiter(struct kiocb *kio, struct iov_iter *iov) +{ + struct qtfs_conn_var_s *pvar = kio->ki_filp->private_data; + struct qtreq_fifo_read *req; + struct qtrsp_fifo_read *rsp; + int total = 0; + int ret; + + if (pvar == NULL || !virt_addr_valid(pvar)) { + qtfs_err("invalid fifo read req, private data is invalid"); + return -EFAULT; + } + req = pvar->conn_ops->get_conn_msg_buf(pvar, QTFS_SEND); + req->len = iov_iter_count(iov); + pvar->vec_recv.iov_len = QTFS_MSG_HEAD_LEN + sizeof(struct qtrsp_fifo_read); + rsp = qtfs_remote_run(pvar, QTFS_REQ_READITER, sizeof(struct qtreq_fifo_read)); + if (IS_ERR_OR_NULL(rsp) || rsp->errno != 0) { + qtfs_err("remote run failed. or errno:%d", (rsp == NULL) ? -1 : rsp->errno); + return -EFAULT; + } + + while (total < rsp->len) { + ret = pvar->conn_ops->conn_recv_iter(&pvar->conn_var, iov, false); + if (ret <= 0) { + qtfs_err("recv iter from conn module ret:%d", ret); + break; + } + total += ret; + } + return total; +} + +ssize_t qtfs_fifo_writeiter(struct kiocb *kio, struct iov_iter *iov) +{ + struct qtfs_conn_var_s *pvar = kio->ki_filp->private_data; + struct qtreq_fifo_write *req; + struct qtrsp_fifo_write *rsp; + int total = 0; + + if (pvar == NULL || !virt_addr_valid(pvar)) { + qtfs_err("invalid fifo write req, private data is invalid"); + return -EFAULT; + } + req = pvar->conn_ops->get_conn_msg_buf(pvar, QTFS_SEND); + req->len = iov_iter_count(iov); + pvar->vec_recv.iov_len = QTFS_MSG_HEAD_LEN + sizeof(struct qtrsp_fifo_write); + pvar->iov_send = iov; + rsp = qtfs_remote_run(pvar, QTFS_REQ_WRITE, sizeof(struct qtreq_fifo_write)); + if (IS_ERR_OR_NULL(rsp) || rsp->errno != 0) { + qtfs_err("fifo write remote run failed, or errno:%d", (rsp == NULL) ? -1 : rsp->errno); + return -EFAULT; + } + return rsp->len; +} + +int qtfs_fifo_release(struct inode *inode, struct file *file) +{ + struct qtfs_conn_var_s *pvar = file->private_data; + struct qtrsp_fifo_close *rsp = NULL; + if (pvar == NULL) { + qtfs_err("invalid fifo write req, private data is invalid"); + return -EFAULT; + } + pvar->vec_recv.iov_len = QTFS_MSG_HEAD_LEN; + rsp = qtfs_remote_run(pvar, QTFS_REQ_CLOSE, 0); + if (IS_ERR_OR_NULL(rsp)) { + qtfs_err("fifo close failed"); + } + qtfs_fifo_put_file(file); + return 0; +} + +static __poll_t +qtfs_fifo_poll(struct file *filp, poll_table *wait) +{ + return 0; +} + +struct file_operations qtfsfifo_ops = { + .read_iter = qtfs_fifo_readiter, + .write_iter = qtfs_fifo_writeiter, + .open = qtfs_fifo_open, + .release = qtfs_fifo_release, + .llseek = no_llseek, + .poll = qtfs_fifo_poll, +}; \ No newline at end of file diff --git a/qtfs/qtfs/ops.h b/qtfs/qtfs/ops.h index bdb2628001e9976df3a050ffc0fbb4f8e0505da0..7493d3d33212d448319bba232d0357d60c5df7a0 100644 --- a/qtfs/qtfs/ops.h +++ b/qtfs/qtfs/ops.h @@ -22,6 +22,7 @@ extern struct inode_operations qtfs_proc_inode_ops; extern struct file_operations qtfs_proc_file_ops; extern struct inode_operations qtfs_proc_sym_ops; +extern struct file_operations qtfsfifo_ops; enum qtfs_type qtfs_get_type(char *str); bool is_sb_proc(struct super_block *sb); @@ -37,5 +38,6 @@ int qtfs_getattr(struct user_namespace *mnt_userns, const struct path *, struct int qtfs_getattr(const struct path *, struct kstat *, u32, unsigned int); #endif struct dentry * qtfs_lookup(struct inode *, struct dentry *, unsigned int); +__poll_t qtfsfifo_poll(struct file *filp, poll_table *wait); #endif diff --git a/qtfs/qtfs/qtfs-mod.c b/qtfs/qtfs/qtfs-mod.c index e2648cd24880c14a0a5e66c55a8bcf2e1a2ef2c1..f2e1670d150b89dfa312f056cdb191e492de9294 100644 --- a/qtfs/qtfs/qtfs-mod.c +++ b/qtfs/qtfs/qtfs-mod.c @@ -53,6 +53,7 @@ void *qtfs_remote_run(struct qtfs_conn_var_s *pvar, unsigned int type, unsigned pvar->seq_num++; req->type = type; req->len = len; + req->err = 0; req->seq_num = pvar->seq_num; pvar->conn_ops->conn_recv_buff_drop(&pvar->conn_var); @@ -62,13 +63,10 @@ void *qtfs_remote_run(struct qtfs_conn_var_s *pvar, unsigned int type, unsigned // 给server发一个消息 pvar->vec_send.iov_len = QTFS_MSG_HEAD_LEN + len; ret = qtfs_conn_send(pvar); - if (ret == -EPIPE) { - qtfs_err("qtfs remote run thread:%d send get EPIPE, try reconnect.", pvar->cur_threadidx); - qtfs_sm_reconnect(pvar); - } if (ret <= 0) { qtfs_err("qtfs remote run send failed, ret:%d pvar sendlen:%lu.", ret, pvar->vec_send.iov_len); qtinfo_senderrinc(req->type); + return NULL; } qtinfo_sendinc(type); @@ -221,6 +219,10 @@ static int __init qtfs_init(void) ret = -ENOMEM; goto cache_create_err; } + if (qtfs_conn_param_init() < 0) { + ret = -ENOMEM; + goto conn_init_err; + } qtfs_whitelist_initset(); qtfs_conn_param_init(); g_qtfs_epoll_thread = kthread_run(qtfs_epoll_thread, NULL, "qtfs_epoll"); @@ -267,6 +269,8 @@ diag_malloc_err: kthread_stop(g_qtfs_epoll_thread); epoll_thread_err: qtfs_conn_param_fini(); +conn_init_err: + kmem_cache_destroy(qtfs_inode_priv_cache); cache_create_err: unregister_filesystem(&qtfs_fs_type); return ret; diff --git a/qtfs/qtfs/sb.c b/qtfs/qtfs/sb.c index c967cc0c25c4e0401ee83b0db5e02fb92d676202..2ccb48a04fface8b3ca4d08ddbab8a337e335cb2 100644 --- a/qtfs/qtfs/sb.c +++ b/qtfs/qtfs/sb.c @@ -711,58 +711,6 @@ static struct file_operations qtfs_dir_ops = { .read = qtfs_dir_read_dir, }; -static __poll_t -qtfsfifo_poll(struct file *filp, poll_table *wait) -{ - struct qtfs_inode_priv *priv = filp->f_inode->i_private; - __poll_t mask = 0; - struct list_head *p; - struct qtfs_conn_var_s *pvar; - struct qtreq_poll *req; - struct qtrsp_poll *rsp; - struct private_data *fpriv = (struct private_data *)filp->private_data; - - poll_wait(filp, &priv->readq, wait); - - p = &priv->readq.head; - - if (fpriv->fd < 0) { - qtfs_err("fifo poll priv file invalid."); - return 0; - } - pvar = qtfs_conn_get_param(); - if (pvar == NULL) { - qtfs_err("qtfs fifo poll get param failed."); - return 0; - } - req = pvar->conn_ops->get_conn_msg_buf(pvar, QTFS_SEND); - req->fd = fpriv->fd; - rsp = qtfs_remote_run(pvar, QTFS_REQ_FIFOPOLL, sizeof(struct qtreq_poll)); - if (IS_ERR_OR_NULL(rsp)) { - qtfs_conn_put_param(pvar); - return 0; - } - if (rsp->ret == QTFS_ERR) { - qtfs_err("qtfs fifo poll remote run error."); - qtfs_conn_put_param(pvar); - return 0; - } - mask = rsp->mask; - - qtfs_info("fifo poll success mask:%x", mask); - qtfs_conn_put_param(pvar); - return mask; -} - -struct file_operations qtfsfifo_ops = { - .read_iter = qtfs_readiter, - .write_iter = qtfs_writeiter, - .open = qtfs_open, - .release = qtfs_release, - .llseek = no_llseek, - .poll = qtfsfifo_poll, -}; - static struct file_operations qtfs_file_ops = { .read_iter = qtfs_readiter, .write_iter = qtfs_writeiter, @@ -772,10 +720,8 @@ static struct file_operations qtfs_file_ops = { .llseek = qtfs_llseek, .fsync = qtfs_fsync, .unlocked_ioctl = qtfs_ioctl, - .poll = qtfsfifo_poll, }; - static int qtfs_readpage(struct file *file, struct page *page) { void *kaddr = NULL; @@ -1342,6 +1288,7 @@ int qtfs_getattr(const struct path *path, struct kstat *stat, u32 req_mask, unsi req->query_flags = flags; mnt_path = qtfs_mountpoint_path_init(path->dentry, (struct path*)path, req->path); if (IS_ERR(mnt_path)) { + qtfs_conn_put_param(pvar); return PTR_ERR(mnt_path); } rsp = qtfs_remote_run(pvar, QTFS_REQ_GETATTR, QTFS_SEND_SIZE(struct qtreq_getattr, req->path)); @@ -1672,7 +1619,7 @@ struct dentry *qtfs_fs_mount(struct file_system_type *fs_type, strlcpy(req->path, dev_name, PATH_MAX); rsp = qtfs_remote_run(pvar, QTFS_REQ_MOUNT, strlen(dev_name)); if (IS_ERR_OR_NULL(rsp) || rsp->ret != QTFS_OK) { - errno = rsp->errno; + errno = IS_ERR_OR_NULL(rsp) ? -EFAULT : rsp->errno; qtfs_err("qtfs fs mount failed, path:<%s> errno:%d.\n", dev_name, errno); qtfs_conn_put_param(pvar); return (IS_ERR_VALUE((long)errno)) ? ERR_PTR(errno) : ERR_PTR(-EFAULT); diff --git a/qtfs/qtfs_common/conn.c b/qtfs/qtfs_common/conn.c index cc59d5256e58ebfe845412ec985901778ba68c9f..da576d89f32e32d1bfce70268160fb836a0c38f5 100644 --- a/qtfs/qtfs_common/conn.c +++ b/qtfs/qtfs_common/conn.c @@ -36,13 +36,18 @@ static atomic_t g_qtfs_conn_num; static struct list_head g_vld_lst; static struct list_head g_busy_lst; static struct llist_head g_lazy_put_llst; +static struct list_head g_fifo_lst; static struct mutex g_param_mutex; +static struct mutex g_fifo_mutex; int qtfs_mod_exiting = false; struct qtfs_conn_var_s *qtfs_thread_var[QTFS_MAX_THREADS] = {NULL}; struct qtfs_conn_var_s *qtfs_epoll_var = NULL; #ifdef QTFS_SERVER struct qtfs_server_userp_s *qtfs_userps = NULL; #endif +#ifdef QTFS_CLIENT +struct kmem_cache *qtfs_fifo_pvar_cache; +#endif // try to connect remote uds server, only for unix domain socket #define QTFS_UDS_PROXY_SUFFIX ".proxy" @@ -207,17 +212,65 @@ void qtfs_conn_fini(struct qtfs_conn_var_s *pvar) return pvar->conn_ops->conn_fini(&pvar->conn_var, pvar->user_type); } +#define MAGIC_U32(magic, n) ((magic >> (n * 8)) & 0xff) +static inline int qtfs_conn_sync_magic(struct qtfs_conn_var_s *pvar, bool block) +{ + u8 byte; + int ret; + if (pvar->magic_recv == 0) + return 0; + while (1) { + ret = pvar->conn_ops->conn_recv(&pvar->conn_var, &byte, 1, block); + if (ret <= 0) break; + if (byte != MAGIC_U32(pvar->magic_recv, 3)) continue; + ret = pvar->conn_ops->conn_recv(&pvar->conn_var, &byte, 1, block); + if (ret <= 0) break; + if (byte != MAGIC_U32(pvar->magic_recv, 2)) continue; + ret = pvar->conn_ops->conn_recv(&pvar->conn_var, &byte, 1, block); + if (ret <= 0) break; + if (byte != MAGIC_U32(pvar->magic_recv, 1)) continue; + ret = pvar->conn_ops->conn_recv(&pvar->conn_var, &byte, 1, block); + if (ret <= 0) break; + if (byte != MAGIC_U32(pvar->magic_recv, 0)) continue; + break; + } + if (ret < 0) { + if (ret != -EAGAIN) + qtfs_err("qtfs sync magic failed ret:%d byte:%u", ret, byte); + return ret; + } + return 0; +} + int qtfs_conn_send(struct qtfs_conn_var_s *pvar) { + int ret = 0; + int iov_ret = 0; if (pvar->vec_send.iov_len > pvar->send_max) return -EMSGSIZE; + if (pvar->magic_send != 0) { + ret = pvar->conn_ops->conn_send(&pvar->conn_var, &pvar->magic_send, sizeof(pvar->magic_send)); + if (ret <= 0) { + qtfs_err("magic send failed, ret:%d", ret); + return ret; + } + } pvar->send_valid = pvar->vec_send.iov_len; - return pvar->conn_ops->conn_send(&pvar->conn_var, pvar->vec_send.iov_base, pvar->vec_send.iov_len); + ret = pvar->conn_ops->conn_send(&pvar->conn_var, pvar->vec_send.iov_base, pvar->vec_send.iov_len); + if (ret <= 0) + return ret; + if (pvar->iov_send) { + iov_ret = pvar->conn_ops->conn_send_iter(&pvar->conn_var, pvar->iov_send); + pvar->iov_send = NULL; // invalid it after use + if (iov_ret <= 0) + return iov_ret; + } + return ret + iov_ret; } int do_qtfs_conn_recv(struct qtfs_conn_var_s *pvar, bool block) { - int ret; + int ret = 0; int headlen = 0; struct qtreq *rsp = NULL; struct kvec load; @@ -232,13 +285,14 @@ int do_qtfs_conn_recv(struct qtfs_conn_var_s *pvar, bool block) load.iov_base = pvar->vec_recv.iov_base + QTFS_MSG_HEAD_LEN; load.iov_len = pvar->vec_recv.iov_len - QTFS_MSG_HEAD_LEN; rsp = pvar->vec_recv.iov_base; - if (rsp->len > load.iov_len) { - qtfs_err("qtfs recv head invalid len is:%lu", rsp->len); - return -EINVAL; - } + + // only recv head + if (load.iov_len == 0) + goto end; retry: - ret = pvar->conn_ops->conn_recv(&pvar->conn_var, load.iov_base, rsp->len, true); + ret = pvar->conn_ops->conn_recv(&pvar->conn_var, load.iov_base, + (rsp->len < load.iov_len) ? rsp->len : load.iov_len, true); if (ret == -EAGAIN) goto retry; if (ret == -ERESTARTSYS) { @@ -261,12 +315,18 @@ retry: qtfs_crit("recv total:%d msg len:%lu\n", ret, rsp->len); WARN_ON(1); } +end: return ret + headlen; } int qtfs_conn_recv_block(struct qtfs_conn_var_s *pvar) { - int ret = do_qtfs_conn_recv(pvar, true); + int ret = 0; + ret = qtfs_conn_sync_magic(pvar, true); + if (ret != 0) { + return ret; + } + ret = do_qtfs_conn_recv(pvar, true); if (ret > 0) { pvar->recv_valid = (ret > pvar->recv_max) ? pvar->recv_max : ret; } @@ -275,7 +335,12 @@ int qtfs_conn_recv_block(struct qtfs_conn_var_s *pvar) int qtfs_conn_recv(struct qtfs_conn_var_s *pvar) { - int ret = do_qtfs_conn_recv(pvar, false); + int ret = 0; + ret = qtfs_conn_sync_magic(pvar, true); + if (ret != 0) { + return ret; + } + ret = do_qtfs_conn_recv(pvar, false); if (ret <= 0) { msleep(1); } else { @@ -286,10 +351,12 @@ int qtfs_conn_recv(struct qtfs_conn_var_s *pvar) int qtfs_conn_var_init(struct qtfs_conn_var_s *pvar) { + INIT_LIST_HEAD(&pvar->lst); // qtfs消息为130多k,当作最大值作为合法性判断 if (pvar->recv_max > QTFS_MSG_LEN || pvar->send_max > QTFS_MSG_LEN || pvar->recv_max == 0 || pvar->recv_max == 0) { - qtfs_err("invalid recv max:%u or invalid send max:%u", pvar->recv_max, pvar->send_max); + qtfs_err("invalid recv max:%u or invalid send max:%u", + pvar->recv_max, pvar->send_max); return QTFS_ERR; } pvar->vec_recv.iov_base = kmalloc(pvar->recv_max, GFP_KERNEL); @@ -310,7 +377,6 @@ int qtfs_conn_var_init(struct qtfs_conn_var_s *pvar) memset(pvar->vec_send.iov_base, 0, pvar->send_max); pvar->recv_valid = 0; pvar->send_valid = 0; - INIT_LIST_HEAD(&pvar->lst); qtfs_info("init pvar thread:%d recv max:%u, send max:%u", pvar->cur_threadidx, pvar->recv_max, pvar->send_max); return QTFS_OK; } @@ -419,6 +485,7 @@ int qtfs_sm_reconnect(struct qtfs_conn_var_s *pvar) if (ret) { qtfs_err("qtfs sm active init failed, ret:%d.", ret); ret = QTERROR; + pvar->state = QTCONN_INIT; break; } @@ -494,10 +561,22 @@ static void parse_param(void) g_pvar_ops->parse_param(); } -void qtfs_conn_param_init(void) +int qtfs_conn_param_init(void) { +#ifdef QTFS_CLIENT + qtfs_fifo_pvar_cache = kmem_cache_create("qtfs_fifo_pvar", + sizeof(struct qtfs_conn_var_s), + 0, + (SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD), + NULL); + if (!qtfs_fifo_pvar_cache) { + qtfs_err("qtfs fifo pvar cache create failed.\n"); + return -ENOMEM; + } +#endif INIT_LIST_HEAD(&g_vld_lst); INIT_LIST_HEAD(&g_busy_lst); + INIT_LIST_HEAD(&g_fifo_lst); init_llist_head(&g_lazy_put_llst); atomic_set(&g_qtfs_conn_num, 0); // parse module_param and choose specified channel @@ -506,7 +585,8 @@ void qtfs_conn_param_init(void) g_pvar_ops->param_init(); mutex_init(&g_param_mutex); - return; + mutex_init(&g_fifo_mutex); + return 0; } void release_pvar(struct qtfs_conn_var_s *pvar) @@ -535,6 +615,10 @@ void qtfs_conn_param_fini(void) int conn_num; int i; +#ifdef QTFS_CLIENT + kmem_cache_destroy(qtfs_fifo_pvar_cache); +#endif + ret = qtfs_mutex_lock_interruptible(&g_param_mutex); if (ret) { qtfs_err("qtfs conn param finish mutex lock interrup failed, ret:%d.", ret); @@ -723,7 +807,7 @@ struct qtfs_conn_var_s *qtfs_epoll_establish_conn(void) ret = qtfs_sm_active(pvar); if (ret) { qtfs_err("qtfs epoll get param active new param failed, ret:%d state:%s", ret, QTCONN_CUR_STATE(pvar)); - return NULL; + return pvar; } qtfs_info("qtfs create new epoll param state:%s", QTCONN_CUR_STATE(pvar)); @@ -764,6 +848,60 @@ void qtfs_epoll_cut_conn(struct qtfs_conn_var_s *pvar) } } +#ifdef QTFS_CLIENT +/* fifo的机制有所不同,每一个pvar对应唯一一个fifo的访问,生命周期贯穿 + 从fifo open开始到fifo close结束,在open时get param,在close时put param */ +#define QTFS_FIFO_MAGIC_SEND 0xa55aa55a +#define QTFS_FIFO_MAGIC_RECV 0x5aa55aa5 +struct qtfs_conn_var_s *qtfs_fifo_get_param(void) +{ + int ret; + struct qtfs_conn_var_s *pvar = kmem_cache_alloc(qtfs_fifo_pvar_cache, GFP_KERNEL); + if (pvar == NULL) { + qtfs_err("kmem cache alloc fifo cache failed."); + return NULL; + } + memset(pvar, 0, sizeof(struct qtfs_conn_var_s)); + // initialize conn_pvar here + pvar->recv_max = QTFS_FIFO_REQ_LEN; + pvar->send_max = QTFS_FIFO_REQ_LEN; + pvar->magic_send = QTFS_FIFO_MAGIC_SEND; + pvar->magic_recv = QTFS_FIFO_MAGIC_RECV; + pvar->user_type = QTFS_CONN_TYPE_FIFO; + g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type); + if (QTFS_OK != pvar->conn_ops->conn_var_init(pvar)) { + qtfs_err("qtfs sock var init failed.\n"); + kmem_cache_free(qtfs_fifo_pvar_cache, pvar); + return NULL; + } + pvar->state = QTCONN_INIT; + + ret = qtfs_sm_active(pvar); + if (ret) { + qtfs_err("qtfs fifo get param active new param faile, ret:%d state:%s", ret, QTCONN_CUR_STATE(pvar)); + pvar->conn_ops->conn_var_fini(pvar); + kmem_cache_free(qtfs_fifo_pvar_cache, pvar); + return NULL; + } + mutex_lock(&g_fifo_mutex); + list_add(&pvar->lst, &g_fifo_lst); + mutex_unlock(&g_fifo_mutex); + qtfs_info("qtfs create new fifo param state:%s", QTCONN_CUR_STATE(pvar)); + return pvar; +} + +void qtfs_fifo_put_param(struct qtfs_conn_var_s *pvar) +{ + mutex_lock(&g_fifo_mutex); + list_del(&pvar->lst); + mutex_unlock(&g_fifo_mutex); + qtfs_sm_exit(pvar); + pvar->conn_ops->conn_var_fini(pvar); + kmem_cache_free(qtfs_fifo_pvar_cache, pvar); + return; +} +#endif + void qtfs_conn_list_cnt(void) { struct list_head *entry; diff --git a/qtfs/qtfs_common/socket.c b/qtfs/qtfs_common/socket.c index 31ed4dffc3f76ae63cf423acfa28f5599a8b2152..3cfa0b2b324ea2efa529c6a66d6ceff903ce1bc7 100644 --- a/qtfs/qtfs_common/socket.c +++ b/qtfs/qtfs_common/socket.c @@ -177,7 +177,7 @@ static int qtfs_conn_sock_init(void *connvar, qtfs_conn_type_e type) { struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; struct socket *sock; - int ret; + int ret = 0; int sock_family = AF_VSOCK; #ifdef QTFS_TEST_MODE struct sockaddr_in saddr; @@ -192,7 +192,12 @@ static int qtfs_conn_sock_init(void *connvar, qtfs_conn_type_e type) saddr.svm_port = sockvar->vm_port; saddr.svm_cid = sockvar->vm_cid; #endif - if (type >= QTFS_CONN_TYPE_INV || qtfs_server_main_sock[type] != NULL) { + if (type >= QTFS_CONN_TYPE_INV) { + qtfs_err("invalid type in sock init:%u", type); + ret = -EINVAL; + goto err_end_type; + } + if (qtfs_server_main_sock[type] != NULL) { qtfs_info("qtfs conn type:%u main sock is set, valid or out-of-date?", type); return 0; } @@ -230,6 +235,7 @@ static int qtfs_conn_sock_init(void *connvar, qtfs_conn_type_e type) err_end: sock_release(sock); +err_end_type: return ret; } #endif @@ -299,24 +305,53 @@ static int qtfs_conn_sock_recv(void *connvar, void *buf, size_t len, bool block) { struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; struct kvec v; - memset(&sockvar->msg_recv, 0, sizeof(sockvar->msg_recv)); + struct msghdr msg_recv; + memset(&msg_recv, 0, sizeof(msg_recv)); v.iov_base = buf; v.iov_len = len; - return kernel_recvmsg(sockvar->client_sock, &sockvar->msg_recv, &v, 1, + return kernel_recvmsg(sockvar->client_sock, &msg_recv, &v, 1, len, (block == true) ? MSG_WAITALL : MSG_DONTWAIT); } +static int qtfs_conn_sock_recv_iter(void *connvar, struct iov_iter *iov, bool block) +{ + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + struct msghdr msg_recv; + if (iov == NULL) { + qtfs_err("sock recv iter is invalid"); + return -EINVAL; + } + memset(&msg_recv, 0, sizeof(msg_recv)); + msg_recv.msg_iter = *iov; + return sock_recvmsg(sockvar->client_sock, &msg_recv, (block == true) ? MSG_WAITALL : MSG_DONTWAIT); +} + +static int qtfs_conn_sock_send_iter(void *connvar, struct iov_iter *iov) +{ + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + struct msghdr msg_send; + if (iov == NULL) { + qtfs_err("sock send iter is invalid"); + return -EINVAL; + } + memset(&msg_send, 0, sizeof(msg_send)); + msg_send.msg_iter = *iov; + return sock_sendmsg(sockvar->client_sock, &msg_send); +} + static int qtfs_conn_sock_send(void *connvar, void *buf, size_t len) { struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; struct kvec v; int ret; + struct msghdr msg_send; + memset(&msg_send, 0, sizeof(struct msghdr)); v.iov_base = buf; v.iov_len = len; - ret = kernel_sendmsg(sockvar->client_sock, &sockvar->msg_send, &v, 1, len); + ret = kernel_sendmsg(sockvar->client_sock, &msg_send, &v, 1, len); if (ret < 0) { qtfs_err("qtfs sock send error, ret:%d.\n", ret); } @@ -421,6 +456,8 @@ struct qtfs_conn_ops_s qtfs_conn_sock_ops = { .conn_fini = qtfs_conn_sock_fini, .conn_send = qtfs_conn_sock_send, .conn_recv = qtfs_conn_sock_recv, + .conn_send_iter = qtfs_conn_sock_send_iter, + .conn_recv_iter = qtfs_conn_sock_recv_iter, #ifdef QTFS_SERVER .conn_new_connection = qtfs_conn_sock_server_accept, .conn_inited = qtfs_conn_sock_inited, diff --git a/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml b/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml index ffcb507d7e2f96920f3c37554108af7737730066..803b5edef0a110d1bcf1516728eef1970f7db7ad 100644 --- a/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml +++ b/qtfs/qtfs_server/qtfs_fifo_server/Cargo.toml @@ -5,4 +5,5 @@ edition = "2021" [dependencies] tokio = { version = "1.29.1", features = ["full"]} -libc = "0.2" \ No newline at end of file +libc = "0.2" +rlimit = "0.10.1" \ No newline at end of file diff --git a/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs b/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs index bb52fbcc9ed539c43c135c600eefc2953f0c81f8..9ddf5ee793a606bdb8324d544a2e0b783368a814 100644 --- a/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs +++ b/qtfs/qtfs_server/qtfs_fifo_server/src/cofifo.rs @@ -14,7 +14,6 @@ *******************************************************************************/ use tokio::net::TcpStream; -use std::net::TcpStream as StdTcpStream; use std::mem; use tokio::fs::File; use tokio::fs; @@ -23,136 +22,99 @@ use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -#[derive(Debug)] +#[derive(Debug, Clone)] #[repr(C, packed)] struct Qtreq { // magic: [u8; 4], //magic: 0x5aa55aa5 msgtype: u32, error: u32, + seq_num: u64, len: usize, } -async fn qtfs_req_head(mut stream: TcpStream, _idx: usize) -> Qtreq { - const HEADSIZE: usize = mem::size_of::(); - - let mut msghead = [0; HEADSIZE]; - let _ = stream.read_exact(&mut msghead).await; - let head = Qtreq { - msgtype: u32::from_le_bytes(msghead[0..4].try_into().unwrap()), - error: u32::from_le_bytes(msghead[4..8].try_into().unwrap()), - len: usize::from_le_bytes(msghead[8..8+mem::size_of::()].try_into().unwrap()), - }; - println!("Recv new head:{:?}", head); - head -} - +const QTFS_REQ_OPEN: u32 = 2; +const QTFS_REQ_CLOSE: u32 = 3; +const QTFS_REQ_READ: u32 = 5; +const QTFS_REQ_WRITE: u32 = 6; pub async fn qtfs_fifo_server(stream: TcpStream, idx: usize) { - const QTFS_REQ_OPEN: u32 = 2; - const QTFS_REQ_CLOSE: u32 = 3; - const QTFS_REQ_READ: u32 = 5; - const QTFS_REQ_WRITE: u32 = 6; - - let stdstream: StdTcpStream = stream.into_std().unwrap(); - let s1 = stdstream.try_clone().unwrap(); - let s2 = stdstream.try_clone().unwrap(); - let s3 = stdstream.try_clone().unwrap(); - - let s1 = TcpStream::from_std(s1).unwrap(); - let s2 = TcpStream::from_std(s2).unwrap(); - let s3 = TcpStream::from_std(s3).unwrap(); + let mut conn = Conn {stream}; + let mut head: Qtreq; - //stdstream is still alive - - let mut conn = Conn {stream: s1}; - conn.package_sync().await; - - let head: Qtreq = qtfs_req_head(s2, idx.clone()).await; + match conn.qtfs_req_head(idx.clone()).await { + Ok(h) => head = h, + Err(e) => { + println!("Recv invalid head exit this proc :{}.", e); + return; + } + } if head.msgtype != QTFS_REQ_OPEN { println!("first msg type is invalid"); return; } - let file = match qtfs_fifo_open(s3, head).await { + let file = match conn.qtfs_fifo_open(head.clone()).await { Ok(f) => { + head.len = mem::size_of::(); + conn.req_head_ack(head).await; conn.open_ack(0).await; f } Err(e) => { + head.len = mem::size_of::(); println!("Open fifo error:{}", e); + conn.req_head_ack(head).await; conn.open_ack(1).await; return; } }; 'main: loop { - conn.package_sync().await; - let s1 = stdstream.try_clone().unwrap(); - let s2 = stdstream.try_clone().unwrap(); - let s1 = TcpStream::from_std(s1).unwrap(); - let s2 = TcpStream::from_std(s2).unwrap(); - - let head: Qtreq = qtfs_req_head(s1, idx.clone()).await; + let mut head: Qtreq; + match conn.qtfs_req_head(idx.clone()).await { + Ok(h) => head = h, + Err(e) => { + println!("head recv failed, {}.", e); + return; + } + } match head.msgtype { QTFS_REQ_OPEN => { println!("Fifo is opened and recv open request again!"); + head.len = mem::size_of::(); + conn.req_head_ack(head).await; conn.open_ack(1).await; } QTFS_REQ_CLOSE => { println!("Close req idx:{}", idx.clone()); - conn.close_ack().await; + head.len = 0; + conn.req_head_ack(head).await; break 'main; } QTFS_REQ_READ => { println!("Read req idx:{}", idx.clone()); - qtfs_fifo_read(s2, file.try_clone().await.unwrap()).await; + conn.qtfs_fifo_read(file.try_clone().await.unwrap(), head).await; } QTFS_REQ_WRITE => { println!("Write req idx:{}", idx.clone()); - qtfs_fifo_write(s2, file.try_clone().await.unwrap()).await; + conn.qtfs_fifo_write(file.try_clone().await.unwrap(), head).await; } _ => { println!("Recv invalid msg type"); } } } + println!("Fifo server idx:{} is closed.", idx); } +#[derive(Debug)] +#[repr(C, packed)] +struct Qtreqopen { + flags: u64, + mode: u32, +} #[repr(C, packed)] struct Qtrspopen { ret: i32, } -async fn qtfs_fifo_open(mut stream: TcpStream, head: Qtreq) -> Result { - if head.len >= 4096 { - println!("qtfs fifo len invalid"); - return Err(1); - } - - let mut path = Vec::with_capacity(head.len); - path.resize(head.len, 0); - stream.read_exact(&mut path).await.unwrap(); - - let getstr = String::from_utf8(path).unwrap(); - let pathstr = getstr.trim_end_matches('\0').trim(); - match fs::metadata(pathstr.clone()).await { - Ok(meta) => { - if meta.file_type().is_fifo() == false { - println!("Requst path:{} not fifo!", pathstr); - return Err(1); - } - } - Err(_) => { - println!("path:{} check failed.", pathstr); - return Err(1); - } - }; - println!("Recv open path:{}", pathstr); - let file = OpenOptions::new() - .read(true) - .write(true) - .custom_flags(libc::O_NONBLOCK) - .open(pathstr).await.unwrap(); - - Ok(file) -} #[repr(C, packed)] struct Qtreqread { @@ -161,52 +123,11 @@ struct Qtreqread { #[repr(C, packed)] struct Qtrspread { - ret: i32, errno: i32, len: u64, } -async fn qtfs_fifo_read(mut stream: TcpStream, mut file: File) { - let mut head = [0; mem::size_of::()]; - stream.read_exact(&mut head).await.unwrap(); - let req = Qtreqread { - len: u64::from_le_bytes(head[0..8].try_into().unwrap()), - }; - let len = std::cmp::min(req.len, 4096); - - let mut rsp = Qtrspread { - ret: 0, - errno: 0, - len: 0, - }; - - let mut buf = Vec::with_capacity(len.try_into().unwrap()); - buf.resize(len.try_into().unwrap(), 0); - - match file.read(&mut buf).await { - Ok(n) => { - rsp.len = n as u64; - let send = unsafe { - let ptr = &rsp as *const Qtrspread as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - stream.write_all(&send[..mem::size_of::()]).await.unwrap(); - let _ = stream.write_all(&buf[..n]).await.unwrap(); - } - Err(e) => { - rsp.errno = -1; - rsp.ret = 1; - rsp.len = 0; - let send = unsafe { - let ptr = &rsp as *const Qtrspread as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - stream.write_all(&send[..mem::size_of::()]).await.unwrap(); - println!("Read from fifo error:{}", e); - } - } -} #[repr(C, packed)] struct Qtreqwrite { @@ -214,51 +135,9 @@ struct Qtreqwrite { } #[repr(C, packed)] struct Qtrspwrite { - ret: i32, errno: i32, len: u64, } -async fn qtfs_fifo_write(mut stream: TcpStream, mut file: File) { - let mut whead = [0; mem::size_of::()]; - stream.read_exact(&mut whead).await.unwrap(); - let len = u64::from_le_bytes(whead[0..8].try_into().unwrap()); - - // 最大接收一次性写入4k - let len = std::cmp::min(len, 4096); - - let mut rsp = Qtrspwrite { - ret: 0, - errno: 0, - len: 0, - }; - println!("Qtfs fifo write len:{}", len); - let stdstream: StdTcpStream = stream.into_std().unwrap(); - let s = stdstream.try_clone().unwrap(); - let mut stream = TcpStream::from_std(stdstream).unwrap(); - let s = TcpStream::from_std(s).unwrap(); - let mut conn = Conn {stream: s}; - - let mut buf = Vec::with_capacity(len.try_into().unwrap()); - buf.resize(len.try_into().unwrap(), 0); - stream.read_exact(&mut buf).await.unwrap(); - - match file.write_all(&mut buf[..len as usize]).await { - Ok(_) => { - rsp.len = len as u64; - conn.write_ack(rsp).await; - } - Err(e) => { - rsp.len = 0; - conn.write_ack(rsp).await; - println!("Write failed {}.", e); - } - } -} - -#[repr(C, packed)] -struct Qtrspclose { - ret: i32, -} struct Conn { stream: TcpStream, @@ -282,6 +161,163 @@ impl Conn { } } + async fn send_magic_head(&mut self) { + const MAGIC: [u8; 4] = [0x5a, 0xa5, 0x5a, 0xa5]; + let _ = self.stream.write_all(&MAGIC[0..4]).await; + } + + async fn qtfs_req_head(&mut self, _idx: usize) -> Result { + const HEADSIZE: usize = mem::size_of::(); + self.package_sync().await; + let mut msghead = [0; HEADSIZE]; + self.stream.read_exact(&mut msghead).await?; + let head = Qtreq { + msgtype: u32::from_le_bytes(msghead[0..4].try_into().unwrap()), + error: u32::from_le_bytes(msghead[4..8].try_into().unwrap()), + seq_num: u64::from_le_bytes(msghead[8..16].try_into().unwrap()), + len: usize::from_le_bytes(msghead[16..16+mem::size_of::()].try_into().unwrap()), + }; + let reqtype: String = match head.msgtype { + QTFS_REQ_OPEN => String::from("Open"), + QTFS_REQ_CLOSE => String::from("Close"), + QTFS_REQ_READ => String::from("Read"), + QTFS_REQ_WRITE => String::from("Write"), + _ => String::from("Unknown"), + }; + println!("Recv new head type:{} msg:{:?}", reqtype, head); + Ok(head) + } + + async fn qtfs_fifo_open(&mut self, head: Qtreq) -> Result { + const HEADSIZE: usize = mem::size_of::(); + let mut openhead = [0; HEADSIZE]; + + if head.len >= 4096 + HEADSIZE { + println!("qtfs fifo len invalid"); + return Err(1); + } + self.stream.read_exact(&mut openhead).await.unwrap(); + let openhead1 = Qtreqopen { + flags: u64::from_le_bytes(openhead[0..8].try_into().unwrap()), + mode: u32::from_le_bytes(openhead[8..12].try_into().unwrap()), + }; + println!("open head:{:?}", openhead1); + let mut path = Vec::with_capacity(head.len - HEADSIZE); + path.resize(head.len - HEADSIZE, 0); + self.stream.read_exact(&mut path).await.unwrap(); + + let getstr = String::from_utf8(path).unwrap(); + let pathstr = getstr.trim_end_matches('\0').trim(); + match fs::metadata(pathstr.clone()).await { + Ok(meta) => { + if meta.file_type().is_fifo() == false { + println!("Requst path:{} not fifo!", pathstr); + return Err(1); + } + } + Err(_) => { + println!("path:{} check failed.", pathstr); + return Err(1); + } + }; + println!("Recv open path:{}", pathstr); + let file = OpenOptions::new() + .read(true) + .write(true) + //.custom_flags(libc::O_NONBLOCK) + .open(pathstr).await.unwrap(); + + Ok(file) + } + + async fn qtfs_fifo_read(&mut self, mut file: File, mut reqhead: Qtreq) { + let mut head = [0; mem::size_of::()]; + self.stream.read_exact(&mut head).await.unwrap(); + let req = Qtreqread { + len: u64::from_le_bytes(head[0..8].try_into().unwrap()), + }; + let len = std::cmp::min(req.len, 4096); + + let mut rsp = Qtrspread { + errno: 0, + len: 0, + }; + + let mut buf = Vec::with_capacity(len.try_into().unwrap()); + buf.resize(len.try_into().unwrap(), 0); + + match file.read(&mut buf).await { + Ok(n) => { + rsp.len = n as u64; + let send = unsafe { + let ptr = &rsp as *const Qtrspread as *const u8; + std::slice::from_raw_parts(ptr, mem::size_of::()) + }; + println!("Read {} bytes from fifo", n.clone()); + reqhead.len = mem::size_of::(); + self.req_head_ack(reqhead).await; + self.stream.write_all(&send[..mem::size_of::()]).await.unwrap(); + let _ = self.stream.write_all(&buf[..n]).await.unwrap(); + } + Err(e) => { + rsp.errno = -1; + rsp.len = 0; + let send = unsafe { + let ptr = &rsp as *const Qtrspread as *const u8; + std::slice::from_raw_parts(ptr, mem::size_of::()) + }; + reqhead.len = mem::size_of::(); + self.req_head_ack(reqhead).await; + self.stream.write_all(&send[..mem::size_of::()]).await.unwrap(); + println!("Read from fifo error:{}", e); + } + } + } + + async fn qtfs_fifo_write(&mut self, mut file: File, mut reqhead: Qtreq) { + let mut whead = [0; mem::size_of::()]; + self.stream.read_exact(&mut whead).await.unwrap(); + let len = u64::from_le_bytes(whead[0..8].try_into().unwrap()); + + // 最大接收一次性写入4k + let len = std::cmp::min(len, 4096); + + let mut rsp = Qtrspwrite { + errno: 0, + len: 0, + }; + + let mut buf = Vec::with_capacity(len.try_into().unwrap()); + buf.resize(len.try_into().unwrap(), 0); + self.stream.read_exact(&mut buf).await.unwrap(); + + match file.write_all(&mut buf[..len as usize]).await { + Ok(_) => { + rsp.len = len as u64; + reqhead.len = mem::size_of::(); + self.req_head_ack(reqhead).await; + self.write_ack(rsp).await; + println!("Write fifo ok, send ack."); + } + Err(e) => { + rsp.len = 0; + reqhead.len = mem::size_of::(); + self.req_head_ack(reqhead).await; + self.write_ack(rsp).await; + println!("Write failed {}.", e); + } + } + } + + async fn req_head_ack(&mut self, head: Qtreq) { + let send = unsafe { + let ptr = &head as *const Qtreq as *const u8; + std::slice::from_raw_parts(ptr, mem::size_of::()) + }; + self.send_magic_head().await; + self.stream.write_all(&send[..mem::size_of::()]).await.expect("req head ack failed"); + } + async fn open_ack(&mut self, retcode: i32) { let rsp = Qtrspopen {ret: retcode,}; let send = unsafe { @@ -291,15 +327,6 @@ impl Conn { self.stream.write_all(&send[..mem::size_of::()]).await.expect("Response open failed"); } - async fn close_ack(&mut self) { - let rsp = Qtrspclose {ret: 0}; - let send = unsafe { - let ptr = &rsp as *const Qtrspclose as *const u8; - std::slice::from_raw_parts(ptr, mem::size_of::()) - }; - self.stream.write_all(&send[..mem::size_of::()]).await.expect("Response close failed"); - } - async fn write_ack(&mut self, rsp: Qtrspwrite) { let send = unsafe { let ptr = &rsp as *const Qtrspwrite as *const u8; diff --git a/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs b/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs index 9745e3c7f2964b933340c34dc75039dda05023e0..4f8c0c4050d99ffbc81027dd8366d9f545d21031 100644 --- a/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs +++ b/qtfs/qtfs_server/qtfs_fifo_server/src/main.rs @@ -18,8 +18,20 @@ use std::net::TcpListener; use tokio::net::TcpListener as AsyncTcpListener; use tokio::runtime::Builder; +extern crate rlimit; +use rlimit::Resource; +use rlimit::setrlimit; mod cofifo; +async fn set_rlimit_fd(){ + let rlimit = Resource::NOFILE; + let fd_limit = 65536; + match setrlimit(rlimit, fd_limit, fd_limit) { + Ok(_) => {}, + Err(e) => println!("Set file rlimit to {} failed {}.", fd_limit, e), + } +} + #[tokio::main] async fn main() { let args: Vec = env::args().collect(); @@ -29,6 +41,7 @@ async fn main() { println!(" {} 192.168.1.10:12310 10", bin); return; } + set_rlimit_fd().await; let addr: String = args[1].trim().parse().expect("Input address: '192.168.1.10:12310'"); let max_block_threads: usize = args[2].trim().parse().expect("Input max blocking threads number in arg 2: like '10'"); let listener = TcpListener::bind(addr.clone()).unwrap();