> On Dec 17, 2018, at 12:28 PM, Trond Myklebust <trondmy@xxxxxxxxxxxxxxx> wrote:
> 
> On Mon, 2018-12-17 at 11:39 -0500, Chuck Lever wrote:
>> Transport disconnect processing does a "wake pending tasks" at
>> various points.
>> 
>> Suppose an RPC Reply is being processed. The RPC task that Reply
>> goes with is waiting on the pending queue. If a disconnect wake-up
>> happens before reply processing is done, that reply, even if it is
>> good, is thrown away, and the RPC has to be sent again.
>> 
>> This window apparently does not exist for socket transports because
>> there is a lock held while a reply is being received which prevents
>> the wake-up call until after reply processing is done.
>> 
>> To resolve this, all RPC replies being processed on an RPC-over-RDMA
>> transport have to complete before pending tasks are awoken due to a
>> transport disconnect.
>> 
>> Callers that already hold the transport write lock may invoke
>> ->ops->close directly. Others use a generic helper that schedules
>> a close when the write lock can be taken safely.
>> 
>> Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
>> ---
>>  include/linux/sunrpc/xprt.h                |    1 +
>>  net/sunrpc/xprt.c                          |   19 +++++++++++++++++++
>>  net/sunrpc/xprtrdma/backchannel.c          |   13 +++++++------
>>  net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    8 +++++---
>>  net/sunrpc/xprtrdma/transport.c            |   16 ++++++++++------
>>  net/sunrpc/xprtrdma/verbs.c                |    5 ++---
>>  6 files changed, 44 insertions(+), 18 deletions(-)
>> 
>> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
>> index a4ab4f8..ee94ed0 100644
>> --- a/include/linux/sunrpc/xprt.h
>> +++ b/include/linux/sunrpc/xprt.h
>> @@ -401,6 +401,7 @@ static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *
>>  bool			xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req);
>>  void			xprt_disconnect_done(struct rpc_xprt *xprt);
>>  void			xprt_force_disconnect(struct rpc_xprt *xprt);
>> +void			xprt_disconnect_nowake(struct rpc_xprt *xprt);
>>  void			xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
>> 
>>  bool			xprt_lock_connect(struct rpc_xprt *, struct rpc_task *, void *);
>> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
>> index ce92700..afe412e 100644
>> --- a/net/sunrpc/xprt.c
>> +++ b/net/sunrpc/xprt.c
>> @@ -685,6 +685,25 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
>>  }
>>  EXPORT_SYMBOL_GPL(xprt_force_disconnect);
>> 
>> +/**
>> + * xprt_disconnect_nowake - force a call to xprt->ops->close
>> + * @xprt: transport to disconnect
>> + *
>> + * The caller must ensure that xprt_wake_pending_tasks() is
>> + * called later.
>> + */
>> +void xprt_disconnect_nowake(struct rpc_xprt *xprt)
>> +{
>> +	/* Don't race with the test_bit() in xprt_clear_locked() */
>> +	spin_lock_bh(&xprt->transport_lock);
>> +	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
>> +	/* Try to schedule an autoclose RPC call */
>> +	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
>> +		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
>> +	spin_unlock_bh(&xprt->transport_lock);
>> +}
>> +EXPORT_SYMBOL_GPL(xprt_disconnect_nowake);
>> +
> 
> We shouldn't need both xprt_disconnect_nowake() and
> xprt_force_disconnect() to be exported given that you can build the
> latter from the former + xprt_wake_pending_tasks() (which is also
> already exported).

Thanks for your review!

I can get rid of xprt_disconnect_nowake(). There are some variations,
depending on whether xprt_wake_pending_tasks() needs to be protected
by xprt->transport_lock:

static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	xprt_force_disconnect(xprt);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
}

Or,

static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	xprt_force_disconnect(xprt);

	spin_lock_bh(&xprt->transport_lock);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}

Or,

void xprt_force_disconnect(struct rpc_xprt *xprt, bool wake)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	if (wake)
		xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}

Which do you prefer?
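FWIW, with the third variation the existing call sites would collapse
to one-liners. A rough, untested sketch, assuming the bool-flag
signature above:

static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	/* Socket transports hold a lock across reply processing,
	 * so it is safe to wake pending tasks right away. */
	xprt_force_disconnect(xprt, true);
}

The RPC-over-RDMA CM event handler and device-removal path would pass
"false" instead, deferring the wake-up until reply processing has
completed.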
>>  static unsigned int
>>  xprt_connect_cookie(struct rpc_xprt *xprt)
>>  {
>> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
>> index 2cb07a3..5d462e8 100644
>> --- a/net/sunrpc/xprtrdma/backchannel.c
>> +++ b/net/sunrpc/xprtrdma/backchannel.c
>> @@ -193,14 +193,15 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
>>   */
>>  int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
>>  {
>> -	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
>> +	struct rpc_xprt *xprt = rqst->rq_xprt;
>> +	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
>>  	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
>>  	int rc;
>> 
>> -	if (!xprt_connected(rqst->rq_xprt))
>> -		goto drop_connection;
>> +	if (!xprt_connected(xprt))
>> +		return -ENOTCONN;
>> 
>> -	if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
>> +	if (!xprt_request_get_cong(xprt, rqst))
>>  		return -EBADSLT;
>> 
>>  	rc = rpcrdma_bc_marshal_reply(rqst);
>> @@ -215,7 +216,7 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
>>  	if (rc != -ENOTCONN)
>>  		return rc;
>>  drop_connection:
>> -	xprt_disconnect_done(rqst->rq_xprt);
>> +	xprt->ops->close(xprt);
> 
> Why use an indirect call here? Is this ever going to be different to
> xprt_rdma_close()?
> 
>>  	return -ENOTCONN;
>>  }
>> 
>> @@ -338,7 +339,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
>> 
>>  out_overflow:
>>  	pr_warn("RPC/RDMA backchannel overflow\n");
>> -	xprt_disconnect_done(xprt);
>> +	xprt_disconnect_nowake(xprt);
>>  	/* This receive buffer gets reposted automatically
>>  	 * when the connection is re-established.
>>  	 */
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> index f3c147d..b908f2c 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> @@ -200,11 +200,10 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
>>  		svc_rdma_send_ctxt_put(rdma, ctxt);
>>  		goto drop_connection;
>>  	}
>> -	return rc;
>> +	return 0;
>> 
>>  drop_connection:
>>  	dprintk("svcrdma: failed to send bc call\n");
>> -	xprt_disconnect_done(xprt);
>>  	return -ENOTCONN;
>>  }
>> 
>> @@ -225,8 +224,11 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
>> 
>>  	ret = -ENOTCONN;
>>  	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
>> -	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
>> +	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
>>  		ret = rpcrdma_bc_send_request(rdma, rqst);
>> +		if (ret == -ENOTCONN)
>> +			svc_close_xprt(sxprt);
>> +	}
>> 
>>  	mutex_unlock(&sxprt->xpt_mutex);
>> 
>> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
>> index 91c476a..a16296b 100644
>> --- a/net/sunrpc/xprtrdma/transport.c
>> +++ b/net/sunrpc/xprtrdma/transport.c
>> @@ -453,13 +453,13 @@
>> 
>>  	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
>>  		rpcrdma_ia_remove(ia);
>> -		return;
>> +		goto out;
>>  	}
>> +
>>  	if (ep->rep_connected == -ENODEV)
>>  		return;
>>  	if (ep->rep_connected > 0)
>>  		xprt->reestablish_timeout = 0;
>> -	xprt_disconnect_done(xprt);
>>  	rpcrdma_ep_disconnect(ep, ia);
>> 
>>  	/* Prepare @xprt for the next connection by reinitializing
>> @@ -467,6 +467,10 @@
>>  	 */
>>  	r_xprt->rx_buf.rb_credits = 1;
>>  	xprt->cwnd = RPC_CWNDSHIFT;
>> +
>> +out:
>> +	++xprt->connect_cookie;
>> +	xprt_disconnect_done(xprt);
>>  }
>> 
>>  /**
>> @@ -515,7 +519,7 @@
>>  static void
>>  xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
>>  {
>> -	xprt_force_disconnect(xprt);
>> +	xprt_disconnect_nowake(xprt);
>>  }
>> 
>>  /**
>> @@ -717,7 +721,7 @@
>>  #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
>> 
>>  	if (!xprt_connected(xprt))
>> -		goto drop_connection;
>> +		return -ENOTCONN;
>> 
>>  	if (!xprt_request_get_cong(xprt, rqst))
>>  		return -EBADSLT;
>> @@ -749,8 +753,8 @@
>>  	if (rc != -ENOTCONN)
>>  		return rc;
>>  drop_connection:
>> -	xprt_disconnect_done(xprt);
>> -	return -ENOTCONN;	/* implies disconnect */
>> +	xprt_rdma_close(xprt);
>> +	return -ENOTCONN;
>>  }
>> 
>>  void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index 9a0a765..38a757c 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -252,7 +252,7 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
>>  #endif
>>  	set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
>>  	ep->rep_connected = -ENODEV;
>> -	xprt_force_disconnect(xprt);
>> +	xprt_disconnect_nowake(xprt);
>>  	wait_for_completion(&ia->ri_remove_done);
>> 
>>  	ia->ri_id = NULL;
>> @@ -280,10 +280,9 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
>>  		ep->rep_connected = -EAGAIN;
>>  		goto disconnected;
>>  	case RDMA_CM_EVENT_DISCONNECTED:
>> -		++xprt->connect_cookie;
>>  		ep->rep_connected = -ECONNABORTED;
>>  disconnected:
>> -		xprt_force_disconnect(xprt);
>> +		xprt_disconnect_nowake(xprt);
>>  		wake_up_all(&ep->rep_connect_wait);
>>  		break;
>>  	default:
>> 
> 
> -- 
> Trond Myklebust
> Linux NFS client maintainer, Hammerspace
> trond.myklebust@xxxxxxxxxxxxxxx

--
Chuck Lever