On 16/02/2022 04:49, Dust Li wrote:
> This patch adds autocork support for SMC, which could improve
> throughput for small messages by x2 ~ x4.
>
> The main idea is borrowed from TCP autocork with some RDMA-specific
> modifications:

Sounds like a valuable improvement, thank you!

> ---
>  net/smc/smc.h     |   2 +
>  net/smc/smc_cdc.c |  11 +++--
>  net/smc/smc_tx.c  | 118 ++++++++++++++++++++++++++++++++++++++++------
>  3 files changed, 114 insertions(+), 17 deletions(-)
>
> diff --git a/net/smc/smc.h b/net/smc/smc.h
> index a096d8af21a0..bc7df235281c 100644
> --- a/net/smc/smc.h
> +++ b/net/smc/smc.h
> @@ -192,6 +192,8 @@ struct smc_connection {
>  					 * - dec on polled tx cqe
>  					 */
>  	wait_queue_head_t	cdc_pend_tx_wq; /* wakeup on no cdc_pend_tx_wr*/
> +	atomic_t		tx_pushing;	/* nr_threads trying tx push */
> +

Is this extra empty line needed?

>  	struct delayed_work	tx_work;	/* retry of smc_cdc_msg_send */
>  	u32			tx_off;		/* base offset in peer rmb */
>
> diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
> index 9d5a97168969..2b37bec90824 100644
> --- a/net/smc/smc_cdc.c
> +++ b/net/smc/smc_cdc.c
> @@ -48,9 +48,14 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
>  			conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
>  	}
>
> -	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr) &&
> -	    unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
> -		wake_up(&conn->cdc_pend_tx_wq);
> +	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
> +		/* If this is the last pending WR to complete, we must push
> +		 * to prevent a hang when autocork is enabled.
> +		 */
> +		smc_tx_sndbuf_nonempty(conn);
> +		if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
> +			wake_up(&conn->cdc_pend_tx_wq);
> +	}
>  	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
>
>  	smc_tx_sndbuf_nonfull(smc);
> diff --git a/net/smc/smc_tx.c b/net/smc/smc_tx.c
> index 5df3940d4543..bc737ac79805 100644
> --- a/net/smc/smc_tx.c
> +++ b/net/smc/smc_tx.c
> @@ -31,6 +31,7 @@
>  #include "smc_tracepoint.h"
>
>  #define SMC_TX_WORK_DELAY	0
> +#define SMC_DEFAULT_AUTOCORK_SIZE	(64 * 1024)
>
>  /***************************** sndbuf producer *******************************/
>
> @@ -127,10 +128,52 @@ static int smc_tx_wait(struct smc_sock *smc, int flags)
>  static bool smc_tx_is_corked(struct smc_sock *smc)
>  {
>  	struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);
> -
>  	return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
>  }
>
> +/* If we have pending CDC messages, do not send:
> + * Because the CQE of this CDC message will arrive shortly, it gives
> + * us a chance to coalesce future sendmsg() payloads into one RDMA Write,
> + * without the need for a timer, and with no latency trade-off.
> + * Algorithm here:
> + *  1. The first message should never cork
> + *  2. If we have pending CDC messages, wait for the first
> + *     message's completion
> + *  3. Don't cork too much data in a single RDMA Write to prevent bursts;
> + *     the total corked data should not exceed min(64k, sendbuf/2)
> + */
> +static bool smc_should_autocork(struct smc_sock *smc, struct msghdr *msg,
> +				int size_goal)
> +{
> +	struct smc_connection *conn = &smc->conn;
> +
> +	if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
> +	    smc_tx_prepared_sends(conn) > min(size_goal,
> +					      conn->sndbuf_desc->len >> 1))
> +		return false;
> +	return true;
> +}
> +
> +static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
> +{
> +	struct smc_connection *conn = &smc->conn;
> +
> +	if (smc_should_autocork(smc, msg, SMC_DEFAULT_AUTOCORK_SIZE))
> +		return true;
> +
> +	if ((msg->msg_flags & MSG_MORE ||
> +	     smc_tx_is_corked(smc) ||
> +	     msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
> +	    (atomic_read(&conn->sndbuf_space)))
> +		/* for a corked socket defer the RDMA writes if
> +		 * sndbuf_space is still available. The applications
> +		 * should know how/when to uncork it.
> +		 */
> +		return true;
> +
> +	return false;
> +}
> +
>  /* sndbuf producer: main API called by socket layer.
>   * called under sock lock.
>   */
> @@ -177,6 +220,13 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
>  	if (msg->msg_flags & MSG_OOB)
>  		conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
>
> +	/* If our send queue is full but the peer has RMBE space,
> +	 * we should send the data out before waiting
> +	 */
> +	if (!atomic_read(&conn->sndbuf_space) &&
> +	    atomic_read(&conn->peer_rmbe_space) > 0)
> +		smc_tx_sndbuf_nonempty(conn);
> +
>  	if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
>  		if (send_done)
>  			return send_done;
> @@ -235,15 +285,12 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
>  		 */
>  		if ((msg->msg_flags & MSG_OOB) && !send_remaining)
>  			conn->urg_tx_pend = true;
> -		if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) ||
> -		     msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
> -		    (atomic_read(&conn->sndbuf_space)))
> -			/* for a corked socket defer the RDMA writes if
> -			 * sndbuf_space is still available. The applications
> -			 * should known how/when to uncork it.
> -			 */
> -			continue;
> -		smc_tx_sndbuf_nonempty(conn);
> +
> +		/* If we need to cork, do nothing and wait for the next
> +		 * sendmsg() call or a push on tx completion
> +		 */
> +		if (!smc_tx_should_cork(smc, msg))
> +			smc_tx_sndbuf_nonempty(conn);
>
>  		trace_smc_tx_sendmsg(smc, copylen);
>  	} /* while (msg_data_left(msg)) */
> @@ -590,13 +637,26 @@ static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
>  	return rc;
>  }
>
> -int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
> +static int __smc_tx_sndbuf_nonempty(struct smc_connection *conn)
>  {
> -	int rc;
> +	int rc = 0;
> +	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

Reverse Christmas tree style please.
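i.e., just swapping the two declarations so the longest line comes first:

	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	int rc = 0;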
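A side note on the corking semantics in smc_tx_should_cork() above, in
case it helps other readers: the application-side pattern that the
MSG_MORE / TCP_CORK branch defers for would look roughly like the
sketch below (my own illustration, not part of the patch; error
handling omitted, fd is assumed to be a connected AF_SMC socket):

	int on = 1, off = 0;

	setsockopt(fd, SOL_TCP, TCP_CORK, &on, sizeof(on));   /* cork */
	send(fd, hdr, hdr_len, 0);    /* deferred while sndbuf_space left */
	send(fd, body, body_len, 0);  /* still corked, coalesced */
	setsockopt(fd, SOL_TCP, TCP_CORK, &off, sizeof(off)); /* uncork: push */

The same deferral applies to send(fd, buf, len, MSG_MORE) until a final
send() without MSG_MORE.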