[PATCH v1 06/19] svcrdma: Introduce svc_rdma_recv_ctxt

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt
free list. This eliminates the overhead of calling kmalloc / kfree,
both of which grab a globally shared lock that disables interrupts.
To reduce contention further, separate the use of these objects in
the Receive and Send paths in svcrdma.

Subsequent patches will take advantage of this separation by
allocating real resources which are then cached in these objects.
The allocations are freed when the transport is torn down.

I've renamed the structure so that static type checking can be used
to ensure that uses of op_ctxt and recv_ctxt are not confused. As an
additional clean up, structure fields are renamed to conform with
kernel coding conventions.

As a final clean up, helpers related to recv_ctxt are moved closer
to the functions that use them.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/svc_rdma.h          |   24 ++
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c  |  318 ++++++++++++++++++++++++++----
 net/sunrpc/xprtrdma/svc_rdma_rw.c        |   84 ++++----
 net/sunrpc/xprtrdma/svc_rdma_sendto.c    |    2 
 net/sunrpc/xprtrdma/svc_rdma_transport.c |  142 +------------
 5 files changed, 349 insertions(+), 221 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 88da0c9..37f759d 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -128,6 +128,9 @@ struct svcxprt_rdma {
 	unsigned long	     sc_flags;
 	struct list_head     sc_read_complete_q;
 	struct work_struct   sc_work;
+
+	spinlock_t	     sc_recv_lock;
+	struct list_head     sc_recv_ctxts;
 };
 /* sc_flags */
 #define RDMAXPRT_CONN_PENDING	3
@@ -142,6 +145,19 @@ struct svcxprt_rdma {
 
 #define RPCSVC_MAXPAYLOAD_RDMA	RPCSVC_MAXPAYLOAD
 
+struct svc_rdma_recv_ctxt {
+	struct list_head	rc_list;
+	struct ib_recv_wr	rc_recv_wr;
+	struct ib_cqe		rc_cqe;
+	struct xdr_buf		rc_arg;
+	u32			rc_byte_len;
+	unsigned int		rc_page_count;
+	unsigned int		rc_hdr_count;
+	struct ib_sge		rc_sges[1 +
+					RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE];
+	struct page		*rc_pages[RPCSVC_MAXPAGES];
+};
+
 /* Track DMA maps for this transport and context */
 static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma,
 					   struct svc_rdma_op_ctxt *ctxt)
@@ -155,13 +171,19 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 				    struct xdr_buf *rcvbuf);
 
 /* svc_rdma_recvfrom.c */
+extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma);
+extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma);
+extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
+				   struct svc_rdma_recv_ctxt *ctxt,
+				   int free_pages);
+extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma);
 extern int svc_rdma_recvfrom(struct svc_rqst *);
 
 /* svc_rdma_rw.c */
 extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
 extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
 				    struct svc_rqst *rqstp,
-				    struct svc_rdma_op_ctxt *head, __be32 *p);
+				    struct svc_rdma_recv_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 				     __be32 *wr_ch, struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 330d542..b7d9c55 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
 /*
- * Copyright (c) 2016, 2017 Oracle. All rights reserved.
+ * Copyright (c) 2016-2018 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
@@ -61,7 +61,7 @@
  * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
  * data payload from the client. svc_rdma_recvfrom sets up the
  * RDMA Reads using pages in svc_rqst::rq_pages, which are
- * transferred to an svc_rdma_op_ctxt for the duration of the
+ * transferred to an svc_rdma_recv_ctxt for the duration of the
  * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
  * is still not yet ready.
  *
@@ -70,18 +70,18 @@
  * svc_rdma_recvfrom again. This second call may use a different
  * svc_rqst than the first one, thus any information that needs
  * to be preserved across these two calls is kept in an
- * svc_rdma_op_ctxt.
+ * svc_rdma_recv_ctxt.
  *
  * The second call to svc_rdma_recvfrom performs final assembly
  * of the RPC Call message, using the RDMA Read sink pages kept in
- * the svc_rdma_op_ctxt. The xdr_buf is copied from the
- * svc_rdma_op_ctxt to the second svc_rqst. The second call returns
+ * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
+ * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
  * the length of the completed RPC Call message.
  *
  * Page Management
  *
  * Pages under I/O must be transferred from the first svc_rqst to an
- * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns.
+ * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
  *
  * The first svc_rqst supplies pages for RDMA Reads. These are moved
  * from rqstp::rq_pages into ctxt::pages. The consumed elements of
@@ -89,7 +89,7 @@
  * svc_rdma_recvfrom call returns.
  *
  * During the second svc_rdma_recvfrom call, RDMA Read sink pages
- * are transferred from the svc_rdma_op_ctxt to the second svc_rqst
+ * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst
  * (see rdma_read_complete() below).
  */
 
