diff --git a/qtfs/include/comm.h b/qtfs/include/comm.h index 201ee00c64c682280c677a6f8a5a7d8d62762321..b275a1df53d23bffce2cd7399076e784042d99fd 100644 --- a/qtfs/include/comm.h +++ b/qtfs/include/comm.h @@ -169,7 +169,7 @@ typedef enum { QTCONN_INIT, QTCONN_CONNECTING, QTCONN_ACTIVE, -} qtfs_conn_type_e; +} qtfs_conn_state_e; struct qtinfo_client { unsigned long cnts[QTINF_NUM]; diff --git a/qtfs/include/conn.h b/qtfs/include/conn.h index 3658eb39e9600e7c710649865de1f03723a3ebe8..fb0146065e59f8f07722410cf74c1b0b5072a4bd 100644 --- a/qtfs/include/conn.h +++ b/qtfs/include/conn.h @@ -34,9 +34,7 @@ extern char qtfs_server_ip[20]; extern int qtfs_server_port; extern unsigned int qtfs_server_vsock_port; extern unsigned int qtfs_server_vsock_cid; -extern int qtfs_conn_max_conn; -extern struct socket *qtfs_server_main_sock; extern struct qtfs_conn_var_s *qtfs_thread_var[QTFS_MAX_THREADS]; extern struct qtfs_conn_var_s *qtfs_epoll_var; extern char qtfs_log_level[QTFS_LOGLEVEL_STRLEN]; @@ -91,6 +89,15 @@ typedef enum { QTFS_CONN_SOCK_CLIENT, } qtfs_conn_cs_e; +// 使用pvar的主题类型,为了更好区分类型,socket下server有主socket下建立多个链接 +// 的需求,通过类型告知socket模块,且解藕conn层和下面的不同消息通道类型 +typedef enum { + QTFS_CONN_TYPE_QTFS, // QTFS tunnel type + QTFS_CONN_TYPE_EPOLL, // EPOLL thread type + QTFS_CONN_TYPE_FIFO, // FIFO type + QTFS_CONN_TYPE_INV, +} qtfs_conn_type_e; + struct qtfs_pcie_var_s { int srcid; int dstid; @@ -99,25 +106,18 @@ struct qtfs_pcie_var_s { struct qtfs_sock_var_s { struct socket *sock; struct socket *client_sock; +#ifdef QTFS_TEST_MODE char addr[20]; unsigned short port; - +#else // for vsock unsigned int vm_port; unsigned int vm_cid; +#endif + struct msghdr msg_recv; + struct msghdr msg_send; }; -struct qtfs_pvar_ops_s { - // channel-specific parameter parsing function - int (*parse_param)(void); - // channel-specific global param init - int (*param_init)(void); - int (*param_fini)(void); - // init pvar with channel specific ops - int (*pvar_init)(struct qtfs_conn_var_s *pvar); -}; -extern struct qtfs_pvar_ops_s *g_pvar_ops; - struct qtfs_conn_ops_s { // conn message buffer initialization and releasement. int (*conn_var_init)(struct qtfs_conn_var_s *pvar); @@ -126,16 +126,27 @@ struct qtfs_conn_ops_s { void *(*get_conn_msg_buf)(struct qtfs_conn_var_s *pvar, int dir); // connection related ops - int (*conn_init)(struct qtfs_conn_var_s *pvar); - void (*conn_fini)(struct qtfs_conn_var_s *pvar); - int (*conn_send)(struct qtfs_conn_var_s *pvar); - int (*conn_recv)(struct qtfs_conn_var_s *pvar, bool block); - int (*conn_server_accept)(struct qtfs_conn_var_s *pvar); - int (*conn_client_connect)(struct qtfs_conn_var_s *pvar); - bool (*conn_inited)(struct qtfs_conn_var_s *pvar); - bool (*conn_connected)(struct qtfs_conn_var_s *pvar); - void (*conn_recv_buff_drop)(struct qtfs_conn_var_s *pvar); + int (*conn_init)(void *connvar, qtfs_conn_type_e type); + void (*conn_fini)(void *connvar, qtfs_conn_type_e type); + int (*conn_send)(void *connvar, void *buf, size_t len); + int (*conn_recv)(void *connvar, void *buf, size_t len, bool block); + int (*conn_server_accept)(void *connvar, qtfs_conn_type_e type); + int (*conn_client_connect)(void *connvar); + bool (*conn_inited)(void *connvar, qtfs_conn_type_e type); + bool (*conn_connected)(void *connvar); + void (*conn_recv_buff_drop)(void *connvar); +}; + +struct qtfs_pvar_ops_s { + // channel-specific parameter parsing function + int (*parse_param)(void); + // channel-specific global param init + int (*param_init)(void); + int (*param_fini)(void); + // init pvar with channel specific ops + int (*pvar_init)(void *connvar, struct qtfs_conn_ops_s **conn_ops, qtfs_conn_type_e type); }; +extern struct qtfs_pvar_ops_s *g_pvar_ops; struct qtfs_conn_var_s { struct list_head lst; @@ -144,7 +155,8 @@ struct qtfs_conn_var_s { int cur_threadidx; int miss_proc; unsigned long seq_num; - qtfs_conn_type_e state; + qtfs_conn_state_e state; + qtfs_conn_type_e user_type; // type of pvar's user: qtfs/epoll deamon/fifo char who_using[QTFS_FUNCTION_LEN]; union { struct qtfs_sock_var_s sock_var; @@ -157,8 +169,6 @@ struct qtfs_conn_var_s { unsigned long send_valid; struct kvec vec_recv; struct kvec vec_send; - struct msghdr msg_recv; - struct msghdr msg_send; }; struct qtfs_wl_cap { diff --git a/qtfs/qtfs/qtfs-mod.c b/qtfs/qtfs/qtfs-mod.c index e68bb1e20787a583312a9c58ba0d60df222af1fa..31e92b0dcf0f10967e68bcb45d6a7b50f7599272 100644 --- a/qtfs/qtfs/qtfs-mod.c +++ b/qtfs/qtfs/qtfs-mod.c @@ -55,7 +55,7 @@ void *qtfs_remote_run(struct qtfs_conn_var_s *pvar, unsigned int type, unsigned req->len = len; req->seq_num = pvar->seq_num; - pvar->conn_ops->conn_recv_buff_drop(pvar); + pvar->conn_ops->conn_recv_buff_drop(&pvar->conn_var); // 调用qtfs_remote_run之前,调用者应该先把消息在iov_base里面封装好 // 如果不是socket通信,则是在其他通信模式定义的buf里,消息协议统一 // 都是struct qtreq *xx @@ -150,7 +150,7 @@ connecting: while (!kthread_should_stop()) { ret = qtfs_conn_recv(pvar); - if (ret == -EPIPE || pvar->conn_ops->conn_connected(pvar) == false) + if (ret == -EPIPE || pvar->conn_ops->conn_connected(&pvar->conn_var) == false) goto connecting; if (ret < 0 || req->event_nums <= 0 || req->event_nums >= QTFS_MAX_EPEVENTS_NUM) { continue; @@ -305,19 +305,6 @@ static void __exit qtfs_exit(void) return; } -#ifdef QTFS_TEST_MODE -module_param_string(qtfs_server_ip, qtfs_server_ip, sizeof(qtfs_server_ip), 0600); -MODULE_PARM_DESC(qtfs_server_ip, "qtfs server ip"); -module_param(qtfs_server_port, int, 0600); -#else -module_param(qtfs_server_vsock_port, uint, 0600); -module_param(qtfs_server_vsock_cid, uint, 0600); -#endif - -module_param(qtfs_conn_max_conn, int, 0600); -module_param_string(qtfs_log_level, qtfs_log_level, sizeof(qtfs_log_level), 0600); -module_param_string(qtfs_conn_type, qtfs_conn_type, sizeof(qtfs_conn_type), 0600); - module_init(qtfs_init); module_exit(qtfs_exit); MODULE_AUTHOR("liqiang64@huawei.com"); diff --git a/qtfs/qtfs_common/conn.c b/qtfs/qtfs_common/conn.c index 1c197b6d958a80b513451fafb56357f8d0f36895..6926d6dfc0938d9a6dd0b8e144b1636f7e9ef896 100644 --- a/qtfs/qtfs_common/conn.c +++ b/qtfs/qtfs_common/conn.c @@ -28,7 +28,7 @@ struct qtfs_pvar_ops_s *g_pvar_ops = NULL; char qtfs_log_level[QTFS_LOGLEVEL_STRLEN] = {0}; char qtfs_conn_type[20] = QTFS_CONN_SOCK_TYPE; int log_level = LOG_ERROR; -int qtfs_conn_max_conn = QTFS_MAX_THREADS; +static int qtfs_conn_max_conn = QTFS_MAX_THREADS; struct qtinfo *qtfs_diag_info = NULL; bool qtfs_epoll_mode = false; // true: support any mode; false: only support fifo @@ -196,12 +196,12 @@ end: int qtfs_conn_init(struct qtfs_conn_var_s *pvar) { - return pvar->conn_ops->conn_init(pvar); + return pvar->conn_ops->conn_init(&pvar->conn_var, pvar->user_type); } void qtfs_conn_fini(struct qtfs_conn_var_s *pvar) { - return pvar->conn_ops->conn_fini(pvar); + return pvar->conn_ops->conn_fini(&pvar->conn_var, pvar->user_type); } int qtfs_conn_send(struct qtfs_conn_var_s *pvar) @@ -209,16 +209,69 @@ int qtfs_conn_send(struct qtfs_conn_var_s *pvar) if (pvar->vec_send.iov_len > QTFS_MSG_LEN) return -EMSGSIZE; pvar->send_valid = pvar->vec_send.iov_len; - return pvar->conn_ops->conn_send(pvar); + return pvar->conn_ops->conn_send(&pvar->conn_var, pvar->vec_send.iov_base, pvar->vec_send.iov_len); } int do_qtfs_conn_recv(struct qtfs_conn_var_s *pvar, bool block) { - int ret = pvar->conn_ops->conn_recv(pvar, block); - if (ret > 0) { - pvar->recv_valid = ret; + int ret; + int headlen = 0; + int total = 0; + struct qtreq *rsp = NULL; + struct kvec load; + unsigned long retrytimes = 0; + + headlen = pvar->conn_ops->conn_recv(&pvar->conn_var, pvar->vec_recv.iov_base, QTFS_MSG_HEAD_LEN, block); + + if (headlen <= 0) { + return headlen; } - return ret; + + load.iov_base = pvar->vec_recv.iov_base + QTFS_MSG_HEAD_LEN; + load.iov_len = pvar->vec_recv.iov_len - QTFS_MSG_HEAD_LEN; + total = 0; + rsp = pvar->vec_recv.iov_base; + if (rsp->len > load.iov_len) { + qtfs_err("qtfs recv head invalid len is:%lu", rsp->len); + return -EINVAL; + } + while (total < rsp->len) { +retry: + ret = pvar->conn_ops->conn_recv(&pvar->conn_var, load.iov_base, rsp->len - total, block); + if (ret == 0) break; + if (ret == -EAGAIN) + goto retry; + if (ret == -ERESTARTSYS || ret == -EINTR) { +#ifdef QTFS_CLIENT + if (retrytimes == 0) { + qtinfo_cntinc(QTINF_RESTART_SYS); + qtinfo_recverrinc(rsp->type); + } +#endif + retrytimes++; + msleep(1); + goto retry; + } + if (ret < 0) { + qtfs_err("qtfs recv get invalidelen is :%d", ret); + return ret; + } + total += ret; + load.iov_base += ret; + load.iov_len -= ret; + if (load.iov_base > (pvar->vec_recv.iov_base + pvar->vec_recv.iov_len)) { + qtfs_err("qtfs recv error, total:%d iovlen:%lu ret:%d rsplen:%lu", total, + pvar->vec_recv.iov_len, ret, rsp->len); + WARN_ON(1); + break; + } + } + if (total > rsp->len) { + qtfs_crit("recv total:%d msg len:%lu\n", total, rsp->len); + WARN_ON(1); + } + pvar->recv_valid = total + headlen; + return pvar->recv_valid; } int qtfs_conn_recv_block(struct qtfs_conn_var_s *pvar) @@ -301,7 +354,7 @@ static int qtfs_sm_connecting(struct qtfs_conn_var_s *pvar) int ret = QTERROR; #ifdef QTFS_SERVER - ret = pvar->conn_ops->conn_server_accept(pvar); + ret = pvar->conn_ops->conn_server_accept(&pvar->conn_var, pvar->user_type); if (ret == 0) { qtfs_info("qtfs sm connecting accept a new connection"); } else { @@ -313,7 +366,7 @@ static int qtfs_sm_connecting(struct qtfs_conn_var_s *pvar) qtfs_info("qtfs sm connecting wait for server thread:%d", pvar->cur_threadidx); retry = 3; while (qtfs_mod_exiting == false && retry-- > 0) { - ret = pvar->conn_ops->conn_client_connect(pvar); + ret = pvar->conn_ops->conn_client_connect(&pvar->conn_var); if (ret == 0) { qtfs_info("qtfs sm connecting connect to a new connection."); break; @@ -542,7 +595,7 @@ retry: if (pvar != NULL) { int ret; - if (pvar->state == QTCONN_ACTIVE && pvar->conn_ops->conn_connected(pvar) == false) { + if (pvar->state == QTCONN_ACTIVE && pvar->conn_ops->conn_connected(&pvar->conn_var) == false) { qtfs_warn("qtfs get param thread:%d disconnected, try to reconnect.", pvar->cur_threadidx); ret = qtfs_sm_reconnect(pvar); } else { @@ -578,7 +631,8 @@ retry: } memset(pvar, 0, sizeof(struct qtfs_conn_var_s)); // initialize conn_pvar here - g_pvar_ops->pvar_init(pvar); + pvar->user_type = QTFS_CONN_TYPE_QTFS; + g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type); if (QTFS_OK != pvar->conn_ops->conn_var_init(pvar)) { qtfs_err("qtfs sock var init failed.\n"); kfree(pvar); @@ -611,7 +665,7 @@ retry: qtfs_thread_var[pvar->cur_threadidx] = pvar; #else pvar->cs = QTFS_CONN_SOCK_SERVER; - if (!pvar->conn_ops->conn_inited(pvar)) { + if (!pvar->conn_ops->conn_inited(pvar, pvar->user_type)) { if ((ret = qtfs_sm_active(pvar)) != 0) { qtfs_err("qtfs get param active connection failed, ret:%d, curstate:%s", ret, QTCONN_CUR_STATE(pvar)); // put to vld list @@ -643,7 +697,7 @@ struct qtfs_conn_var_s *qtfs_epoll_establish_conn(void) pvar = qtfs_epoll_var; if (pvar) { - if (pvar->state == QTCONN_ACTIVE && pvar->conn_ops->conn_connected(pvar) == false) { + if (pvar->state == QTCONN_ACTIVE && pvar->conn_ops->conn_connected(&pvar->conn_var) == false) { qtfs_warn("qtfs epoll get param thread:%d disconnected, try to reconnect.", pvar->cur_threadidx); ret = qtfs_sm_reconnect(pvar); } else { @@ -662,8 +716,9 @@ struct qtfs_conn_var_s *qtfs_epoll_establish_conn(void) } memset(pvar, 0, sizeof(struct qtfs_conn_var_s)); qtfs_epoll_var = pvar; + pvar->user_type = QTFS_CONN_TYPE_EPOLL; pvar->cur_threadidx = QTFS_EPOLL_THREADIDX; - g_pvar_ops->pvar_init(pvar); + g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type); if (QTFS_OK != pvar->conn_ops->conn_var_init(pvar)) { qtfs_err("qtfs sock var init failed.\n"); kfree(pvar); @@ -750,3 +805,6 @@ void qtfs_conn_list_cnt(void) #endif } +module_param(qtfs_conn_max_conn, int, 0600); +module_param_string(qtfs_log_level, qtfs_log_level, sizeof(qtfs_log_level), 0600); +module_param_string(qtfs_conn_type, qtfs_conn_type, sizeof(qtfs_conn_type), 0600); \ No newline at end of file diff --git a/qtfs/qtfs_common/socket.c b/qtfs/qtfs_common/socket.c index 47bda410401fca7b4c233ae0cf493d77c3942eeb..9e999f5ea7622e352287c1541a6fd42617dcf45f 100644 --- a/qtfs/qtfs_common/socket.c +++ b/qtfs/qtfs_common/socket.c @@ -21,7 +21,7 @@ unsigned int qtfs_server_vsock_cid = 2; // host cid in vm is always 2 #endif #ifdef QTFS_SERVER -struct socket *qtfs_server_main_sock = NULL; +struct socket *qtfs_server_main_sock[QTFS_CONN_TYPE_INV] = {NULL}; #endif #ifdef KVER_4_19 @@ -108,9 +108,9 @@ void sock_set_reuseaddr(struct sock *sk) #define QTSOCK_SET_KEEPX(sock, val) sock_set_keepalive(sock->sk); tcp_sock_set_keepcnt(sock->sk, val);\ tcp_sock_set_keepidle(sock->sk, val); tcp_sock_set_keepintvl(sock->sk, val); -static int qtfs_conn_sock_recv(struct qtfs_conn_var_s *pvar, bool block); -static int qtfs_conn_sock_send(struct qtfs_conn_var_s *pvar); -static void qtfs_conn_sock_fini(struct qtfs_conn_var_s *pvar); +static int qtfs_conn_sock_recv(void *connvar, void *buf, size_t len, bool block); +static int qtfs_conn_sock_send(void *connvar, void *buf, size_t len); +static void qtfs_conn_sock_fini(void *connvar, qtfs_conn_type_e type); void qtfs_sock_recvtimeo_set(struct socket *sock, __s64 sec, __s64 usec) { @@ -142,23 +142,23 @@ void qtfs_sock_recvtimeo_set(struct socket *sock, __s64 sec, __s64 usec) } #ifdef QTFS_SERVER -static int qtfs_conn_sock_server_accept(struct qtfs_conn_var_s *pvar) +static int qtfs_conn_sock_server_accept(void *connvar, qtfs_conn_type_e type) { - struct socket *sock = NULL; + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; int ret; - - if (!QTCONN_IS_EPOLL_CONN(pvar)) { - sock = qtfs_server_main_sock; - } else { - sock = pvar->conn_var.sock_var.sock; + struct socket *sock = NULL; + if (type >= QTFS_CONN_TYPE_INV || qtfs_server_main_sock[type] == NULL) { + qtfs_err("invalid type:%u or main sock is invalid.", type); + return -EFAULT; } + sock = qtfs_server_main_sock[type]; if (sock == NULL) { WARN_ON(1); - qtfs_err("qtfs server accept failed, main sock is NULL, threadidx:%d.", pvar->cur_threadidx); + qtfs_err("qtfs server accept failed, main sock is NULL."); return -EINVAL; } - ret = kernel_accept(sock, &pvar->conn_var.sock_var.client_sock, SOCK_NONBLOCK); + ret = kernel_accept(sock, &sockvar->client_sock, SOCK_NONBLOCK); if (ret < 0) { return ret; } @@ -169,12 +169,13 @@ static int qtfs_conn_sock_server_accept(struct qtfs_conn_var_s *pvar) #endif qtfs_info("qtfs accept a client connection.\n"); - qtfs_sock_recvtimeo_set(pvar->conn_var.sock_var.client_sock, QTFS_SOCK_RCVTIMEO, 0); + qtfs_sock_recvtimeo_set(sockvar->client_sock, QTFS_SOCK_RCVTIMEO, 0); return 0; } -static int qtfs_conn_sockserver_init(struct qtfs_conn_var_s *pvar) +static int qtfs_conn_sock_init(void *connvar, qtfs_conn_type_e type) { + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; struct socket *sock; int ret; int sock_family = AF_VSOCK; @@ -182,25 +183,21 @@ static int qtfs_conn_sockserver_init(struct qtfs_conn_var_s *pvar) struct sockaddr_in saddr; sock_family = AF_INET; saddr.sin_family = sock_family; - saddr.sin_port = htons(pvar->conn_var.sock_var.port); - saddr.sin_addr.s_addr = in_aton(pvar->conn_var.sock_var.addr); + saddr.sin_port = htons(sockvar->port); + saddr.sin_addr.s_addr = in_aton(sockvar->addr); #else struct sockaddr_vm saddr; sock_family = AF_VSOCK; saddr.svm_family = sock_family; - saddr.svm_port = pvar->conn_var.sock_var.vm_port; - saddr.svm_cid = pvar->conn_var.sock_var.vm_cid; + saddr.svm_port = sockvar->vm_port; + saddr.svm_cid = sockvar->vm_cid; #endif - - if (!QTCONN_IS_EPOLL_CONN(pvar) && qtfs_server_main_sock != NULL) { - qtfs_info("qtfs server main sock is set, valid or out-of-date?"); - return 0; - } - if (QTCONN_IS_EPOLL_CONN(pvar) && pvar->conn_var.sock_var.sock != NULL) { - qtfs_info("qtfs server epoll sock is set, valid or out-of-date?"); + if (type >= QTFS_CONN_TYPE_INV || qtfs_server_main_sock[type] != NULL) { + qtfs_info("qtfs conn type:%u main sock is set, valid or out-of-date?", type); return 0; } - qtfs_info("qtfs sock server init enter threadidx:%d", pvar->cur_threadidx); + + qtfs_info("qtfs sock server init enter"); ret = sock_create_kern(&init_net, sock_family, SOCK_STREAM, 0, &sock); if (ret) { @@ -215,7 +212,6 @@ static int qtfs_conn_sockserver_init(struct qtfs_conn_var_s *pvar) sock_set_keepalive(sock->sk); #endif - ret = sock->ops->bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)); if (ret < 0) { qtfs_err("qtfs sock server bind error: %d.\n", ret); @@ -228,14 +224,8 @@ static int qtfs_conn_sockserver_init(struct qtfs_conn_var_s *pvar) goto err_end; } - if (!QTCONN_IS_EPOLL_CONN(pvar)) { - qtfs_server_main_sock = sock; - qtfs_info("qtfs thread main sock get, threadidx:%d.", pvar->cur_threadidx); - } else { - pvar->conn_var.sock_var.sock = sock; - qtfs_info("qtfs epoll main sock get, threadidx:%d.", pvar->cur_threadidx); - } - + qtfs_info("qtfs socket init sock OK!"); + qtfs_server_main_sock[type] = sock; return 0; err_end: @@ -244,20 +234,21 @@ err_end: } #endif #ifdef QTFS_CLIENT -static int qtfs_conn_sock_client_connect(struct qtfs_conn_var_s *pvar) +static int qtfs_conn_sock_client_connect(void *connvar) { - struct socket *sock = pvar->conn_var.sock_var.client_sock; + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + struct socket *sock = sockvar->client_sock; int ret; #ifdef QTFS_TEST_MODE struct sockaddr_in saddr; saddr.sin_family = AF_INET; - saddr.sin_port = htons(pvar->conn_var.sock_var.port); - saddr.sin_addr.s_addr = in_aton(pvar->conn_var.sock_var.addr); + saddr.sin_port = htons(sockvar->port); + saddr.sin_addr.s_addr = in_aton(sockvar->addr); #else struct sockaddr_vm saddr; saddr.svm_family = AF_VSOCK; - saddr.svm_port = pvar->conn_var.sock_var.vm_port; - saddr.svm_cid = pvar->conn_var.sock_var.vm_cid; + saddr.svm_port = sockvar->vm_port; + saddr.svm_cid = sockvar->vm_cid; #endif if (!sock) { qtfs_err("Invalid client sock, which is null\n"); @@ -266,7 +257,11 @@ static int qtfs_conn_sock_client_connect(struct qtfs_conn_var_s *pvar) ret = sock->ops->connect(sock, (struct sockaddr *)&saddr, sizeof(saddr), 0); if (ret < 0) { - qtfs_err("sock addr(%s): connect get ret: %d\n", pvar->conn_var.sock_var.addr, ret); +#ifdef QTFS_TEST_MODE + qtfs_err("sock addr(%s): connect get ret: %d\n", sockvar->addr, ret); +#else + qtfs_err("vsock cid:%u: connect get ret: %d\n", sockvar->vm_cid, ret); +#endif return ret; } #ifdef QTFS_TEST_MODE @@ -275,11 +270,13 @@ static int qtfs_conn_sock_client_connect(struct qtfs_conn_var_s *pvar) sock_set_keepalive(sock->sk); #endif - qtfs_sock_recvtimeo_set(pvar->conn_var.sock_var.client_sock, QTFS_SOCK_RCVTIMEO, 0); + qtfs_sock_recvtimeo_set(sockvar->client_sock, QTFS_SOCK_RCVTIMEO, 0); return 0; } -static int qtfs_conn_sockclient_init(struct qtfs_conn_var_s *pvar) +//client侧type用不上 +static int qtfs_conn_sock_init(void *connvar, qtfs_conn_type_e type) { + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; struct socket *sock; int ret; @@ -297,140 +294,81 @@ static int qtfs_conn_sockclient_init(struct qtfs_conn_var_s *pvar) #else sock_set_keepalive(sock->sk); #endif - pvar->conn_var.sock_var.client_sock = sock; + sockvar->client_sock = sock; return 0; } #endif -int qtfs_conn_sock_init(struct qtfs_conn_var_s *pvar) +static int qtfs_conn_sock_recv(void *connvar, void *buf, size_t len, bool block) { - int ret = -EINVAL; - - if (pvar->conn_var.sock_var.client_sock != NULL) { - WARN_ON(1); - qtfs_err("qtfs connection socket init client_sock not NULL!"); - } -#ifdef QTFS_SERVER - ret = qtfs_conn_sockserver_init(pvar); -#endif -#ifdef QTFS_CLIENT - ret = qtfs_conn_sockclient_init(pvar); -#endif - return ret; + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + struct kvec v; + memset(&sockvar->msg_recv, 0, sizeof(sockvar->msg_recv)); + v.iov_base = buf; + v.iov_len = len; + + return kernel_recvmsg(sockvar->client_sock, &sockvar->msg_recv, &v, 1, + len, (block == true) ? 0 : MSG_DONTWAIT); } -static int qtfs_conn_sock_recv(struct qtfs_conn_var_s *pvar, bool block) +static int qtfs_conn_sock_send(void *connvar, void *buf, size_t len) { + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + struct kvec v; int ret; - int headlen = 0; - int total = 0; - struct qtreq *rsp = NULL; - struct kvec load; - unsigned long retrytimes = 0; - - memset(&pvar->msg_recv, 0, sizeof(pvar->msg_recv)); - - headlen = kernel_recvmsg(pvar->conn_var.sock_var.client_sock, &pvar->msg_recv, &pvar->vec_recv, 1, - QTFS_MSG_HEAD_LEN, (block == true) ? 0 : MSG_DONTWAIT); - if (headlen <= 0) { - return headlen; - } + + v.iov_base = buf; + v.iov_len = len; - load.iov_base = pvar->vec_recv.iov_base + QTFS_MSG_HEAD_LEN; - load.iov_len = pvar->vec_recv.iov_len - QTFS_MSG_HEAD_LEN; - total = 0; - rsp = pvar->vec_recv.iov_base; - if (rsp->len > load.iov_len) { - qtfs_err("qtfs recv head invalid len is:%lu", rsp->len); - return -EINVAL; - } - while (total < rsp->len) { -retry: - ret = kernel_recvmsg(pvar->conn_var.sock_var.client_sock, &pvar->msg_recv, &load, 1, - rsp->len - total, (block == true) ? 0 : MSG_DONTWAIT); - if (ret == 0) break; - if (ret == -EAGAIN) - goto retry; - if (ret == -ERESTARTSYS || ret == -EINTR) { -#ifdef QTFS_CLIENT - if (retrytimes == 0) { - qtinfo_cntinc(QTINF_RESTART_SYS); - qtinfo_recverrinc(rsp->type); - } -#endif - retrytimes++; - msleep(1); - goto retry; - } - if (ret < 0) { - qtfs_err("qtfs recv get invalidelen is:%d", ret); - return ret; - } - total += ret; - load.iov_base += ret; - load.iov_len -= ret; - if (load.iov_base > (pvar->vec_recv.iov_base + pvar->vec_recv.iov_len)) { - qtfs_err("qtfs recv error, total:%d iovlen:%lu ret:%d rsplen:%lu", total, - pvar->vec_recv.iov_len, ret, rsp->len); - WARN_ON(1); - break; - } - } - if (total > rsp->len) { - qtfs_crit("recv total:%d msg len:%lu\n", total, rsp->len); - WARN_ON(1); - } - - return total + headlen; -} - -static int qtfs_conn_sock_send(struct qtfs_conn_var_s *pvar) -{ - int ret = kernel_sendmsg(pvar->conn_var.sock_var.client_sock, &pvar->msg_send, &pvar->vec_send, 1, - pvar->vec_send.iov_len); + ret = kernel_sendmsg(sockvar->client_sock, &sockvar->msg_send, &v, 1, len); if (ret < 0) { qtfs_err("qtfs sock send error, ret:%d.\n", ret); } return ret; } -static void qtfs_conn_sock_fini(struct qtfs_conn_var_s *pvar) +static void qtfs_conn_sock_fini(void *connvar, qtfs_conn_type_e type) { - if (pvar->conn_var.sock_var.client_sock == NULL) { + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + if (sockvar->client_sock == NULL) { qtfs_err("qtfs client sock is NULL during sock_fini"); } - if (pvar->conn_var.sock_var.client_sock != NULL) { - qtfs_err("qtfs conn sock finish threadidx:%d.", pvar->cur_threadidx); - sock_release(pvar->conn_var.sock_var.client_sock); - pvar->conn_var.sock_var.client_sock = NULL; + if (sockvar->client_sock != NULL) { + qtfs_err("qtfs conn sock finish."); + sock_release(sockvar->client_sock); + sockvar->client_sock = NULL; } - if (pvar->conn_var.sock_var.sock != NULL) { - sock_release(pvar->conn_var.sock_var.sock); - pvar->conn_var.sock_var.sock = NULL; +#ifdef QTFS_SERVER + if (type < QTFS_CONN_TYPE_INV && qtfs_server_main_sock[type] != NULL) { + sock_release(qtfs_server_main_sock[type]); + qtfs_server_main_sock[type] = NULL; } +#endif return; } -static bool qtfs_conn_sock_connected(struct qtfs_conn_var_s *pvar) +static bool qtfs_conn_sock_connected(void *connvar) { - struct socket *sock = pvar->conn_var.sock_var.client_sock; + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + struct socket *sock = sockvar->client_sock; __u8 tcpi_state; if (sock == NULL) return false; tcpi_state = inet_sk_state_load(sock->sk); if (tcpi_state == TCP_ESTABLISHED) return true; - qtfs_warn("qtfs threadidx:%d tcpi state:%u(define:TCP_ESTABLISHED=1 is connected) disconnect!", pvar->cur_threadidx, tcpi_state); + qtfs_warn("qtfs tcpi state:%u(define:TCP_ESTABLISHED=1 is connected) disconnect!", tcpi_state); return false; } #ifdef QTFS_CLIENT -void qtfs_sock_drop_recv_buf(struct qtfs_conn_var_s *pvar) +void qtfs_sock_drop_recv_buf(void *connvar) { #define TMP_STACK_LEN 64 + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; int ret = 0; char buf[TMP_STACK_LEN]; struct kvec vec_recv; @@ -438,7 +376,7 @@ void qtfs_sock_drop_recv_buf(struct qtfs_conn_var_s *pvar) vec_recv.iov_base = buf; vec_recv.iov_len = TMP_STACK_LEN; do { - ret = kernel_recvmsg(pvar->conn_var.sock_var.client_sock, &msg_recv, &vec_recv, 1, + ret = kernel_recvmsg(sockvar->client_sock, &msg_recv, &vec_recv, 1, vec_recv.iov_len, MSG_DONTWAIT); if (ret > 0) { qtfs_err("drop invalid data len:%d", ret); @@ -449,9 +387,13 @@ void qtfs_sock_drop_recv_buf(struct qtfs_conn_var_s *pvar) #endif #ifdef QTFS_SERVER -static bool qtfs_conn_sock_inited(struct qtfs_conn_var_s *pvar) +static bool qtfs_conn_sock_inited(void *connvar, qtfs_conn_type_e type) { - return qtfs_server_main_sock != NULL; + if (type >= QTFS_CONN_TYPE_INV) { + qtfs_err("invalid type:%u", type); + return false; + } + return qtfs_server_main_sock[type] != NULL; } #endif @@ -463,9 +405,13 @@ int qtfs_sock_param_init(void) int qtfs_sock_param_fini(void) { #ifdef QTFS_SERVER - if (qtfs_server_main_sock != NULL) { - sock_release(qtfs_server_main_sock); - qtfs_server_main_sock = NULL; + if (qtfs_server_main_sock[QTFS_CONN_TYPE_QTFS] != NULL) { + sock_release(qtfs_server_main_sock[QTFS_CONN_TYPE_QTFS]); + qtfs_server_main_sock[QTFS_CONN_TYPE_QTFS] = NULL; + } + if (qtfs_server_main_sock[QTFS_CONN_TYPE_FIFO] != NULL) { + sock_release(qtfs_server_main_sock[QTFS_CONN_TYPE_FIFO]); + qtfs_server_main_sock[QTFS_CONN_TYPE_FIFO] = NULL; } #endif return 0; @@ -491,26 +437,25 @@ struct qtfs_conn_ops_s qtfs_conn_sock_ops = { .conn_connected = qtfs_conn_sock_connected, }; -int qtfs_sock_pvar_init(struct qtfs_conn_var_s *pvar) +int qtfs_sock_pvar_init(void *connvar, struct qtfs_conn_ops_s **conn_ops, qtfs_conn_type_e type) { + struct qtfs_sock_var_s *sockvar = (struct qtfs_sock_var_s *)connvar; + + if (type >= QTFS_CONN_TYPE_INV) { + qtfs_err("invalid type:%u", type); + return -1; + } + #ifdef QTFS_TEST_MODE // fill conn_pvar struct here - strlcpy(pvar->conn_var.sock_var.addr, qtfs_server_ip, sizeof(pvar->conn_var.sock_var.addr)); - if (QTCONN_IS_EPOLL_CONN(pvar)) { - pvar->conn_var.sock_var.port = qtfs_server_port + 1; - } else { - pvar->conn_var.sock_var.port = qtfs_server_port; - } + strlcpy(sockvar->addr, qtfs_server_ip, sizeof(sockvar->addr)); + sockvar->port = qtfs_server_port + type; #else // vsock - if (QTCONN_IS_EPOLL_CONN(pvar)) { - pvar->conn_var.sock_var.vm_port = qtfs_server_vsock_port + 1; - } else { - pvar->conn_var.sock_var.vm_port = qtfs_server_vsock_port; - } - pvar->conn_var.sock_var.vm_cid = qtfs_server_vsock_cid; + sockvar->vm_cid = qtfs_server_vsock_cid; + sockvar->vm_port = qtfs_server_vsock_port + type; #endif - pvar->conn_ops = &qtfs_conn_sock_ops; + *conn_ops = &qtfs_conn_sock_ops; return 0; } @@ -526,3 +471,12 @@ struct qtfs_pvar_ops_s qtfs_conn_sock_pvar_ops = { .param_fini = qtfs_sock_param_fini, .pvar_init = qtfs_sock_pvar_init }; + +#ifdef QTFS_TEST_MODE +module_param_string(qtfs_server_ip, qtfs_server_ip, sizeof(qtfs_server_ip), 0600); +MODULE_PARM_DESC(qtfs_server_ip, "qtfs server ip"); +module_param(qtfs_server_port, int, 0600); +#else +module_param(qtfs_server_vsock_port, uint, 0600); +module_param(qtfs_server_vsock_cid, uint, 0600); +#endif \ No newline at end of file diff --git a/qtfs/qtfs_server/qtfs-server.c b/qtfs/qtfs_server/qtfs-server.c index b993b8be461dff037d5b7741d923fe92edc96242..fb0255bdb8e702f0a4b308c1c15bda5a68fcb3da 100644 --- a/qtfs/qtfs_server/qtfs-server.c +++ b/qtfs/qtfs_server/qtfs-server.c @@ -60,7 +60,7 @@ long qtfs_server_epoll_thread(struct qtfs_conn_var_s *pvar) qtfs_err("qtfs epoll wait error, epfd is invalid."); return QTERROR; } - if (false == pvar->conn_ops->conn_connected(pvar)) { + if (false == pvar->conn_ops->conn_connected(&pvar->conn_var)) { qtfs_warn("qtfs epoll thread disconnected, now try to reconnect."); ret = qtfs_sm_reconnect(pvar); } else { @@ -419,19 +419,6 @@ static void __exit qtfs_server_exit(void) return; } -#ifdef QTFS_TEST_MODE -module_param_string(qtfs_server_ip, qtfs_server_ip, sizeof(qtfs_server_ip), 0600); -MODULE_PARM_DESC(qtfs_server_ip, "qtfs server ip"); -module_param(qtfs_server_port, int, 0600); -#else -module_param(qtfs_server_vsock_port, uint, 0600); -module_param(qtfs_server_vsock_cid, uint, 0600); -#endif - -module_param(qtfs_conn_max_conn, int, 0600); -module_param_string(qtfs_log_level, qtfs_log_level, sizeof(qtfs_log_level), 0600); -module_param_string(qtfs_conn_type, qtfs_conn_type, sizeof(qtfs_conn_type), 0600); - module_init(qtfs_server_init); module_exit(qtfs_server_exit); MODULE_AUTHOR("liqiang64@huawei.com");