[PATCH v4 06/30] xprtrdma: Don't wake pending tasks until disconnect is done

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Transport disconnect processing does a "wake pending tasks" at
various points.

Suppose an RPC Reply is being processed. The RPC task that Reply
goes with is waiting on the pending queue. If a disconnect wake-up
happens before reply processing is done, that reply, even if it is
good, is thrown away, and the RPC has to be sent again.

This window apparently does not exist for socket transports because
there is a lock held while a reply is being received which prevents
the wake-up call until after reply processing is done.

To resolve this, all RPC replies being processed on an RPC-over-RDMA
transport have to complete before pending tasks are awoken due to a
transport disconnect.

Callers that already hold the transport write lock may invoke
->ops->close directly. Others use a generic helper that schedules
a close when the write lock can be taken safely.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/xprt.h                |    1 +
 net/sunrpc/xprt.c                          |   19 +++++++++++++++++++
 net/sunrpc/xprtrdma/backchannel.c          |   13 +++++++------
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    8 +++++---
 net/sunrpc/xprtrdma/transport.c            |   16 ++++++++++------
 net/sunrpc/xprtrdma/verbs.c                |    5 ++---
 6 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index a4ab4f8..ee94ed0 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -401,6 +401,7 @@ static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *
 bool			xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req);
 void			xprt_disconnect_done(struct rpc_xprt *xprt);
 void			xprt_force_disconnect(struct rpc_xprt *xprt);
+void			xprt_disconnect_nowake(struct rpc_xprt *xprt);
 void			xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
 
 bool			xprt_lock_connect(struct rpc_xprt *, struct rpc_task *, void *);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ce92700..afe412e 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -685,6 +685,25 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
 
+/**
+ * xprt_disconnect_nowake - force a call to xprt->ops->close
+ * @xprt: transport to disconnect
+ *
+ * The caller must ensure that xprt_wake_pending_tasks() is
+ * called later.
+ */
+void xprt_disconnect_nowake(struct rpc_xprt *xprt)
+{
+       /* Don't race with the test_bit() in xprt_clear_locked() */
+       spin_lock_bh(&xprt->transport_lock);
+       set_bit(XPRT_CLOSE_WAIT, &xprt->state);
+       /* Try to schedule an autoclose RPC call */
+       if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
+               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
+       spin_unlock_bh(&xprt->transport_lock);
+}
+EXPORT_SYMBOL_GPL(xprt_disconnect_nowake);
+
 static unsigned int
 xprt_connect_cookie(struct rpc_xprt *xprt)
 {
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 2cb07a3..5d462e8 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -193,14 +193,15 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
  */
 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 {
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	int rc;
 
-	if (!xprt_connected(rqst->rq_xprt))
-		goto drop_connection;
+	if (!xprt_connected(xprt))
+		return -ENOTCONN;
 
-	if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
+	if (!xprt_request_get_cong(xprt, rqst))
 		return -EBADSLT;
 
 	rc = rpcrdma_bc_marshal_reply(rqst);
@@ -215,7 +216,7 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 	if (rc != -ENOTCONN)
 		return rc;
 drop_connection:
-	xprt_disconnect_done(rqst->rq_xprt);
+	xprt->ops->close(xprt);
 	return -ENOTCONN;
 }
 
@@ -338,7 +339,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 
 out_overflow:
 	pr_warn("RPC/RDMA backchannel overflow\n");
-	xprt_disconnect_done(xprt);
+	xprt_disconnect_nowake(xprt);
 	/* This receive buffer gets reposted automatically
 	 * when the connection is re-established.
 	 */
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index f3c147d..b908f2c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -200,11 +200,10 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 		svc_rdma_send_ctxt_put(rdma, ctxt);
 		goto drop_connection;
 	}
-	return rc;
+	return 0;
 
 drop_connection:
 	dprintk("svcrdma: failed to send bc call\n");
-	xprt_disconnect_done(xprt);
 	return -ENOTCONN;
 }
 
@@ -225,8 +224,11 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 
 	ret = -ENOTCONN;
 	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
-	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
 		ret = rpcrdma_bc_send_request(rdma, rqst);
+		if (ret == -ENOTCONN)
+			svc_close_xprt(sxprt);
+	}
 
 	mutex_unlock(&sxprt->xpt_mutex);
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 91c476a..a16296b 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -453,13 +453,13 @@
 
 	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
 		rpcrdma_ia_remove(ia);
-		return;
+		goto out;
 	}
+
 	if (ep->rep_connected == -ENODEV)
 		return;
 	if (ep->rep_connected > 0)
 		xprt->reestablish_timeout = 0;
-	xprt_disconnect_done(xprt);
 	rpcrdma_ep_disconnect(ep, ia);
 
 	/* Prepare @xprt for the next connection by reinitializing
@@ -467,6 +467,10 @@
 	 */
 	r_xprt->rx_buf.rb_credits = 1;
 	xprt->cwnd = RPC_CWNDSHIFT;
+
+out:
+	++xprt->connect_cookie;
+	xprt_disconnect_done(xprt);
 }
 
 /**
@@ -515,7 +519,7 @@
 static void
 xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	xprt_force_disconnect(xprt);
+	xprt_disconnect_nowake(xprt);
 }
 
 /**
@@ -717,7 +721,7 @@
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
 	if (!xprt_connected(xprt))
-		goto drop_connection;
+		return -ENOTCONN;
 
 	if (!xprt_request_get_cong(xprt, rqst))
 		return -EBADSLT;
@@ -749,8 +753,8 @@
 	if (rc != -ENOTCONN)
 		return rc;
 drop_connection:
-	xprt_disconnect_done(xprt);
-	return -ENOTCONN;	/* implies disconnect */
+	xprt_rdma_close(xprt);
+	return -ENOTCONN;
 }
 
 void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 9a0a765..38a757c 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -252,7 +252,7 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 #endif
 		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
 		ep->rep_connected = -ENODEV;
-		xprt_force_disconnect(xprt);
+		xprt_disconnect_nowake(xprt);
 		wait_for_completion(&ia->ri_remove_done);
 
 		ia->ri_id = NULL;
@@ -280,10 +280,9 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 			ep->rep_connected = -EAGAIN;
 		goto disconnected;
 	case RDMA_CM_EVENT_DISCONNECTED:
-		++xprt->connect_cookie;
 		ep->rep_connected = -ECONNABORTED;
 disconnected:
-		xprt_force_disconnect(xprt);
+		xprt_disconnect_nowake(xprt);
 		wake_up_all(&ep->rep_connect_wait);
 		break;
 	default:




[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux