[PATCH v2 05/13] xprtrdma: Fix client lock-up after application signal fires

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



After a signal, the RPC client aborts synchronous RPCs running on
behalf of the signaled application.

The server is still executing those RPCs, and will write the results
back into the client's memory when it's done. By the time the server
writes the results, that memory is likely being used for other
purposes. Therefore xprtrdma has to immediately invalidate all
memory regions used by those aborted RPCs to prevent the server's
writes from clobbering that re-used memory.

With FMR memory registration, invalidation takes a relatively long
time. In fact, the invalidation is often still running when the
server tries to write the results into the memory regions that are
being invalidated.

This sets up a race between two processes:

1.  After the signal, xprt_rdma_free calls ro_unmap_safe.
2.  While ro_unmap_safe is still running, the server replies and
    rpcrdma_reply_handler runs, calling ro_unmap_sync.

Both processes invoke ib_unmap_fmr on the same FMR.

The mlx4 driver allows two ib_unmap_fmr calls on the same FMR at
the same time, but HCAs generally don't tolerate this. Sometimes
this can result in a system crash.

If the HCA happens to survive, rpcrdma_reply_handler continues. It
removes the rpc_rqst from rq_list and releases the transport_lock.
This enables xprt_rdma_free to run in another process, and the
rpc_rqst is released while rpcrdma_reply_handler is still waiting
for the ib_unmap_fmr call to finish.

But further down in rpcrdma_reply_handler, the transport_lock is
taken again, and "rqst" is dereferenced. If "rqst" has already been
released, this triggers a general protection fault. Since bottom-
halves are disabled, the system locks up.

Address both issues by reversing the order of the xprt_lookup_rqst
call and the ro_unmap_sync call. Introduce a separate lookup
mechanism for rpcrdma_req's to enable calling ro_unmap_sync before
xprt_lookup_rqst. Now the handler takes the transport_lock once
and holds it for the XID lookup and RPC completion.

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=305
Fixes: 68791649a725 ('xprtrdma: Invalidate in the RPC reply ... ')
Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 net/sunrpc/xprtrdma/rpc_rdma.c  |   79 +++++++++++++++++++++++++--------------
 net/sunrpc/xprtrdma/transport.c |    3 +
 net/sunrpc/xprtrdma/verbs.c     |    3 +
 net/sunrpc/xprtrdma/xprt_rdma.h |   30 +++++++++++++++
 4 files changed, 84 insertions(+), 31 deletions(-)

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index c88132d..b6584ae 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -734,6 +734,9 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 		rpclen = 0;
 	}
 
+	req->rl_xid = rqst->rq_xid;
+	rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
@@ -987,11 +990,12 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 {
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
@@ -1013,28 +1017,45 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock_bh(&xprt->transport_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
+	spin_lock(&buf->rb_lock);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+					headerp->rm_xid);
+	if (!req)
 		goto out_nomatch;
-
-	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
 
-	/* Sanity checking has passed. We are now committed
-	 * to complete this transaction.
-	 */
 	list_replace_init(&req->rl_registered, &mws);
 	rpcrdma_mark_remote_invalidation(&mws, rep);
-	list_del_init(&rqst->rq_list);
+
+	/* Avoid races with signals and duplicate replies
+	 * by marking this req as matched.
+	 */
 	req->rl_reply = rep;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
+
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-	xprt->reestablish_timeout = 0;
+	/* Invalidate and unmap the data payloads before waking the
+	 * waiting application. This guarantees the memory regions
+	 * are properly fenced from the server before the application
+	 * accesses the data. It also ensures proper send flow control:
+	 * waking the next RPC waits until this RPC has relinquished
+	 * all its Send Queue entries.
+	 */
+	if (!list_empty(&mws))
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+	/* Perform XID lookup, reconstruction of the RPC reply, and
+	 * RPC completion while holding the transport lock to ensure
+	 * the rep, rqst, and rq_task pointers remain stable.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
@@ -1109,17 +1130,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	}
 
 out:
-	/* Invalidate and flush the data payloads before waking the
-	 * waiting application. This guarantees the memory region is
-	 * properly fenced from the server before the application
-	 * accesses the data. It also ensures proper send flow
-	 * control: waking the next RPC waits until this RPC has
-	 * relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&mws))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
-
-	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
@@ -1128,7 +1138,7 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	xprt_complete_rqst(rqst->rq_task, status);
 	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-			__func__, xprt, rqst, status);
+		__func__, xprt, rqst, status);
 	return;
 
 out_badstatus:
@@ -1177,26 +1187,37 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+	spin_unlock_bh(&xprt->transport_lock);
+	rpcrdma_buffer_put(req);
+	dprintk("RPC:       %s: race, no rqst left for req %p\n",
+		__func__, req);
+	return;
+
 out_shortreply:
 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
 out_nomatch:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
 		__func__, be32_to_cpu(headerp->rm_xid),
 		rep->rr_len);
 	goto repost;
 
 out_duplicate:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
 	r_xprt->rx_stats.bad_reply_count++;
 	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 62ecbcc..d1c458e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -684,7 +684,8 @@
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	if (unlikely(!list_empty(&req->rl_registered)))
+	rpcrdma_remove_req(&r_xprt->rx_buf, req);
+	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index df72224..a215a87 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1032,6 +1032,7 @@ struct rpcrdma_rep *
 	spin_lock_init(&buf->rb_recovery_lock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
+	INIT_LIST_HEAD(&buf->rb_pending);
 	INIT_LIST_HEAD(&buf->rb_stale_mrs);
 	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 			  rpcrdma_mr_refresh_worker);
@@ -1084,7 +1085,7 @@ struct rpcrdma_rep *
 
 	req = list_first_entry(&buf->rb_send_bufs,
 			       struct rpcrdma_req, rl_list);
-	list_del(&req->rl_list);
+	list_del_init(&req->rl_list);
 	return req;
 }
 
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index ad918c8..b282d3f 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -341,6 +341,7 @@ enum {
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_list;
+	__be32			rl_xid;
 	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
@@ -402,6 +403,7 @@ struct rpcrdma_buffer {
 	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
+	struct list_head	rb_pending;
 	u32			rb_max_requests;
 	atomic_t		rb_credits;	/* most recent credit grant */
 
@@ -550,6 +552,34 @@ int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
+static inline void
+rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	spin_lock(&buffers->rb_lock);
+	if (list_empty(&req->rl_list))
+		list_add_tail(&req->rl_list, &buffers->rb_pending);
+	spin_unlock(&buffers->rb_lock);
+}
+
+static inline struct rpcrdma_req *
+rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
+{
+	struct rpcrdma_req *pos;
+
+	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
+		if (pos->rl_xid == xid)
+			return pos;
+	return NULL;
+}
+
+static inline void
+rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	spin_lock(&buffers->rb_lock);
+	list_del(&req->rl_list);
+	spin_unlock(&buffers->rb_lock);
+}
+
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);

--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux