[PATCH v1] xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Adopt the use of xprt_pin_rqst to eliminate contention between
Call-side users of rb_lock and the use of rb_lock in
rpcrdma_reply_handler.

This replaces the mechanism introduced in 431af645cf66 ("xprtrdma:
Fix client lock-up after application signal fires").

Use recv_lock to quickly find the completing rqst, pin it, then
drop the lock. At that point invalidation and pull-up of the Reply
XDR can be done without holding the lock. Both are often expensive
operations.

Finally, take recv_lock again to signal completion to the RPC
layer. Holding recv_lock also protects the adjustment of "cwnd".

This greatly reduces the amount of time a lock is held by the
reply handler. Comparing lock_stat results shows a marked decrease
in contention on rb_lock and recv_lock.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
Hi-

If Trond's lock contention series is going into v4.14, I'd like you
to consider this one (applied at the end of that series) as well.
Without it, NFS/RDMA performance regresses a bit after the
first xprt_pin_rqst patch is applied. Thanks!


 net/sunrpc/xprt.c               |    2 +
 net/sunrpc/xprtrdma/rpc_rdma.c  |   57 ++++++++++-----------------------------
 net/sunrpc/xprtrdma/transport.c |    1 -
 net/sunrpc/xprtrdma/verbs.c     |    1 -
 net/sunrpc/xprtrdma/xprt_rdma.h |   30 ---------------------
 5 files changed, 16 insertions(+), 75 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 2af189c..e741ec2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -855,6 +855,7 @@ void xprt_pin_rqst(struct rpc_rqst *req)
 {
 	set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
 }
+EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 
 /**
  * xprt_unpin_rqst - Unpin a request on the transport receive list
@@ -870,6 +871,7 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
 	if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
 		wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
 }
+EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
 
 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
 __must_hold(&req->rq_xprt->recv_lock)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index dfa748a..577a29e 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -734,9 +734,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 		rpclen = 0;
 	}
 
-	req->rl_xid = rqst->rq_xid;
-	rpcrdma_insert_req(&r_xprt->rx_buf, req);
-
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
@@ -991,7 +988,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
@@ -999,7 +995,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
-	struct list_head mws;
 
 	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
@@ -1017,22 +1012,14 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock(&buf->rb_lock);
-	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
-					headerp->rm_xid);
-	if (!req)
-		goto out_nomatch;
-	if (req->rl_reply)
-		goto out_duplicate;
-
-	list_replace_init(&req->rl_registered, &mws);
-	rpcrdma_mark_remote_invalidation(&mws, rep);
-
-	/* Avoid races with signals and duplicate replies
-	 * by marking this req as matched.
-	 */
+	spin_lock(&xprt->recv_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt_pin_rqst(rqst);
+	spin_unlock(&xprt->recv_lock);
+	req = rpcr_to_rdmar(rqst);
 	req->rl_reply = rep;
-	spin_unlock(&buf->rb_lock);
 
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
@@ -1044,17 +1031,12 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	 * waking the next RPC waits until this RPC has relinquished
 	 * all its Send Queue entries.
 	 */
-	if (!list_empty(&mws))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
+	if (!list_empty(&req->rl_registered)) {
+		rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
+						    &req->rl_registered);
+	}
 
-	/* Perform XID lookup, reconstruction of the RPC reply, and
-	 * RPC completion while holding the transport lock to ensure
-	 * the rep, rqst, and rq_task pointers remain stable.
-	 */
-	spin_lock(&xprt->recv_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
-		goto out_norqst;
 	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
@@ -1130,12 +1112,14 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	}
 
 out:
+	spin_lock(&xprt->recv_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(rqst->rq_task);
 
 	xprt_complete_rqst(rqst->rq_task, status);
+	xprt_unpin_rqst(rqst);
 	spin_unlock(&xprt->recv_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 		__func__, xprt, rqst, status);
@@ -1202,19 +1186,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
-out_nomatch:
-	spin_unlock(&buf->rb_lock);
-	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
-		__func__, be32_to_cpu(headerp->rm_xid),
-		rep->rr_len);
-	goto repost;
-
-out_duplicate:
-	spin_unlock(&buf->rb_lock);
-	dprintk("RPC:       %s: "
-		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
-		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
-
 /* If no pending RPC transaction was matched, post a replacement
  * receive buffer before returning.
  */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index d1c458e..1cb1a07 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -684,7 +684,6 @@
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	rpcrdma_remove_req(&r_xprt->rx_buf, req);
 	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 	rpcrdma_unmap_sges(ia, req);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e4171f2..23ec6ed 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1004,7 +1004,6 @@ struct rpcrdma_rep *
 	spin_lock_init(&buf->rb_recovery_lock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
-	INIT_LIST_HEAD(&buf->rb_pending);
 	INIT_LIST_HEAD(&buf->rb_stale_mrs);
 	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 			  rpcrdma_mr_refresh_worker);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b282d3f..ad918c8 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -341,7 +341,6 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_list;
-	__be32			rl_xid;
 	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
@@ -403,7 +402,6 @@ struct rpcrdma_buffer {
 	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
-	struct list_head	rb_pending;
 	u32			rb_max_requests;
 	atomic_t		rb_credits;	/* most recent credit grant */
 
@@ -552,34 +550,6 @@ int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
-static inline void
-rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
-{
-	spin_lock(&buffers->rb_lock);
-	if (list_empty(&req->rl_list))
-		list_add_tail(&req->rl_list, &buffers->rb_pending);
-	spin_unlock(&buffers->rb_lock);
-}
-
-static inline struct rpcrdma_req *
-rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
-{
-	struct rpcrdma_req *pos;
-
-	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
-		if (pos->rl_xid == xid)
-			return pos;
-	return NULL;
-}
-
-static inline void
-rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
-{
-	spin_lock(&buffers->rb_lock);
-	list_del(&req->rl_list);
-	spin_unlock(&buffers->rb_lock);
-}
-
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux