On Wed, Feb 16, 2022 at 11:32:52AM +0100, Karsten Graul wrote:
>On 16/02/2022 04:49, Dust Li wrote:
>> This patch adds autocork support for SMC, which could improve
>> throughput for small messages by x2 ~ x4.
>>
>> The main idea is borrowed from TCP autocork with some RDMA-specific
>> modifications:
>
>Sounds like a valuable improvement, thank you!
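
For reference, the TCP-side heuristic this borrows from looks roughly
like the following (a paraphrase of tcp_should_autocork() in
net/ipv4/tcp.c, not the exact upstream code, which varies across kernel
versions):

/* Paraphrased: keep corking while earlier data is still in flight and
 * the current skb is below size_goal, so back-to-back small sends get
 * merged without arming a timer.
 */
static bool tcp_should_autocork(struct sock *sk, struct sk_buff *skb,
                                int size_goal)
{
        return skb->len < size_goal &&
               sock_net(sk)->ipv4.sysctl_tcp_autocorking &&
               !tcp_rtx_queue_empty(sk) &&
               refcount_read(&sk->sk_wmem_alloc) > skb->truesize;
}

smc_should_autocork() below plays the same role, with TCP's in-flight
check replaced by the count of pending CDC messages
(conn->cdc_pend_tx_wr) and the corked amount capped at
min(64K, sndbuf/2).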
>
>> ---
>>  net/smc/smc.h     |   2 +
>>  net/smc/smc_cdc.c |  11 +++--
>>  net/smc/smc_tx.c  | 118 ++++++++++++++++++++++++++++++++++++++++------
>>  3 files changed, 114 insertions(+), 17 deletions(-)
>>
>> diff --git a/net/smc/smc.h b/net/smc/smc.h
>> index a096d8af21a0..bc7df235281c 100644
>> --- a/net/smc/smc.h
>> +++ b/net/smc/smc.h
>> @@ -192,6 +192,8 @@ struct smc_connection {
>>                                   * - dec on polled tx cqe
>>                                   */
>>          wait_queue_head_t       cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr */
>> +        atomic_t                tx_pushing;     /* nr_threads trying tx push */
>> +
>
>Is this extra empty line needed?

Will remove this empty line in the next version.

>
>>          struct delayed_work     tx_work;        /* retry of smc_cdc_msg_send */
>>          u32                     tx_off;         /* base offset in peer rmb */
>>
>> diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
>> index 9d5a97168969..2b37bec90824 100644
>> --- a/net/smc/smc_cdc.c
>> +++ b/net/smc/smc_cdc.c
>> @@ -48,9 +48,14 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
>>                  conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
>>          }
>>
>> -        if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
>> -            unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
>> -                wake_up(&conn->cdc_pend_tx_wq);
>> +        if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
>> +                /* If this is the last pending WR's completion, we must push
>> +                 * to prevent a hang when autocork is enabled.
>> +                 */
>> +                smc_tx_sndbuf_nonempty(conn);
>> +                if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
>> +                        wake_up(&conn->cdc_pend_tx_wq);
>> +        }
>>          WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
>>
>>          smc_tx_sndbuf_nonfull(smc);
>> diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c
>> index 5df3940d4543..bc737ac79805 100644
>> --- a/net/smc/smc_tx.c
>> +++ b/net/smc/smc_tx.c
>> @@ -31,6 +31,7 @@
>>  #include "smc_tracepoint.h"
>>
>>  #define SMC_TX_WORK_DELAY       0
>> +#define SMC_DEFAULT_AUTOCORK_SIZE       (64 * 1024)
>>
>>  /***************************** sndbuf producer *******************************/
>>
>> @@ -127,10 +128,52 @@ static int smc_tx_wait(struct smc_sock *smc, int flags)
>>  static bool smc_tx_is_corked(struct smc_sock *smc)
>>  {
>>          struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);
>> -
>>          return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
>>  }
>>
>> +/* If we have pending CDC messages, do not send:
>> + * Because the CQE of this CDC message will arrive shortly, it gives
>> + * a chance to coalesce future sendmsg() payloads into one RDMA Write,
>> + * without the need for a timer, and with no latency trade-off.
>> + * Algorithm here:
>> + * 1. The first message should never cork
>> + * 2. If we have pending CDC messages, wait for the first
>> + *    message's completion
>> + * 3. Don't cork too much data in a single RDMA Write, to prevent bursts;
>> + *    the total corked data should not exceed min(64K, sendbuf/2)
>> + */
>> +static bool smc_should_autocork(struct smc_sock *smc, struct msghdr *msg,
>> +                                int size_goal)
>> +{
>> +        struct smc_connection *conn = &smc->conn;
>> +
>> +        if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
>> +            smc_tx_prepared_sends(conn) > min(size_goal,
>> +                                              conn->sndbuf_desc->len >> 1))
>> +                return false;
>> +        return true;
>> +}
>> +
>> +static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
>> +{
>> +        struct smc_connection *conn = &smc->conn;
>> +
>> +        if (smc_should_autocork(smc, msg, SMC_DEFAULT_AUTOCORK_SIZE))
>> +                return true;
>> +
>> +        if ((msg->msg_flags & MSG_MORE ||
>> +             smc_tx_is_corked(smc) ||
>> +             msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
>> +            (atomic_read(&conn->sndbuf_space)))
>> +                /* for a corked socket defer the RDMA writes if
>> +                 * sndbuf_space is still available. The applications
>> +                 * should know how/when to uncork it.
>> +                 */
>> +                return true;
>> +
>> +        return false;
>> +}
>> +
>>  /* sndbuf producer: main API called by socket layer.
>>   * called under sock lock.
>>   */
>> @@ -177,6 +220,13 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
>>          if (msg->msg_flags & MSG_OOB)
>>                  conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
>>
>> +        /* If our send queue is full but the peer has RMBE space,
>> +         * we should send the data out before waiting
>> +         */
>> +        if (!atomic_read(&conn->sndbuf_space) &&
>> +            atomic_read(&conn->peer_rmbe_space) > 0)
>> +                smc_tx_sndbuf_nonempty(conn);
>> +
>>          if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
>>                  if (send_done)
>>                          return send_done;
>> @@ -235,15 +285,12 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
>>                   */
>>                  if ((msg->msg_flags & MSG_OOB) && !send_remaining)
>>                          conn->urg_tx_pend = true;
>> -                if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) ||
>> -                     msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
>> -                    (atomic_read(&conn->sndbuf_space)))
>> -                        /* for a corked socket defer the RDMA writes if
>> -                         * sndbuf_space is still available. The applications
>> -                         * should known how/when to uncork it.
>> -                         */
>> -                        continue;
>> -                smc_tx_sndbuf_nonempty(conn);
>> +
>> +                /* If we need to cork, do nothing and wait for the next
>> +                 * sendmsg() call or a push on tx completion
>> +                 */
>> +                if (!smc_tx_should_cork(smc, msg))
>> +                        smc_tx_sndbuf_nonempty(conn);
>>
>>                  trace_smc_tx_sendmsg(smc, copylen);
>>          } /* while (msg_data_left(msg)) */
>> @@ -590,13 +637,26 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
>>          return rc;
>>  }
>>
>> -int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
>> +static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
>>  {
>> -        int rc;
>> +        int rc = 0;
>> +        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
>
>Reverse Christmas tree style please.

Sure, will do. Thank you!
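
FWIW, a minimal user-space sketch of the small-message workload this is
aimed at (a hypothetical example, not part of the patch: the address and
port are made up, and it assumes a kernel built with CONFIG_SMC):

/* Sends many back-to-back 1KB messages without MSG_MORE/TCP_CORK.
 * Without autocork each send() can trigger its own CDC message and
 * RDMA Write; with autocork, sends issued while a CDC completion is
 * still outstanding get coalesced into fewer, larger RDMA Writes.
 */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>

#ifndef AF_SMC
#define AF_SMC 43                       /* from linux/smc.h */
#endif

int main(void)
{
        struct sockaddr_in sa = {
                .sin_family = AF_INET,  /* AF_SMC reuses INET addressing */
                .sin_port = htons(12345),
        };
        char buf[1024] = { 0 };
        int i, fd;

        fd = socket(AF_SMC, SOCK_STREAM, 0);
        inet_pton(AF_INET, "192.168.0.2", &sa.sin_addr);
        if (fd < 0 || connect(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
                return 1;
        for (i = 0; i < 100000; i++)    /* small messages, no corking */
                if (send(fd, buf, sizeof(buf), 0) < 0)
                        break;
        close(fd);
        return 0;
}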