diff --git a/Documentation/networking/smc-sysctl.rst b/Documentation/networking/smc-sysctl.rst new file mode 100644 index 0000000000000000000000000000000000000000..c53f8c61c9e488c5945520f4dadce3caf3a994c7 --- /dev/null +++ b/Documentation/networking/smc-sysctl.rst @@ -0,0 +1,23 @@ +.. SPDX-License-Identifier: GPL-2.0 + +========== +SMC Sysctl +========== + +/proc/sys/net/smc/* Variables +============================== + +autocorking_size - INTEGER + Setting SMC auto corking size: + SMC auto corking is like TCP auto corking from the application's + point of view. When applications do consecutive small + write()/sendmsg() system calls, we try to coalesce these small writes + as much as possible, to lower the total amount of CDC and RDMA Writes + sent. + autocorking_size limits the maximum corked bytes that can be sent to + the underlying device in 1 single sending. If set to 0, the SMC auto corking + is disabled. + Applications can still use TCP_CORK for optimal behavior when they + know how/when to uncork their sockets. 
+ + Default: 64K diff --git a/include/linux/socket.h b/include/linux/socket.h index fee0fdcd63c26e4b78c0a679df95b16c90abefff..4005895fe296b4d2fffc9c13873acdab16092196 100644 --- a/include/linux/socket.h +++ b/include/linux/socket.h @@ -360,7 +360,6 @@ struct ucred { #define SOL_KCM 281 #define SOL_TLS 282 #define SOL_XDP 283 - #define SOL_SMC 286 /* IPX options */ diff --git a/include/net/netns/smc.h b/include/net/netns/smc.h index a31a6390c6291b32e1b82f243144256e78c1ddb5..c6609ca1b104f2c72bd766edca816066732c952c 100644 --- a/include/net/netns/smc.h +++ b/include/net/netns/smc.h @@ -19,17 +19,16 @@ struct netns_smc { /* protect fback_rsn */ struct mutex mutex_fback_rsn; struct smc_stats_rsn *fback_rsn; + bool limit_smc_hs; /* constraint on handshake */ struct smc_convert smc_conv; #ifdef CONFIG_SYSCTL struct ctl_table_header *smc_hdr; #endif + unsigned int sysctl_autocorking_size; int sysctl_wmem_default; int sysctl_rmem_default; int sysctl_tcp2smc; - int sysctl_autocorking; int sysctl_allow_different_subnet; - bool limit_smc_hs; /* constraint on handshake */ int sysctl_keep_first_contact_clcsock; }; - #endif diff --git a/include/uapi/linux/smc.h b/include/uapi/linux/smc.h index 41a446b379a0156bcfd8718ea451b0702505026d..759bcb2ff03effa84d434dd6bdafa0a3564fa2a2 100644 --- a/include/uapi/linux/smc.h +++ b/include/uapi/linux/smc.h @@ -59,12 +59,12 @@ enum { SMC_NETLINK_DUMP_SEID, SMC_NETLINK_ENABLE_SEID, SMC_NETLINK_DISABLE_SEID, - SMC_NETLINK_ADD_TCP2SMC_WLIST, - SMC_NETLINK_DEL_TCP2SMC_WLIST, - SMC_NETLINK_GET_TCP2SMC_WLIST, SMC_NETLINK_DUMP_HS_LIMITATION, SMC_NETLINK_ENABLE_HS_LIMITATION, SMC_NETLINK_DISABLE_HS_LIMITATION, + SMC_NETLINK_ADD_TCP2SMC_WLIST, + SMC_NETLINK_DEL_TCP2SMC_WLIST, + SMC_NETLINK_GET_TCP2SMC_WLIST, }; /* SMC_GENL_FAMILY top level attributes */ diff --git a/include/uapi/linux/smc_diag.h b/include/uapi/linux/smc_diag.h index b9b7bf4dacc87e2b7c8069e937c21fbc9c15ce94..182efdd3ec915e7d38f49c99061e681c17db6b9b 100644 --- 
a/include/uapi/linux/smc_diag.h +++ b/include/uapi/linux/smc_diag.h @@ -95,8 +95,6 @@ struct smc_diag_linkinfo { __u8 ibport; /* RDMA device port number */ __u8 gid[40]; /* local GID */ __u8 peer_gid[40]; /* peer GID */ - __u64 link_down_cnt_smc; /* link down caused by SMC-R protocol */ - __u64 link_down_cnt_ib; /* link down caused by IB net device */ }; struct smc_diag_lgrinfo { diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c index 503f7df8d21f69e7ec765b46909407f994f3d948..bff2e05b33a29ffa1e42e3977b6cb3e73d724137 100644 --- a/net/smc/af_smc.c +++ b/net/smc/af_smc.c @@ -52,6 +52,7 @@ #include "smc_close.h" #include "smc_stats.h" #include "smc_tracepoint.h" +#include "smc_sysctl.h" #include "smc_proc.h" #include "smc_conv.h" @@ -135,7 +136,7 @@ static struct sock *smc_tcp_syn_recv_sock(const struct sock *sk, goto drop; } - /* passthrough to origin syn recv sock fct */ + /* passthrough to original syn recv sock fct */ return smc->ori_af_ops->syn_recv_sock(sk, skb, req, dst, req_unhash, own_req); @@ -197,12 +198,27 @@ void smc_unhash_sk(struct sock *sk) } EXPORT_SYMBOL_GPL(smc_unhash_sk); +/* This will be called before user really release sock_lock. 
So do the + * work which we didn't do because of user hold the sock_lock in the + * BH context + */ +static void smc_release_cb(struct sock *sk) +{ + struct smc_sock *smc = smc_sk(sk); + + if (smc->conn.tx_in_release_sock) { + smc_tx_pending(&smc->conn); + smc->conn.tx_in_release_sock = false; + } +} + struct proto smc_proto = { .name = "SMC", .owner = THIS_MODULE, .keepalive = smc_set_keepalive, .hash = smc_hash_sk, .unhash = smc_unhash_sk, + .release_cb = smc_release_cb, .obj_size = sizeof(struct smc_sock), .h.smc_hash = &smc_v4_hashinfo, .slab_flags = SLAB_TYPESAFE_BY_RCU, @@ -215,6 +231,7 @@ struct proto smc_proto6 = { .keepalive = smc_set_keepalive, .hash = smc_hash_sk, .unhash = smc_unhash_sk, + .release_cb = smc_release_cb, .obj_size = sizeof(struct smc_sock), .h.smc_hash = &smc_v6_hashinfo, .slab_flags = SLAB_TYPESAFE_BY_RCU, @@ -273,7 +290,7 @@ static int smc_release(struct socket *sock) { struct sock *sk = sock->sk; struct smc_sock *smc; - int rc = 0; + int old_state, rc = 0; if (!sk) goto out; @@ -281,8 +298,10 @@ static int smc_release(struct socket *sock) sock_hold(sk); /* sock_put below */ smc = smc_sk(sk); + old_state = sk->sk_state; + /* cleanup for a dangling non-blocking connect */ - if (smc->connect_nonblock && sk->sk_state == SMC_INIT) + if (smc->connect_nonblock && old_state == SMC_INIT) tcp_abort(smc->clcsock->sk, ECONNABORTED); if (cancel_work_sync(&smc->connect_work)) @@ -296,6 +315,10 @@ static int smc_release(struct socket *sock) else lock_sock(sk); + if (old_state == SMC_INIT && sk->sk_state == SMC_ACTIVE && + !smc->use_fallback) + smc_close_active_abort(smc); + rc = __smc_release(smc); /* detach socket */ @@ -1034,9 +1057,13 @@ static int smc_connect_clc(struct smc_sock *smc, rc = smc_clc_send_proposal(smc, ini); if (rc) return rc; + + release_sock(&smc->sk); /* receive SMC Accept CLC message */ - return smc_clc_wait_msg(smc, aclc2, SMC_CLC_MAX_ACCEPT_LEN, + rc = smc_clc_wait_msg(smc, aclc2, SMC_CLC_MAX_ACCEPT_LEN, SMC_CLC_ACCEPT, 
CLC_WAIT_TIME); + lock_sock(&smc->sk); + return rc; } void smc_fill_gid_list(struct smc_link_group *lgr, @@ -1396,8 +1423,14 @@ static int __smc_connect(struct smc_sock *smc) /* perform CLC handshake */ rc = smc_connect_clc(smc, aclc2, ini); - if (rc) + if (rc) { + /* -EAGAIN on timeout, see tcp_recvmsg() */ + if (rc == -EAGAIN) { + rc = -ETIMEDOUT; + smc->sk.sk_err = ETIMEDOUT; + } goto vlan_cleanup; + } /* check if smc modes and versions of CLC proposal and accept match */ rc = smc_connect_check_aclc(ini, aclc); @@ -2384,7 +2417,7 @@ static int smc_listen(struct socket *sock, int backlog) smc->clcsock->sk->sk_user_data = (void *)((uintptr_t)smc | SK_USER_DATA_NOCOPY); - /* save origin ops */ + /* save original ops */ smc->ori_af_ops = inet_csk(smc->clcsock->sk)->icsk_af_ops; smc->af_ops = *smc->ori_af_ops; @@ -2732,10 +2765,14 @@ static int __smc_setsockopt(struct socket *sock, int level, int optname, lock_sock(sk); switch (optname) { case SMC_LIMIT_HS: - if (optlen < sizeof(int)) - return -EINVAL; - if (copy_from_sockptr(&val, optval, sizeof(int))) - return -EFAULT; + if (optlen < sizeof(int)) { + rc = -EINVAL; + break; + } + if (copy_from_sockptr(&val, optval, sizeof(int))) { + rc = -EFAULT; + break; + } smc->limit_smc_hs = !!val; rc = 0; @@ -2808,8 +2845,8 @@ static int smc_setsockopt(struct socket *sock, int level, int optname, sk->sk_state != SMC_CLOSED) { if (val) { SMC_STAT_INC(smc, ndly_cnt); - mod_delayed_work(smc->conn.lgr->tx_wq, - &smc->conn.tx_work, 0); + smc_tx_pending(&smc->conn); + cancel_delayed_work(&smc->conn.tx_work); } } break; @@ -3173,7 +3210,6 @@ static __net_init int smc_net_init(struct net *net) init_net.smc.sysctl_rmem_default; net->smc.sysctl_tcp2smc = 0; net->smc.sysctl_allow_different_subnet = 0; - net->smc.sysctl_autocorking = 1; net->smc.sysctl_keep_first_contact_clcsock = 1; } @@ -3321,16 +3357,19 @@ static int __init smc_init(void) init_net.smc.sysctl_rmem_default = 384 * 1024; init_net.smc.sysctl_tcp2smc = 0; 
init_net.smc.sysctl_allow_different_subnet = 0; - init_net.smc.sysctl_autocorking = 1; init_net.smc.sysctl_keep_first_contact_clcsock = 1; -#ifdef CONFIG_SYSCTL - smc_sysctl_init(); -#endif + rc = smc_sysctl_init(); + if (rc) { + pr_err("%s: sysctl_init fails with %d\n", __func__, rc); + goto out_ulp; + } static_branch_enable(&tcp_have_smc); return 0; +out_ulp: + tcp_unregister_ulp(&smc_ulp_ops); out_conv: smc_conv_exit(); out_proc: @@ -3362,6 +3401,7 @@ static int __init smc_init(void) static void __exit smc_exit(void) { static_branch_disable(&tcp_have_smc); + smc_sysctl_exit(); tcp_unregister_ulp(&smc_ulp_ops); smc_conv_exit(); smc_proc_exit(); @@ -3378,9 +3418,6 @@ static void __exit smc_exit(void) smc_clc_exit(); unregister_pernet_subsys(&smc_net_stat_ops); unregister_pernet_subsys(&smc_net_ops); -#ifdef CONFIG_SYSCTL - smc_sysctl_exit(); -#endif rcu_barrier(); } diff --git a/net/smc/smc.h b/net/smc/smc.h index f5edc13955b84060eddb908a38486f711fc032fd..040c6a592c6b369bea153ebf0982d74c702f9f52 100644 --- a/net/smc/smc.h +++ b/net/smc/smc.h @@ -25,6 +25,7 @@ #define SMC_MAX_ISM_DEVS 8 /* max # of proposed non-native ISM * devices */ +#define SMC_AUTOCORKING_DEFAULT_SIZE 0x10000 /* 64K by default */ extern struct proto smc_proto; extern struct proto smc_proto6; @@ -189,7 +190,6 @@ struct smc_connection { */ wait_queue_head_t cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr*/ atomic_t tx_pushing; /* nr_threads trying tx push */ - struct delayed_work tx_work; /* retry of smc_cdc_msg_send */ u32 tx_off; /* base offset in peer rmb */ @@ -209,6 +209,10 @@ struct smc_connection { * data still pending */ char urg_rx_byte; /* urgent byte */ + bool tx_in_release_sock; + /* flush pending tx data in + * sock release_cb() + */ atomic_t bytes_to_rcv; /* arrived data, * not yet received */ @@ -342,11 +346,6 @@ void smc_fill_gid_list(struct smc_link_group *lgr, struct smc_gidlist *gidlist, struct smc_ib_device *known_dev, u8 *known_gid); -#ifdef CONFIG_SYSCTL -int 
smc_sysctl_init(void); -void smc_sysctl_exit(void); -#endif - /* smc handshake limitation interface for netlink */ int smc_nl_dump_hs_limitation(struct sk_buff *skb, struct netlink_callback *cb); int smc_nl_enable_hs_limitation(struct sk_buff *skb, struct genl_info *info); diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c index 7727a8fdca0fc486bb168e61bbdf51f339f6c3f0..84eed367699e686f0dc4fe5e1770658ff0be4e26 100644 --- a/net/smc/smc_cdc.c +++ b/net/smc/smc_cdc.c @@ -49,10 +49,15 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, } if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) { - /* If this is the last pending WR complete, push them to prevent - * no one trying to push when corked. + /* If user owns the sock_lock, mark the connection need sending. + * User context will later try to send when it release sock_lock + * in smc_release_cb() */ - smc_tx_sndbuf_nonempty(conn); + if (sock_owned_by_user(&smc->sk)) + conn->tx_in_release_sock = true; + else + smc_tx_pending(conn); + if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq))) wake_up(&conn->cdc_pend_tx_wq); } @@ -360,8 +365,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc, /* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */ if ((diff_cons && smc_tx_prepared_sends(conn)) || conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || - conn->local_rx_ctrl.prod_flags.urg_data_pending) - smc_tx_sndbuf_nonempty(conn); + conn->local_rx_ctrl.prod_flags.urg_data_pending) { + if (!sock_owned_by_user(&smc->sk)) + smc_tx_pending(conn); + else + conn->tx_in_release_sock = true; + } if (diff_cons && conn->urg_tx_pend && atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) { diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c index 5397009113155d68f4a5d034266997e4437051fb..96abaf4ea122d47c0f91a5d6aab665c4407df59e 100644 --- a/net/smc/smc_core.c +++ b/net/smc/smc_core.c @@ -1186,8 +1186,8 @@ void smc_conn_free(struct smc_connection *conn) cancel_work_sync(&conn->abort_work); } if 
(!list_empty(&lgr->list)) { - smc_lgr_unregister_conn(conn); smc_buf_unuse(conn, lgr); /* allow buffer reuse */ + smc_lgr_unregister_conn(conn); } if (!lgr->conns_num) @@ -1907,7 +1907,8 @@ int smc_conn_create(struct smc_sock *smc, struct smc_init_info *ini) (ini->smcd_version == SMC_V2 || lgr->vlan_id == ini->vlan_id) && (role == SMC_CLNT || ini->is_smcd || - lgr->conns_num < SMC_RMBS_PER_LGR_MAX)) { + (lgr->conns_num < SMC_RMBS_PER_LGR_MAX && + !bitmap_full(lgr->rtokens_used_mask, SMC_RMBS_PER_LGR_MAX)))) { /* link group found */ ini->first_contact_local = 0; conn->lgr = lgr; @@ -2035,7 +2036,7 @@ static struct smc_buf_desc *smc_buf_get_slot(int compressed_bufsize, */ static inline int smc_rmb_wnd_update_limit(int rmbe_size) { - return min_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2); + return max_t(int, rmbe_size / 10, SOCK_MIN_SNDBUF / 2); } /* map an rmb buf to a link */ diff --git a/net/smc/smc_netlink.c b/net/smc/smc_netlink.c index e1c7ca925ceede3f8fcaa81dbed8026c69869cd1..52dba083b70e600296164ab6403d028545660497 100644 --- a/net/smc/smc_netlink.c +++ b/net/smc/smc_netlink.c @@ -112,21 +112,6 @@ static const struct genl_ops smc_gen_nl_ops[] = { .flags = GENL_ADMIN_PERM, .doit = smc_nl_disable_seid, }, - { - .cmd = SMC_NETLINK_ADD_TCP2SMC_WLIST, - /* can be retrieved by unprivileged users */ - .doit = smc_nl_add_tcp2smc_wlist, - }, - { - .cmd = SMC_NETLINK_DEL_TCP2SMC_WLIST, - /* can be retrieved by unprivileged users */ - .doit = smc_nl_del_tcp2smc_wlist, - }, - { - .cmd = SMC_NETLINK_GET_TCP2SMC_WLIST, - /* can be retrieved by unprivileged users */ - .dumpit = smc_nl_get_tcp2smc_wlist, - }, { .cmd = SMC_NETLINK_DUMP_HS_LIMITATION, /* can be retrieved by unprivileged users */ @@ -142,6 +127,21 @@ static const struct genl_ops smc_gen_nl_ops[] = { .flags = GENL_ADMIN_PERM, .doit = smc_nl_disable_hs_limitation, }, + { + .cmd = SMC_NETLINK_ADD_TCP2SMC_WLIST, + /* can be retrieved by unprivileged users */ + .doit = smc_nl_add_tcp2smc_wlist, + }, + { + .cmd = 
SMC_NETLINK_DEL_TCP2SMC_WLIST, + /* can be retrieved by unprivileged users */ + .doit = smc_nl_del_tcp2smc_wlist, + }, + { + .cmd = SMC_NETLINK_GET_TCP2SMC_WLIST, + /* can be retrieved by unprivileged users */ + .dumpit = smc_nl_get_tcp2smc_wlist, + }, }; static const struct nla_policy smc_gen_nl_policy[SMC_CMD_MAX_ATTR + 1] = { diff --git a/net/smc/smc_sysctl.c b/net/smc/smc_sysctl.c index b2c3dae5543e6969d1d601690763119409494d6f..c2ff96ffb35a0130cfde0cc7e7a95bfb5d179092 100644 --- a/net/smc/smc_sysctl.c +++ b/net/smc/smc_sysctl.c @@ -1,17 +1,34 @@ // SPDX-License-Identifier: GPL-2.0 +/* + * Shared Memory Communications over RDMA (SMC-R) and RoCE + * + * smc_sysctl.c: sysctl interface to SMC subsystem. + * + * Copyright (c) 2022, Alibaba Inc. + * + * Author: Tony Lu + * + */ -#include #include #include -#include #include +#include "smc.h" +#include "smc_sysctl.h" #include "smc_core.h" static int min_sndbuf = SMC_BUF_MIN_SIZE; static int min_rcvbuf = SMC_BUF_MIN_SIZE; static struct ctl_table smc_table[] = { + { + .procname = "autocorking_size", + .data = &init_net.smc.sysctl_autocorking_size, + .maxlen = sizeof(unsigned int), + .mode = 0644, + .proc_handler = proc_douintvec, + }, { .procname = "wmem_default", .data = &init_net.smc.sysctl_wmem_default, @@ -53,15 +70,6 @@ static struct ctl_table smc_table[] = { .extra1 = SYSCTL_ZERO, .extra2 = SYSCTL_ONE, }, - { - .procname = "autocorking", - .data = &init_net.smc.sysctl_autocorking, - .maxlen = sizeof(int), - .mode = 0644, - .proc_handler = proc_dointvec_minmax, - .extra1 = SYSCTL_ZERO, - .extra2 = SYSCTL_ONE, - }, { .procname = "keep_first_contact_clcsock", .data = &init_net.smc.sysctl_keep_first_contact_clcsock, @@ -94,6 +102,8 @@ static __net_init int smc_sysctl_init_net(struct net *net) if (!net->smc.smc_hdr) goto err_reg; + net->smc.sysctl_autocorking_size = SMC_AUTOCORKING_DEFAULT_SIZE; + return 0; err_reg: diff --git a/net/smc/smc_sysctl.h b/net/smc/smc_sysctl.h new file mode 100644 index 
0000000000000000000000000000000000000000..49553ac236b64cf56812ba1c225d6ee6b7c8bf06 --- /dev/null +++ b/net/smc/smc_sysctl.h @@ -0,0 +1,32 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Shared Memory Communications over RDMA (SMC-R) and RoCE + * + * smc_sysctl.h: sysctl interface to SMC subsystem. + * + * Copyright (c) 2022, Alibaba Inc. + * + * Author: Tony Lu + * + */ + +#ifndef _SMC_SYSCTL_H +#define _SMC_SYSCTL_H + +#ifdef CONFIG_SYSCTL + +int smc_sysctl_init(void); +void smc_sysctl_exit(void); + +#else + +static inline int smc_sysctl_init(void) +{ + return 0; +} + +static inline void smc_sysctl_exit(void) { } + +#endif /* CONFIG_SYSCTL */ + +#endif /* _SMC_SYSCTL_H */ diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c index dac3f9634fd4023b982226b1eb378e3a4c92880a..a12dde653e27278a60d6a189e8498b8823981f4e 100644 --- a/net/smc/smc_tx.c +++ b/net/smc/smc_tx.c @@ -124,44 +124,56 @@ static int smc_tx_wait(struct smc_sock *smc, int flags) return rc; } -/* Strategy: Nagle algorithm - * 1. The first message should never cork - * 2. If we have any inflight messages, wait for the first - * message back - * 3. The total corked message should not exceed min(64k, sendbuf/2) +static bool smc_tx_is_corked(struct smc_sock *smc) +{ + struct tcp_sock *tp = tcp_sk(smc->clcsock->sk); + + return (tp->nonagle & TCP_NAGLE_CORK) ? true : false; +} + +/* If we have pending CDC messages, do not send: + * Because CQE of this CDC message will happen shortly, it gives + * a chance to coalesce future sendmsg() payload in to one RDMA Write, + * without need for a timer, and with no latency trade off. + * Algorithm here: + * 1. First message should never cork + * 2. If we have pending Tx CDC messages, wait for the first CDC + * message's completion + * 3. 
Don't cork too much data in a single RDMA Write to prevent burst + * traffic, total corked message should not exceed sendbuf/2 */ -static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg) +static bool smc_should_autocork(struct smc_sock *smc) { struct smc_connection *conn = &smc->conn; - int prepared_send; + int corking_size; - /* First request && no more message should always pass */ - if (atomic_read(&conn->cdc_pend_tx_wr) == 0 && - !(msg->msg_flags & MSG_MORE)) - return false; + corking_size = min_t(unsigned int, conn->sndbuf_desc->len >> 1, + sock_net(&smc->sk)->smc.sysctl_autocorking_size); - /* If We have enough data in the send queue that have not been - * pushed, send immediately. - * Note, here we only care about the prepared_sends, but not - * sendbuf_space because sendbuf_space has nothing to do with - * corked data size. - */ - prepared_send = smc_tx_prepared_sends(conn); - if (prepared_send > min(64 * 1024, conn->sndbuf_desc->len >> 1)) + if (atomic_read(&conn->cdc_pend_tx_wr) == 0 || + smc_tx_prepared_sends(conn) > corking_size) return false; - - if (!sock_net(&smc->sk)->smc.sysctl_autocorking) - return false; - - /* All the other conditions should cork */ return true; } -static bool smc_tx_is_corked(struct smc_sock *smc) +static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg) { - struct tcp_sock *tp = tcp_sk(smc->clcsock->sk); + struct smc_connection *conn = &smc->conn; - return (tp->nonagle & TCP_NAGLE_CORK) ? true : false; + if (smc_should_autocork(smc)) + return true; + + /* for a corked socket defer the RDMA writes if + * sndbuf_space is still available. The applications + * should know how/when to uncork it. + */ + if ((msg->msg_flags & MSG_MORE || + smc_tx_is_corked(smc) || + msg->msg_flags & MSG_SENDPAGE_NOTLAST) && + atomic_read(&conn->sndbuf_space)) + return true; + + return false; } /* sndbuf producer: main API called by socket layer. 
@@ -210,13 +222,6 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) if (msg->msg_flags & MSG_OOB) conn->local_tx_ctrl.prod_flags.urg_data_pending = 1; - /* If our send queue is full but peer have RMBE space, - * we should send them out before wait - */ - if (!atomic_read(&conn->sndbuf_space) && - atomic_read(&conn->peer_rmbe_space) > 0) - smc_tx_sndbuf_nonempty(conn); - if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) { rc = smc_tx_wait(smc, msg->msg_flags); if (rc) { @@ -276,23 +281,18 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len) */ if ((msg->msg_flags & MSG_OOB) && !send_remaining) conn->urg_tx_pend = true; - if (((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) || - msg->msg_flags & MSG_SENDPAGE_NOTLAST) && - (atomic_read(&conn->sndbuf_space))) || - smc_tx_should_cork(smc, msg)) { - /* for a corked socket defer the RDMA writes if - * sndbuf_space is still available. The applications - * should known how/when to uncork it. 
- */ + /* If we need to cork, do nothing and wait for the next + * sendmsg() call or push on tx completion + */ + if (!smc_tx_should_cork(smc, msg)) { + conn->tx_bytes += copylen; + ++conn->tx_cnt; + smc_tx_sndbuf_nonempty(conn); + } else { conn->tx_corked_bytes += copylen; ++conn->tx_corked_cnt; - continue; } - conn->tx_bytes += copylen; - ++conn->tx_cnt; - smc_tx_sndbuf_nonempty(conn); - trace_smc_tx_sendmsg(smc, copylen); } /* while (msg_data_left(msg)) */ @@ -638,17 +638,10 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn) return rc; } -int smc_tx_sndbuf_nonempty(struct smc_connection *conn) +static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn) { - int rc = 0; struct smc_sock *smc = container_of(conn, struct smc_sock, conn); - - /* Only let one to push to prevent wasting of CPU and CDC slot */ - if (atomic_inc_return(&conn->tx_pushing) > 1) - return 0; - -again: - atomic_set(&conn->tx_pushing, 1); + int rc = 0; /* No data in the send queue */ if (unlikely(smc_tx_prepared_sends(conn) <= 0)) @@ -672,16 +665,34 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn) if (!rc) { /* trigger socket release if connection is closing */ - struct smc_sock *smc = container_of(conn, struct smc_sock, - conn); smc_close_wake_tx_prepared(smc); } out: + return rc; +} + +int smc_tx_sndbuf_nonempty(struct smc_connection *conn) +{ + int rc; + + /* This make sure only one can send simultaneously to prevent wasting + * of CPU and CDC slot. + * Record whether someone has tried to push while we are pushing. + */ + if (atomic_inc_return(&conn->tx_pushing) > 1) + return 0; + +again: + atomic_set(&conn->tx_pushing, 1); + smp_wmb(); /* Make sure tx_pushing is 1 before real send */ + rc = __smc_tx_sndbuf_nonempty(conn); + /* We need to check whether someone else have added some data into - * the send queue and tried to push but failed when we are pushing. 
- * If so, we need to try push again to prevent those data in the - * send queue may never been pushed out + * the send queue and tried to push but failed after the atomic_set() + * when we are pushing. + * If so, we need to push again to prevent those data hang in the send + * queue. */ if (unlikely(!atomic_dec_and_test(&conn->tx_pushing))) goto again; @@ -689,6 +700,10 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn) return rc; } +/* Wakeup sndbuf consumers from process context + * since there is more data to transmit. The caller + * must hold sock lock. + */ void smc_tx_pending(struct smc_connection *conn) { struct smc_sock *smc = container_of(conn, struct smc_sock, conn); @@ -704,7 +719,8 @@ void smc_tx_pending(struct smc_connection *conn) } /* Wakeup sndbuf consumers from process context - * since there is more data to transmit + * since there is more data to transmit in locked + * sock. */ void smc_tx_work(struct work_struct *work) {