Currently the Receive completion handler refreshes the Receive Queue
whenever a successful Receive completion occurs.

On disconnect, xprtrdma drains the Receive Queue. The first few
Receive completions after a disconnect are typically successful,
until the first flushed Receive.

This means the Receive completion handler continues to post more
Receive WRs after the drain sentinel has been posted. The late-posted
Receives flush after the drain sentinel has completed, leading to a
crash later in rpcrdma_xprt_disconnect().

To prevent this crash, xprtrdma has to ensure that the Receive handler
stops posting Receives before ib_drain_rq() posts its drain sentinel.

Suggested-by: Tom Talpey <tom@xxxxxxxxxx>
Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 net/sunrpc/xprtrdma/verbs.c     | 13 +++++++++++++
 net/sunrpc/xprtrdma/xprt_rdma.h |  1 +
 2 files changed, 14 insertions(+)

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ec912cf9c618..d8ed69442219 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -101,6 +101,12 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct rdma_cm_id *id = ep->re_id;
 
+	/* Wait for rpcrdma_post_recvs() to leave its critical
+	 * section.
+	 */
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		wait_for_completion(&ep->re_done);
+
 	/* Flush Receives, then wait for deferred Reply work
 	 * to complete.
 	 */
@@ -414,6 +420,7 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 	__module_get(THIS_MODULE);
 	device = id->device;
 	ep->re_id = id;
+	reinit_completion(&ep->re_done);
 
 	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
 	ep->re_inline_send = xprt_rdma_max_inline_write;
@@ -1385,6 +1392,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 	if (!temp)
 		needed += RPCRDMA_MAX_RECV_BATCH;
 
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		goto out;
+
 	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
 	while (needed) {
@@ -1410,6 +1420,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 
 	rc = ib_post_recv(ep->re_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+	if (atomic_dec_return(&ep->re_receiving) > 0)
+		complete(&ep->re_done);
+
 out:
 	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index fe3be985e239..31404326f29f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -83,6 +83,7 @@ struct rpcrdma_ep {
 	unsigned int		re_max_inline_recv;
 	int			re_async_rc;
 	int			re_connect_status;
+	atomic_t		re_receiving;
 	atomic_t		re_force_disconnect;
 	struct ib_qp_init_attr	re_attr;
 	wait_queue_head_t	re_connect_wait;
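
For readers following the counter logic, here is a minimal userspace sketch of how the re_receiving gate in the patch appears to behave. It is not kernel code. The struct ep_model type and every helper name are hypothetical, C11 atomics stand in for atomic_inc_return() and atomic_dec_return(), and a plain bool stands in for complete(&ep->re_done). The interleavings are stepped through by hand in a single thread, so this only illustrates the counter arithmetic and says nothing about real concurrency.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical model; only the counter logic mirrors the patch. */
struct ep_model {
	atomic_int receiving;	/* stands in for ep->re_receiving */
	bool done;		/* stands in for complete(&ep->re_done) */
};

static void ep_model_init(struct ep_model *ep)
{
	atomic_init(&ep->receiving, 0);
	ep->done = false;
}

/* atomic_fetch_add() returns the old value, so old + 1 is the new
 * value, which is what the kernel's atomic_inc_return() yields.
 */
static int inc_return(atomic_int *v)
{
	return atomic_fetch_add(v, 1) + 1;
}

static int dec_return(atomic_int *v)
{
	return atomic_fetch_sub(v, 1) - 1;
}

/* Entry gate of the posting path: true means it may post Receives. */
static bool post_recvs_enter(struct ep_model *ep)
{
	return !(inc_return(&ep->receiving) > 1);
}

/* Exit of the posting path: signal a drainer that arrived meanwhile. */
static void post_recvs_leave(struct ep_model *ep)
{
	if (dec_return(&ep->receiving) > 0)
		ep->done = true;
}

/* Entry of the drain path: true means the drainer has to wait. */
static bool drain_must_wait(struct ep_model *ep)
{
	return inc_return(&ep->receiving) > 1;
}

/* Hand-stepped interleavings of the poster and the drainer. */
static void model_demo(void)
{
	struct ep_model ep;

	/* Drain arrives while a poster is inside its critical section. */
	ep_model_init(&ep);
	assert(post_recvs_enter(&ep));	/* counter 0 -> 1, poster proceeds */
	assert(drain_must_wait(&ep));	/* counter 1 -> 2, drainer waits */
	post_recvs_leave(&ep);		/* counter 2 -> 1, drainer signaled */
	assert(ep.done);
	assert(!post_recvs_enter(&ep));	/* gate stays closed afterwards */

	/* Drain arrives while no poster is active. */
	ep_model_init(&ep);
	assert(!drain_must_wait(&ep));	/* counter 0 -> 1, no wait needed */
	assert(!post_recvs_enter(&ep));	/* later posters are turned away */
}
```

Calling model_demo() runs both scenarios. In each one, once the drainer has incremented the counter, every later attempt to enter the posting path is refused, which is the property the commit message asks for before ib_drain_rq() posts its sentinel.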