@@ -108,13 +108,247 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);
+
+static inline struct svc_rdma_recv_ctxt *
+svc_rdma_next_recv_ctxt(struct list_head *list)
+{
+	return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
+					rc_list);
+}
+
+/**
+ * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
+ * @rdma: svcxprt_rdma being torn down
+ *
+ */
+void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_recv_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) {
+		list_del(&ctxt->rc_list);
+		kfree(ctxt);
+	}
+}
+
+static struct svc_rdma_recv_ctxt *
+svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_recv_ctxt *ctxt;
+
+	spin_lock(&rdma->sc_recv_lock);
+	ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts);
+	if (!ctxt)
+		goto out_empty;
+	list_del(&ctxt->rc_list);
+	spin_unlock(&rdma->sc_recv_lock);
+
+out:
+	ctxt->rc_recv_wr.num_sge = 0;
+	ctxt->rc_page_count = 0;
+	return ctxt;
+
+out_empty:
+	spin_unlock(&rdma->sc_recv_lock);
+
+	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+	if (!ctxt)
+		return NULL;
+	goto out;
+}
+
+static void svc_rdma_recv_ctxt_unmap(struct svcxprt_rdma *rdma,
+				     struct svc_rdma_recv_ctxt *ctxt)
+{
+	struct ib_device *device = rdma->sc_cm_id->device;
+	int i;
+
+	for (i = 0; i < ctxt->rc_recv_wr.num_sge; i++)
+		ib_dma_unmap_page(device,
+				  ctxt->rc_sges[i].addr,
+				  ctxt->rc_sges[i].length,
+				  DMA_FROM_DEVICE);
+}
+
+/**
+ * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
+ * @rdma: controlling svcxprt_rdma
+ * @ctxt: object to return to the free list
+ * @free_pages: Non-zero if rc_pages should be freed
+ *
+ */
+void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
+			    struct svc_rdma_recv_ctxt *ctxt,
+			    int free_pages)
+{
+	unsigned int i;
+
+	if (free_pages)
+		for (i = 0; i < ctxt->rc_page_count; i++)
+			put_page(ctxt->rc_pages[i]);
+	spin_lock(&rdma->sc_recv_lock);
+	list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts);
+	spin_unlock(&rdma->sc_recv_lock);
+}
+
+static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
+{
+	struct ib_device *device = rdma->sc_cm_id->device;
+	struct svc_rdma_recv_ctxt *ctxt;
+	struct ib_recv_wr *bad_recv_wr;
+	int sge_no, buflen, ret;
+	struct page *page;
+	dma_addr_t pa;
+
+	ctxt = svc_rdma_recv_ctxt_get(rdma);
+	if (!ctxt)
+		return -ENOMEM;
+
+	buflen = 0;
+	ctxt->rc_cqe.done = svc_rdma_wc_receive;
+	for (sge_no = 0; buflen < rdma->sc_max_req_size; sge_no++) {
+		if (sge_no >= rdma->sc_max_sge) {
+			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+			goto err_put_ctxt;
+		}
+
+		page = alloc_page(GFP_KERNEL);
+		if (!page)
+			goto err_put_ctxt;
+		ctxt->rc_pages[sge_no] = page;
+		ctxt->rc_page_count++;
+
+		pa = ib_dma_map_page(device, ctxt->rc_pages[sge_no],
+				     0, PAGE_SIZE, DMA_FROM_DEVICE);
+		if (ib_dma_mapping_error(device, pa))
+			goto err_put_ctxt;
+		ctxt->rc_sges[sge_no].addr = pa;
+		ctxt->rc_sges[sge_no].length = PAGE_SIZE;
+		ctxt->rc_sges[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+		ctxt->rc_recv_wr.num_sge++;
+
+		buflen += PAGE_SIZE;
+	}
+	ctxt->rc_recv_wr.next = NULL;
+	ctxt->rc_recv_wr.sg_list = &ctxt->rc_sges[0];
+	ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
+
+	svc_xprt_get(&rdma->sc_xprt);
+	ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, &bad_recv_wr);
+	trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret);
+	if (ret)
+		goto err_post;
+	return 0;
+
+err_put_ctxt:
+	svc_rdma_recv_ctxt_unmap(rdma, ctxt);
+	svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+	return -ENOMEM;
+err_post:
+	svc_rdma_recv_ctxt_unmap(rdma, ctxt);
+	svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+	svc_xprt_put(&rdma->sc_xprt);
+	return ret;
+}
+
+/**
+ * svc_rdma_post_recvs - Post initial set of Recv WRs
+ * @rdma: fresh svcxprt_rdma
+ *
+ * Returns true if successful, otherwise false.
+ */
+bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
+{
+	unsigned int i;
+	int ret;
+
+	for (i = 0; i < rdma->sc_max_requests; i++) {
+		ret = svc_rdma_post_recv(rdma);
+		if (ret) {
+			pr_err("svcrdma: failure posting recv buffers: %d\n",
+			       ret);
+			return false;
+		}
+	}
+	return true;
+}
+
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq: Completion Queue context
+ * @wc: Work Completion object
+ *
+ * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
+ * the Receive completion handler could be running.
+ */
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *rdma = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_recv_ctxt *ctxt;
+
+	trace_svcrdma_wc_receive(wc);
+
+	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
+	ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
+	svc_rdma_recv_ctxt_unmap(rdma, ctxt);
+
+	if (wc->status != IB_WC_SUCCESS)
+		goto flushed;
+
+	if (svc_rdma_post_recv(rdma))
+		goto post_err;
+
+	/* All wc fields are now known to be valid */
+	ctxt->rc_byte_len = wc->byte_len;
+	spin_lock(&rdma->sc_rq_dto_lock);
+	list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
+	spin_unlock(&rdma->sc_rq_dto_lock);
+	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
+		svc_xprt_enqueue(&rdma->sc_xprt);
+	goto out;
+
+flushed:
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
+		       ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
+post_err:
+	svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	svc_xprt_enqueue(&rdma->sc_xprt);
+out:
+	svc_xprt_put(&rdma->sc_xprt);
+}
+
+/**
+ * svc_rdma_flush_recv_queues - Drain pending Receive work
+ * @rdma: svcxprt_rdma being shut down
+ *
+ */
+void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_recv_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) {
+		list_del(&ctxt->rc_list);
+		svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+	}
+	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
+		list_del(&ctxt->rc_list);
+		svc_rdma_recv_ctxt_put(rdma, ctxt, 1);
+	}
+}
+
 /*
  * Replace the pages in the rq_argpages array with the pages from the SGE in
  * the RDMA_RECV completion. The SGL should contain full pages up until the
  * last one.
  */
 static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
