Adopt xprt_pin_rqst to eliminate contention between Call-side users
of rb_lock and rpcrdma_reply_handler's use of rb_lock.

This replaces the mechanism introduced in 431af645cf66 ("xprtrdma:
Fix client lock-up after application signal fires").

Use recv_lock to quickly find the completing rqst and pin it, then
drop the lock. At that point invalidation and pull-up of the Reply
XDR can be done; both are often expensive operations. Finally, take
recv_lock again to signal completion to the RPC layer; that lock
also protects the adjustment of "cwnd".

This greatly reduces the amount of time a lock is held by the
reply handler. Comparing lock_stat results shows a marked decrease
in contention on rb_lock and recv_lock.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
Hi-

If Trond's lock contention series is going into v4.14, I'd like you
to consider this one (applied at the end of that series) as well.
Without it, NFS/RDMA performance regresses a bit after the first
xprt_pin_rqst patch is applied.

Thanks!

 net/sunrpc/xprt.c               |    2 +
 net/sunrpc/xprtrdma/rpc_rdma.c  |   57 ++++++++++-----------------------------
 net/sunrpc/xprtrdma/transport.c |    1 -
 net/sunrpc/xprtrdma/verbs.c     |    1 -
 net/sunrpc/xprtrdma/xprt_rdma.h |   30 ---------------------
 5 files changed, 16 insertions(+), 75 deletions(-)

diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 2af189c..e741ec2 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -855,6 +855,7 @@ void xprt_pin_rqst(struct rpc_rqst *req)
 {
 	set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
 }
+EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 
 /**
  * xprt_unpin_rqst - Unpin a request on the transport receive list
@@ -870,6 +871,7 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
 	if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
 		wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
 }
+EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
 
 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
 __must_hold(&req->rq_xprt->recv_lock)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index dfa748a..577a29e 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -734,9 +734,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 		rpclen = 0;
 	}
 
-	req->rl_xid = rqst->rq_xid;
-	rpcrdma_insert_req(&r_xprt->rx_buf, req);
-
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
@@ -991,7 +988,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
@@ -999,7 +995,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
-	struct list_head mws;
 
 	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
 
@@ -1017,22 +1012,14 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock(&buf->rb_lock);
-	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
-					headerp->rm_xid);
-	if (!req)
-		goto out_nomatch;
-	if (req->rl_reply)
-		goto out_duplicate;
-
-	list_replace_init(&req->rl_registered, &mws);
-	rpcrdma_mark_remote_invalidation(&mws, rep);
-
-	/* Avoid races with signals and duplicate replies
-	 * by marking this req as matched.
-	 */
+	spin_lock(&xprt->recv_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt_pin_rqst(rqst);
+	spin_unlock(&xprt->recv_lock);
+	req = rpcr_to_rdmar(rqst);
 	req->rl_reply = rep;
-	spin_unlock(&buf->rb_lock);
 
 	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
@@ -1044,17 +1031,12 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	 * waking the next RPC waits until this RPC has relinquished
 	 * all its Send Queue entries.
 	 */
-	if (!list_empty(&mws))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
+	if (!list_empty(&req->rl_registered)) {
+		rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
+						    &req->rl_registered);
+	}
 
-	/* Perform XID lookup, reconstruction of the RPC reply, and
-	 * RPC completion while holding the transport lock to ensure
-	 * the rep, rqst, and rq_task pointers remain stable.
-	 */
-	spin_lock(&xprt->recv_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
-		goto out_norqst;
 	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
@@ -1130,12 +1112,14 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	}
 
 out:
+	spin_lock(&xprt->recv_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(rqst->rq_task);
 
 	xprt_complete_rqst(rqst->rq_task, status);
+	xprt_unpin_rqst(rqst);
 	spin_unlock(&xprt->recv_lock);
 	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
@@ -1202,19 +1186,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	dprintk("RPC: %s: short/invalid reply\n", __func__);
 	goto repost;
 
-out_nomatch:
-	spin_unlock(&buf->rb_lock);
-	dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
-		__func__, be32_to_cpu(headerp->rm_xid),
-		rep->rr_len);
-	goto repost;
-
-out_duplicate:
-	spin_unlock(&buf->rb_lock);
-	dprintk("RPC: %s: "
-		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
-		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
-
 /* If no pending RPC transaction was matched, post a replacement
  * receive buffer before returning.
  */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index d1c458e..1cb1a07 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -684,7 +684,6 @@
 
 	dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	rpcrdma_remove_req(&r_xprt->rx_buf, req);
 	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 	rpcrdma_unmap_sges(ia, req);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e4171f2..23ec6ed 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1004,7 +1004,6 @@ struct rpcrdma_rep *
 	spin_lock_init(&buf->rb_recovery_lock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
-	INIT_LIST_HEAD(&buf->rb_pending);
 	INIT_LIST_HEAD(&buf->rb_stale_mrs);
 	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 			  rpcrdma_mr_refresh_worker);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b282d3f..ad918c8 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -341,7 +341,6 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head rl_list;
-	__be32 rl_xid;
 	unsigned int rl_mapped_sges;
 	unsigned int rl_connect_cookie;
 	struct rpcrdma_buffer *rl_buffer;
@@ -403,7 +402,6 @@ struct rpcrdma_buffer {
 	int rb_send_count, rb_recv_count;
 	struct list_head rb_send_bufs;
 	struct list_head rb_recv_bufs;
-	struct list_head rb_pending;
 	u32 rb_max_requests;
 	atomic_t rb_credits;	/* most recent credit grant */
 
@@ -552,34 +550,6 @@ int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
-static inline void
-rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
-{
-	spin_lock(&buffers->rb_lock);
-	if (list_empty(&req->rl_list))
-		list_add_tail(&req->rl_list, &buffers->rb_pending);
-	spin_unlock(&buffers->rb_lock);
-}
-
-static inline struct rpcrdma_req *
-rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
-{
-	struct rpcrdma_req *pos;
-
-	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
-		if (pos->rl_xid == xid)
-			return pos;
-	return NULL;
-}
-
-static inline void
-rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
-{
-	spin_lock(&buffers->rb_lock);
-	list_del(&req->rl_list);
-	spin_unlock(&buffers->rb_lock);
-}
-
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
--
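
To summarize the new locking flow described above in one place: the
reply handler now holds recv_lock only long enough to match and pin
the rqst, performs invalidation and Reply pull-up with no lock held,
and re-takes recv_lock just to adjust cwnd, complete the RPC, and
unpin. The condensed sketch below follows the rpc_rdma.c hunks in
this patch; it is only an outline, not compilable standalone code,
and the function name reply_handler_locking_sketch plus the elision
of header parsing, remote-invalidation marking, and error paths are
illustrative simplifications.

/* Condensed sketch of the pin/unpin sequence; the real logic lives
 * in rpcrdma_reply_handler() above.
 */
static void
reply_handler_locking_sketch(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep,
			     __be32 xid, int status)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	unsigned long cwnd;

	/* Hold recv_lock just long enough to match the reply to its
	 * rqst and pin the rqst so it cannot disappear once the lock
	 * is dropped.
	 */
	spin_lock(&xprt->recv_lock);
	rqst = xprt_lookup_rqst(xprt, xid);
	if (!rqst) {
		spin_unlock(&xprt->recv_lock);
		return;		/* no match: repost the receive buffer */
	}
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->recv_lock);

	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;

	/* Lock dropped: the expensive work (invalidating MRs and
	 * pulling up the Reply XDR) no longer blocks Call-side users
	 * of the transport's locks.
	 */
	if (!list_empty(&req->rl_registered))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
						    &req->rl_registered);
	/* ... XDR decoding of the Reply happens here ... */

	/* Re-take recv_lock only to adjust cwnd, complete the RPC,
	 * and unpin the rqst.
	 */
	spin_lock(&xprt->recv_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);
	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->recv_lock);
}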