[PATCH v1 06/20] xprtrdma: Refactor MR recovery work queues

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



I found that commit ead3f26e359e ("xprtrdma: Add ro_unmap_safe
memreg method"), which introduces ro_unmap_safe, never wired up the
FMR recovery worker.

The FMR and FRWR recovery work queues both do the same thing.
Instead of setting up separate individual work queues for this,
schedule a delayed worker to deal with them, since recovering MRs is
not performance-critical.

Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method")
Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 net/sunrpc/xprtrdma/fmr_ops.c   |   85 +++++++++++----------------------------
 net/sunrpc/xprtrdma/frwr_ops.c  |   68 ++++++-------------------------
 net/sunrpc/xprtrdma/transport.c |   11 -----
 net/sunrpc/xprtrdma/verbs.c     |   42 +++++++++++++++++++
 net/sunrpc/xprtrdma/xprt_rdma.h |   11 +++--
 5 files changed, 87 insertions(+), 130 deletions(-)

diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b581a28..45800c9 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,30 +35,6 @@
 /* Maximum scatter/gather per FMR */
 #define RPCRDMA_MAX_FMR_SGES	(64)
 
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND)
-
-int
-fmr_alloc_recovery_wq(void)
-{
-	fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
-	return !fmr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-fmr_destroy_recovery_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (!fmr_recovery_wq)
-		return;
-
-	wq = fmr_recovery_wq;
-	fmr_recovery_wq = NULL;
-	destroy_workqueue(wq);
-}
-
 static int
 __fmr_unmap(struct rpcrdma_mw *mw)
 {
@@ -68,54 +44,30 @@ __fmr_unmap(struct rpcrdma_mw *mw)
 	return ib_unmap_fmr(&l);
 }
 
-static void
-__fmr_dma_unmap(struct rpcrdma_mw *mw)
-{
-	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
-
-	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
-			mw->mw_sg, mw->mw_nents, mw->mw_dir);
-	rpcrdma_put_mw(r_xprt, mw);
-}
-
+/* Reset of a single FMR.
+ *
+ * There's no recovery if this fails. The FMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
 static void
 __fmr_reset_and_unmap(struct rpcrdma_mw *mw)
 {
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
 	int rc;
 
 	/* ORDER */
 	rc = __fmr_unmap(mw);
+	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+			mw->mw_sg, mw->mw_nents, mw->mw_dir);
 	if (rc) {
 		pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
 			rc, mw);
 		return;
 	}
-	__fmr_dma_unmap(mw);
+	rpcrdma_put_mw(r_xprt, mw);
  }
 
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
- */
-static void
-__fmr_recovery_worker(struct work_struct *work)
-{
-	struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
-					     mw_work);
-
-	__fmr_reset_and_unmap(mw);
-	return;
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
-{
-	INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
-	queue_work(fmr_recovery_wq, &mw->mw_work);
-}
-
 static void
 __fmr_release(struct rpcrdma_mw *r)
 {
@@ -142,6 +94,12 @@ fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	return 0;
 }
 
+static void
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
+{
+	__fmr_reset_and_unmap(mw);
+}
+
 /* FMR mode conveys up to 64 pages of payload per chunk segment.
  */
 static size_t
@@ -287,7 +245,9 @@ out_maperr:
 	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
 	       len, (unsigned long long)dma_pages[0],
 	       pageoff, mw->mw_nents, rc);
-	__fmr_dma_unmap(mw);
+	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+			mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	rpcrdma_put_mw(r_xprt, mw);
 	return rc;
 }
 
@@ -331,7 +291,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		seg = &req->rl_segments[i];
 		mw = seg->rl_mw;
 
-		__fmr_dma_unmap(mw);
+		ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+				mw->mw_sg, mw->mw_nents, mw->mw_dir);
+		rpcrdma_put_mw(r_xprt, mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
@@ -359,7 +321,7 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		if (sync)
 			__fmr_reset_and_unmap(mw);
 		else
-			__fmr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
@@ -384,6 +346,7 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
 	.ro_map				= fmr_op_map,
 	.ro_unmap_sync			= fmr_op_unmap_sync,
 	.ro_unmap_safe			= fmr_op_unmap_safe,
+	.ro_recover_mr			= fmr_op_recover_mr,
 	.ro_open			= fmr_op_open,
 	.ro_maxpages			= fmr_op_maxpages,
 	.ro_init			= fmr_op_init,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a476fa6..19bddfe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -73,31 +73,6 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND | WQ_MEM_RECLAIM)