-				   struct svc_rdma_op_ctxt *ctxt)
+				   struct svc_rdma_recv_ctxt *ctxt)
 {
 	struct page *page;
 	int sge_no;
@@ -123,30 +357,30 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
 	/* The reply path assumes the Call's transport header resides
 	 * in rqstp->rq_pages[0].
 	 */
-	page = ctxt->pages[0];
+	page = ctxt->rc_pages[0];
 	put_page(rqstp->rq_pages[0]);
 	rqstp->rq_pages[0] = page;
 
 	/* Set up the XDR head */
 	rqstp->rq_arg.head[0].iov_base = page_address(page);
 	rqstp->rq_arg.head[0].iov_len =
-		min_t(size_t, ctxt->byte_len, ctxt->sge[0].length);
-	rqstp->rq_arg.len = ctxt->byte_len;
-	rqstp->rq_arg.buflen = ctxt->byte_len;
+		min_t(size_t, ctxt->rc_byte_len, ctxt->rc_sges[0].length);
+	rqstp->rq_arg.len = ctxt->rc_byte_len;
+	rqstp->rq_arg.buflen = ctxt->rc_byte_len;
 
 	/* Compute bytes past head in the SGL */
-	len = ctxt->byte_len - rqstp->rq_arg.head[0].iov_len;
+	len = ctxt->rc_byte_len - rqstp->rq_arg.head[0].iov_len;
 
 	/* If data remains, store it in the pagelist */
 	rqstp->rq_arg.page_len = len;
 	rqstp->rq_arg.page_base = 0;
 
 	sge_no = 1;
-	while (len && sge_no < ctxt->count) {
-		page = ctxt->pages[sge_no];
+	while (len && sge_no < ctxt->rc_recv_wr.num_sge) {
+		page = ctxt->rc_pages[sge_no];
 		put_page(rqstp->rq_pages[sge_no]);
 		rqstp->rq_pages[sge_no] = page;
-		len -= min_t(u32, len, ctxt->sge[sge_no].length);
+		len -= min_t(u32, len, ctxt->rc_sges[sge_no].length);
 		sge_no++;
 	}
 	rqstp->rq_respages = &rqstp->rq_pages[sge_no];
@@ -154,11 +388,11 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
 
 	/* If not all pages were used from the SGL, free the remaining ones */
 	len = sge_no;
-	while (sge_no < ctxt->count) {
-		page = ctxt->pages[sge_no++];
+	while (sge_no < ctxt->rc_recv_wr.num_sge) {
+		page = ctxt->rc_pages[sge_no++];
 		put_page(page);
 	}
-	ctxt->count = len;
+	ctxt->rc_page_count = len;
 
 	/* Set up tail */
 	rqstp->rq_arg.tail[0].iov_base = NULL;
@@ -364,29 +598,29 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
 }
 
 static void rdma_read_complete(struct svc_rqst *rqstp,
-			       struct svc_rdma_op_ctxt *head)
+			       struct svc_rdma_recv_ctxt *head)
 {
 	int page_no;
 
 	/* Copy RPC pages */
-	for (page_no = 0; page_no < head->count; page_no++) {
+	for (page_no = 0; page_no < head->rc_page_count; page_no++) {
 		put_page(rqstp->rq_pages[page_no]);
-		rqstp->rq_pages[page_no] = head->pages[page_no];
+		rqstp->rq_pages[page_no] = head->rc_pages[page_no];
 	}
 
 	/* Point rq_arg.pages past header */
-	rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
-	rqstp->rq_arg.page_len = head->arg.page_len;
+	rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count];
+	rqstp->rq_arg.page_len = head->rc_arg.page_len;
 
 	/* rq_respages starts after the last arg page */
 	rqstp->rq_respages = &rqstp->rq_pages[page_no];
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
 
 	/* Rebuild rq_arg head and tail. */
-	rqstp->rq_arg.head[0] = head->arg.head[0];
-	rqstp->rq_arg.tail[0] = head->arg.tail[0];
-	rqstp->rq_arg.len = head->arg.len;
-	rqstp->rq_arg.buflen = head->arg.buflen;
+	rqstp->rq_arg.head[0] = head->rc_arg.head[0];
+	rqstp->rq_arg.tail[0] = head->rc_arg.tail[0];
+	rqstp->rq_arg.len = head->rc_arg.len;
+	rqstp->rq_arg.buflen = head->rc_arg.buflen;
 }
 
 static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
@@ -506,28 +740,26 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	struct svc_xprt *xprt = rqstp->rq_xprt;
 	struct svcxprt_rdma *rdma_xprt =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
-	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_recv_ctxt *ctxt;
 	__be32 *p;
 	int ret;
 
 	spin_lock(&rdma_xprt->sc_rq_dto_lock);
-	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
-		ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q,
-					struct svc_rdma_op_ctxt, list);
-		list_del(&ctxt->list);
+	ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
+	if (ctxt) {
+		list_del(&ctxt->rc_list);
 		spin_unlock(&rdma_xprt->sc_rq_dto_lock);
 		rdma_read_complete(rqstp, ctxt);
 		goto complete;
-	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
-		ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q,
-					struct svc_rdma_op_ctxt, list);
-		list_del(&ctxt->list);
-	} else {
+	}
+	ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
+	if (!ctxt) {
 		/* No new incoming requests, terminate the loop */
 		clear_bit(XPT_DATA, &xprt->xpt_flags);
 		spin_unlock(&rdma_xprt->sc_rq_dto_lock);
 		return 0;
 	}