-
-int
-frwr_alloc_recovery_wq(void)
-{
-	frwr_recovery_wq = alloc_workqueue("frwr_recovery",
-					   FRWR_RECOVERY_WQ_FLAGS, 0);
-	return !frwr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-frwr_destroy_recovery_wq(void)
-{
-	struct workqueue_struct *wq;
-
-	if (!frwr_recovery_wq)
-		return;
-
-	wq = frwr_recovery_wq;
-	frwr_recovery_wq = NULL;
-	destroy_workqueue(wq);
-}
-
 static int
 __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 {
@@ -124,6 +99,12 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 	return 0;
 }
 
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
 static void
 __frwr_reset_and_unmap(struct rpcrdma_mw *mw)
 {
@@ -138,30 +119,10 @@ __frwr_reset_and_unmap(struct rpcrdma_mw *mw)
 	rpcrdma_put_mw(r_xprt, mw);
 }
 
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
- */
-static void
-__frwr_recovery_worker(struct work_struct *work)
-{
-	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-					    mw_work);
-
-	__frwr_reset_and_unmap(r);
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
 static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
 {
-	INIT_WORK(&r->mw_work, __frwr_recovery_worker);
-	queue_work(frwr_recovery_wq, &r->mw_work);
+	__frwr_reset_and_unmap(mw);
 }
 
 static int
@@ -399,7 +360,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	seg1->rl_mw = NULL;
 	do {
 		if (mw)
-			__frwr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 		mw = rpcrdma_get_mw(r_xprt);
 		if (!mw)
 			return -ENOMEM;
@@ -481,12 +442,11 @@ out_mapmr_err:
 	pr_err("rpcrdma: failed to register mr %p (%u/%u)\n",
 	       frmr->fr_mr, n, mw->mw_nents);
 	rc = n < 0 ? n : -EIO;
-	__frwr_queue_recovery(mw);
+	rpcrdma_defer_mr_recovery(mw);
 	return rc;
 
 out_senderr:
-	pr_err("rpcrdma: ib_post_send status %i\n", rc);
-	__frwr_queue_recovery(mw);
+	rpcrdma_defer_mr_recovery(mw);
 	return rc;
 }
 
@@ -627,7 +587,7 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 		if (sync)
 			__frwr_reset_and_unmap(mw);
 		else
-			__frwr_queue_recovery(mw);
+			rpcrdma_defer_mr_recovery(mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
@@ -640,9 +600,6 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_mw *r;
 
-	/* Ensure stale MWs for "buf" are no longer in flight */
-	flush_workqueue(frwr_recovery_wq);
-
 	while (!list_empty(&buf->rb_all)) {
 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
 		list_del(&r->mw_all);
@@ -655,6 +612,7 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
 	.ro_map				= frwr_op_map,
 	.ro_unmap_sync			= frwr_op_unmap_sync,
 	.ro_unmap_safe			= frwr_op_unmap_safe,
+	.ro_recover_mr			= frwr_op_recover_mr,
 	.ro_open			= frwr_op_open,
 	.ro_maxpages			= frwr_op_maxpages,
 	.ro_init			= frwr_op_init,
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 99d2e5b..29c91bf 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -741,7 +741,6 @@ void xprt_rdma_cleanup(void)
 			__func__, rc);
 
 	rpcrdma_destroy_wq();
-	frwr_destroy_recovery_wq();
 
 	rc = xprt_unregister_transport(&xprt_rdma_bc);
 	if (rc)
@@ -753,20 +752,13 @@ int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = frwr_alloc_recovery_wq();
-	if (rc)
-		return rc;
-
 	rc = rpcrdma_alloc_wq();
-	if (rc) {
-		frwr_destroy_recovery_wq();
+	if (rc)
 		return rc;
-	}
 
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
 		rpcrdma_destroy_wq();
-		frwr_destroy_recovery_wq();
 		return rc;
 	}
 
@@ -774,7 +766,6 @@ int xprt_rdma_init(void)
 	if (rc) {
 		xprt_unregister_transport(&xprt_rdma);
 		rpcrdma_destroy_wq();
-		frwr_destroy_recovery_wq();
 		return rc;
 	}
 
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4f7e1ba..e9f7f727 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -762,6 +762,41 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	ib_drain_qp(ia->ri_id->qp);
 }
 
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+						  rb_recovery_worker.work);
+	struct rpcrdma_mw *mw;
+
+	spin_lock(&buf->rb_recovery_lock);
+	while (!list_empty(&buf->rb_stale_mrs)) {
+		mw = list_first_entry(&buf->rb_stale_mrs,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
+		spin_unlock(&buf->rb_recovery_lock);
+
+		dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
+		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+		spin_lock(&buf->rb_recovery_lock);
+	};
+	spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+	spin_lock(&buf->rb_recovery_lock);
+	list_add(&mw->mw_list, &buf->rb_stale_mrs);
+	spin_unlock(&buf->rb_recovery_lock);
+
+	schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
 struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
@@ -863,6 +898,11 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 	}
 
+	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+			  rpcrdma_mr_recovery_worker);
+	spin_lock_init(&buf->rb_recovery_lock);
+	INIT_LIST_HEAD(&buf->rb_stale_mrs);
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -911,6 +951,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 
+	cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
 
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3472844..8eb02ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -244,7 +244,6 @@ struct rpcrdma_mw {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
 	};
-	struct work_struct	mw_work;
 	struct rpcrdma_xprt	*mw_xprt;
 	struct list_head	mw_all;
 };
@@ -336,6 +335,10 @@ struct rpcrdma_buffer {
 	struct list_head	rb_allreqs;
 
 	u32			rb_bc_max_requests;
+
+	spinlock_t		rb_recovery_lock;
+	struct list_head	rb_stale_mrs;
+	struct delayed_work	rb_recovery_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -395,6 +398,7 @@ struct rpcrdma_memreg_ops {
 					 struct rpcrdma_req *);
 	void		(*ro_unmap_safe)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *, bool);
+	void            (*ro_recover_mr)(struct rpcrdma_mw *);
 	int		(*ro_open)(struct rpcrdma_ia *,
 				   struct rpcrdma_ep *,
 				   struct rpcrdma_create_data_internal *);
@@ -471,6 +475,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
 struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
 					    size_t, gfp_t);
 void rpcrdma_free_regbuf(struct rpcrdma_ia *,
@@ -478,9 +484,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
 int rpcrdma_alloc_wq(void);
 void rpcrdma_destroy_wq(void);
 

--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]
  Powered by Linux