+	list_del(&ctxt->rc_list);
 	spin_unlock(&rdma_xprt->sc_rq_dto_lock);
 
 	atomic_inc(&rdma_stat_recv);
@@ -545,7 +777,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	if (svc_rdma_is_backchannel_reply(xprt, p)) {
 		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
 					       &rqstp->rq_arg);
-		svc_rdma_put_context(ctxt, 0);
+		svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0);
 		return ret;
 	}
 
@@ -554,7 +786,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		goto out_readchunk;
 
 complete:
-	svc_rdma_put_context(ctxt, 0);
+	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0);
 	rqstp->rq_prot = IPPROTO_MAX;
 	svc_xprt_copy_addrs(rqstp, xprt);
 	return rqstp->rq_arg.len;
@@ -567,16 +799,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 
 out_err:
 	svc_rdma_send_error(rdma_xprt, p, ret);
-	svc_rdma_put_context(ctxt, 0);
+	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0);
 	return 0;
 
 out_postfail:
 	if (ret == -EINVAL)
 		svc_rdma_send_error(rdma_xprt, p, ret);
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1);
 	return ret;
 
 out_drop:
-	svc_rdma_put_context(ctxt, 1);
+	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1);
 	return 0;
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 887ceef..c080ce2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 /*
- * Copyright (c) 2016 Oracle.  All rights reserved.
+ * Copyright (c) 2016-2018 Oracle.  All rights reserved.
  *
  * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
  */
@@ -227,7 +227,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 /* State for pulling a Read chunk.
  */
 struct svc_rdma_read_info {
-	struct svc_rdma_op_ctxt		*ri_readctxt;
+	struct svc_rdma_recv_ctxt	*ri_readctxt;
 	unsigned int			ri_position;
 	unsigned int			ri_pageno;
 	unsigned int			ri_pageoff;
@@ -282,10 +282,10 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
 			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
 			       ib_wc_status_msg(wc->status),
 			       wc->status, wc->vendor_err);
-		svc_rdma_put_context(info->ri_readctxt, 1);
+		svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt, 1);
 	} else {
 		spin_lock(&rdma->sc_rq_dto_lock);
-		list_add_tail(&info->ri_readctxt->list,
+		list_add_tail(&info->ri_readctxt->rc_list,
 			      &rdma->sc_read_complete_q);
 		spin_unlock(&rdma->sc_rq_dto_lock);
 
@@ -607,7 +607,7 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 				       struct svc_rqst *rqstp,
 				       u32 rkey, u32 len, u64 offset)
 {
-	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
 	struct svc_rdma_rw_ctxt *ctxt;
 	unsigned int sge_no, seg_len;
@@ -625,10 +625,10 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
 		seg_len = min_t(unsigned int, len,
 				PAGE_SIZE - info->ri_pageoff);
 
-		head->arg.pages[info->ri_pageno] =
+		head->rc_arg.pages[info->ri_pageno] =
 			rqstp->rq_pages[info->ri_pageno];
 		if (!info->ri_pageoff)
-			head->count++;
+			head->rc_page_count++;
 
 		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
 			    seg_len, info->ri_pageoff);
@@ -705,9 +705,9 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
 }
 
 /* Construct RDMA Reads to pull over a normal Read chunk. The chunk
- * data lands in the page list of head->arg.pages.
+ * data lands in the page list of head->rc_arg.pages.
  *
- * Currently NFSD does not look at the head->arg.tail[0] iovec.
+ * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
  * Therefore, XDR round-up of the Read chunk and trailing
  * inline content must both be added at the end of the pagelist.
  */
@@ -715,10 +715,10 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 					    struct svc_rdma_read_info *info,
 					    __be32 *p)
 {
-	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	int ret;
 
-	info->ri_pageno = head->hdr_count;
+	info->ri_pageno = head->rc_hdr_count;
 	info->ri_pageoff = 0;
 
 	ret = svc_rdma_build_read_chunk(rqstp, info, p);
@@ -732,11 +732,11 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 	 * chunk is not included in either the pagelist or in
 	 * the tail.
 	 */
-	head->arg.tail[0].iov_base =
-		head->arg.head[0].iov_base + info->ri_position;
-	head->arg.tail[0].iov_len =
-		head->arg.head[0].iov_len - info->ri_position;
-	head->arg.head[0].iov_len = info->ri_position;
+	head->rc_arg.tail[0].iov_base =
+		head->rc_arg.head[0].iov_base + info->ri_position;
+	head->rc_arg.tail[0].iov_len =
+		head->rc_arg.head[0].iov_len - info->ri_position;
+	head->rc_arg.head[0].iov_len = info->ri_position;
 
 	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
 	 *
@@ -749,9 +749,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 	 */
 	info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2;
 
-	head->arg.page_len = info->ri_chunklen;
-	head->arg.len += info->ri_chunklen;
-	head->arg.buflen += info->ri_chunklen;
+	head->rc_arg.page_len = info->ri_chunklen;
+	head->rc_arg.len += info->ri_chunklen;
+	head->rc_arg.buflen += info->ri_chunklen;
 
 out:
 	return ret;
@@ -760,7 +760,7 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
 /* Construct RDMA Reads to pull over a Position Zero Read chunk.
  * The start of the data lands in the first page just after
  * the Transport header, and the rest lands in the page list of
- * head->arg.pages.
+ * head->rc_arg.pages.
  *
  * Assumptions:
  *	- A PZRC has an XDR-aligned length (no implicit round-up).
@@ -772,11 +772,11 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
 					struct svc_rdma_read_info *info,
 					__be32 *p)
 {
-	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
 	int ret;
 
-	info->ri_pageno = head->hdr_count - 1;
-	info->ri_pageoff = offset_in_page(head->byte_len);
+	info->ri_pageno = head->rc_hdr_count - 1;
+	info->ri_pageoff = offset_in_page(head->rc_byte_len);
 
 	ret = svc_rdma_build_read_chunk(rqstp, info, p);
 	if (ret < 0)
@@ -784,22 +784,22 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
 
 	trace_svcrdma_encode_pzr(info->ri_chunklen);
 
-	head->arg.len += info->ri_chunklen;
-	head->arg.buflen += info->ri_chunklen;
+	head->rc_arg.len += info->ri_chunklen;
+	head->rc_arg.buflen += info->ri_chunklen;
 
-	if (head->arg.buflen <= head->sge[0].length) {
+	if (head->rc_arg.buflen <= head->rc_sges[0].length) {
 		/* Transport header and RPC message fit entirely
 		 * in page where head iovec resides.
 		 */
-		head->arg.head[0].iov_len = info->ri_chunklen;
+		head->rc_arg.head[0].iov_len = info->ri_chunklen;
 	} else {
 		/* Transport header and part of RPC message reside
 		 * in the head iovec's page.
 		 */
-		head->arg.head[0].iov_len =
-				head->sge[0].length - head->byte_len;
-		head->arg.page_len =
-				info->ri_chunklen - head->arg.head[0].iov_len;
+		head->rc_arg.head[0].iov_len =
+			head->rc_sges[0].length - head->rc_byte_len;
+		head->rc_arg.page_len =
+			info->ri_chunklen - head->rc_arg.head[0].iov_len;
 	}
 
 out:
@@ -824,24 +824,24 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
  * - All Read segments in @p have the same Position value.
  */
 int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
-			     struct svc_rdma_op_ctxt *head, __be32 *p)
+			     struct svc_rdma_recv_ctxt *head, __be32 *p)
 {
 	struct svc_rdma_read_info *info;
 	struct page **page;
 	int ret;
 
 	/* The request (with page list) is constructed in
-	 * head->arg. Pages involved with RDMA Read I/O are
+	 * head->rc_arg. Pages involved with RDMA Read I/O are
 	 * transferred there.
 	 */
-	head->hdr_count = head->count;
-	head->arg.head[0] = rqstp->rq_arg.head[0];
-	head->arg.tail[0] = rqstp->rq_arg.tail[0];
-	head->arg.pages = head->pages;
-	head->arg.page_base = 0;
-	head->arg.page_len = 0;
-	head->arg.len = rqstp->rq_arg.len;
-	head->arg.buflen = rqstp->rq_arg.buflen;
+	head->rc_hdr_count = head->rc_page_count;
+	head->rc_arg.head[0] = rqstp->rq_arg.head[0];
+	head->rc_arg.tail[0] = rqstp->rq_arg.tail[0];
+	head->rc_arg.pages = head->rc_pages;
+	head->rc_arg.page_base = 0;
+	head->rc_arg.page_len = 0;
+	head->rc_arg.len = rqstp->rq_arg.len;
+	head->rc_arg.buflen = rqstp->rq_arg.buflen;
 
 	info = svc_rdma_read_info_alloc(rdma);
 	if (!info)
@@ -867,7 +867,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
 
 out:
 	/* Read sink pages have been moved from rqstp->rq_pages to
-	 * head->arg.pages. Force svc_recv to refill those slots
+	 * head->rc_arg.pages. Force svc_recv to refill those slots
 	 * in rq_pages.
 	 */
 	for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index fed28de..a397d9a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
 /*
- * Copyright (c) 2016 Oracle. All rights reserved.
+ * Copyright (c) 2016-2018 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index ca9001d..afd5e61 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -63,7 +63,6 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
-static int svc_rdma_post_recv(struct svcxprt_rdma *xprt);
 static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
 						 struct net *net);
 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
@@ -175,11 +174,7 @@ static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
 {
 	unsigned int i;
 
-	/* Each RPC/RDMA credit can consume one Receive and
-	 * one Send WQE at the same time.
-	 */
-	i = xprt->sc_sq_depth + xprt->sc_rq_depth;
-
+	i = xprt->sc_sq_depth;
 	while (i--) {
 		struct svc_rdma_op_ctxt *ctxt;
 
@@ -298,54 +293,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
 }
 
 /**
- * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
- * @cq:        completion queue
- * @wc:        completed WR
- *
- */
-static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct svcxprt_rdma *xprt = cq->cq_context;
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_op_ctxt *ctxt;
-
-	trace_svcrdma_wc_receive(wc);
-
-	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
-	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-	svc_rdma_unmap_dma(ctxt);
-
-	if (wc->status != IB_WC_SUCCESS)
-		goto flushed;
-
-	/* All wc fields are now known to be valid */
-	ctxt->byte_len = wc->byte_len;
-	spin_lock(&xprt->sc_rq_dto_lock);
-	list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
-	spin_unlock(&xprt->sc_rq_dto_lock);
-
-	svc_rdma_post_recv(xprt);
-
-	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-		goto out;
-	goto out_enqueue;
-
-flushed:
-	if (wc->status != IB_WC_WR_FLUSH_ERR)
-		pr_err("svcrdma: Recv: %s (%u/0x%x)\n",
-		       ib_wc_status_msg(wc->status),
-		       wc->status, wc->vendor_err);
-	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-	svc_rdma_put_context(ctxt, 1);
-
-out_enqueue:
-	svc_xprt_enqueue(&xprt->sc_xprt);
-out:
-	svc_xprt_put(&xprt->sc_xprt);
-}
-
-/**
  * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
  * @cq:        completion queue
  * @wc:        completed WR
@@ -392,12 +339,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
 	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+	INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_recv_lock);
 	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
 	/*
@@ -411,63 +360,6 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
 	return cma_xprt;
 }
 
-static int
-svc_rdma_post_recv(struct svcxprt_rdma *xprt)
-{
-	struct ib_recv_wr recv_wr, *bad_recv_wr;
-	struct svc_rdma_op_ctxt *ctxt;
-	struct page *page;
-	dma_addr_t pa;
-	int sge_no;
-	int buflen;
-	int ret;
-
-	ctxt = svc_rdma_get_context(xprt);
-	buflen = 0;
-	ctxt->direction = DMA_FROM_DEVICE;
-	ctxt->cqe.done = svc_rdma_wc_receive;
-	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
-		if (sge_no >= xprt->sc_max_sge) {
-			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
-			goto err_put_ctxt;
-		}
-		page = alloc_page(GFP_KERNEL);
-		if (!page)
-			goto err_put_ctxt;
-		ctxt->pages[sge_no] = page;
-		pa = ib_dma_map_page(xprt->sc_cm_id->device,
-				     page, 0, PAGE_SIZE,
-				     DMA_FROM_DEVICE);
-		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
-			goto err_put_ctxt;
-		svc_rdma_count_mappings(xprt, ctxt);
-		ctxt->sge[sge_no].addr = pa;
-		ctxt->sge[sge_no].length = PAGE_SIZE;
-		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
-		ctxt->count = sge_no + 1;
-		buflen += PAGE_SIZE;
-	}
-	recv_wr.next = NULL;
-	recv_wr.sg_list = &ctxt->sge[0];
-	recv_wr.num_sge = ctxt->count;
-	recv_wr.wr_cqe = &ctxt->cqe;
-
-	svc_xprt_get(&xprt->sc_xprt);
-	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
-	trace_svcrdma_post_recv(&recv_wr, ret);
-	if (ret) {
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-		svc_xprt_put(&xprt->sc_xprt);
-	}
-	return ret;
-
- err_put_ctxt:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 1);
-	return -ENOMEM;
-}
-
 static void
 svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
 			       struct rdma_conn_param *param)
@@ -699,7 +591,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device *dev;
 	struct sockaddr *sap;
-	unsigned int i, ctxts;
+	unsigned int ctxts;
 	int ret = 0;
 
 	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
@@ -804,14 +696,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	    !rdma_ib_or_roce(dev, newxprt->sc_port_num))
 		goto errout;
 
-	/* Post receive buffers */
-	for (i = 0; i < newxprt->sc_max_requests; i++) {
-		ret = svc_rdma_post_recv(newxprt);
-		if (ret) {
-			dprintk("svcrdma: failure posting receive buffers\n");
-			goto errout;
-		}
-	}
+	if (!svc_rdma_post_recvs(newxprt))
+		goto errout;
 
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
@@ -908,20 +794,7 @@ static void __svc_rdma_free(struct work_struct *work)
 		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
 		       kref_read(&xprt->xpt_ref));
 
-	while (!list_empty(&rdma->sc_read_complete_q)) {
-		struct svc_rdma_op_ctxt *ctxt;
-		ctxt = list_first_entry(&rdma->sc_read_complete_q,
-					struct svc_rdma_op_ctxt, list);
-		list_del(&ctxt->list);
-		svc_rdma_put_context(ctxt, 1);
-	}
-	while (!list_empty(&rdma->sc_rq_dto_q)) {
-		struct svc_rdma_op_ctxt *ctxt;
-		ctxt = list_first_entry(&rdma->sc_rq_dto_q,
-					struct svc_rdma_op_ctxt, list);
-		list_del(&ctxt->list);
-		svc_rdma_put_context(ctxt, 1);
-	}
+	svc_rdma_flush_recv_queues(rdma);
 
 	/* Warn if we leaked a resource or under-referenced */
 	if (rdma->sc_ctxt_used != 0)
@@ -936,6 +809,7 @@ static void __svc_rdma_free(struct work_struct *work)
 
 	svc_rdma_destroy_rw_ctxts(rdma);
 	svc_rdma_destroy_ctxts(rdma);
+	svc_rdma_recv_ctxts_destroy(rdma);
 
 	/* Destroy the QP if present (not a listener) */
 	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux