[PATCH v2 10/19] svcrdma: Add recvfrom helpers to svc_rdma_rw.c

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



svc_rdma_rw.c already contains helpers for the sendto path.
Introduce helpers for the recvfrom path.

The plan is to replace the local NFSD bespoke code that constructs
and posts RDMA Read Work Requests with calls to the rdma_rw API.
This shares code with other RDMA-enabled ULPs that manages the gory
details of buffer registration and posting Work Requests.

This new code also puts all RDMA_NOMSG-specific logic in one place.

Lastly, the use of rqstp->rq_arg.pages is deprecated in favor of
using rqstp->rq_pages directly, for clarity.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/svc_rdma.h   |    3 
 net/sunrpc/xprtrdma/svc_rdma_rw.c |  424 +++++++++++++++++++++++++++++++++++++
 2 files changed, 426 insertions(+), 1 deletion(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 3ca9916..cf5d541 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -196,6 +196,9 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
 
 /* svc_rdma_rw.c */
 extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
+				    struct svc_rqst *rqstp,
+				    struct svc_rdma_op_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
 				     __be32 *wr_ch, struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 3b35364..d88a4b5 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -12,6 +12,9 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
+static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
+
 /* Each R/W context contains state for one chain of RDMA Read or
  * Write Work Requests.
  *
@@ -177,6 +180,7 @@ struct svc_rdma_write_info {
 	info->wi_nsegs = be32_to_cpup(++chunk);
 	info->wi_segs = ++chunk;
 	svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
 	return info;
 }
 
@@ -216,6 +220,75 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 	svc_rdma_write_info_free(info);
 }
 
+/* State for pulling a Read chunk.
+ */
+struct svc_rdma_read_info {
+	struct svc_rdma_op_ctxt		*ri_readctxt;
+	unsigned int			ri_position;
+	unsigned int			ri_pageno;
+	unsigned int			ri_pageoff;
+	unsigned int			ri_chunklen;
+
+	struct svc_rdma_chunk_ctxt	ri_cc;
+};
+
+static struct svc_rdma_read_info *
+svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_read_info *info;
+
+	info = kmalloc(sizeof(*info), GFP_KERNEL);
+	if (!info)
+		return info;
+
+	svc_rdma_cc_init(rdma, &info->ri_cc, DMA_FROM_DEVICE);
+	info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
+	return info;
+}
+
+static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
+{
+	svc_rdma_cc_release(&info->ri_cc);
+	kfree(info);
+}
+
+/**
+ * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ */
+static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_chunk_ctxt *cc =
+			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_read_info *info =
+			container_of(cc, struct svc_rdma_read_info, ri_cc);
+
+	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		if (wc->status != IB_WC_WR_FLUSH_ERR)
+			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
+			       ib_wc_status_msg(wc->status),
+			       wc->status, wc->vendor_err);
+	} else {
+		spin_lock(&rdma->sc_rq_dto_lock);
+		list_add_tail(&info->ri_readctxt->list,
+			      &rdma->sc_read_complete_q);
+		spin_unlock(&rdma->sc_rq_dto_lock);
+
+		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&rdma->sc_xprt);
+	}
+
+	svc_rdma_read_info_free(info);
+}
+
 /* This function sleeps when the transport's Send Queue is congested.
  *
  * Assumptions:
@@ -335,7 +408,6 @@ static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
 	__be32 *seg;
 	int ret;
 
-	cc->cc_cqe.done = svc_rdma_write_done;
 	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
 	do {
 		unsigned int write_len;
@@ -515,3 +587,353 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
 	svc_rdma_write_info_free(info);
 	return ret;
 }
+
+static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
+				       struct svc_rqst *rqstp,
+				       u32 rkey, u32 len, u64 offset)
+{
+	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
+	struct svc_rdma_rw_ctxt *ctxt;
+	unsigned int sge_no, seg_len;
+	struct scatterlist *sg;
+	int ret;
+
+	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
+	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
+	if (!ctxt)
+		goto out_noctx;
+	ctxt->rw_nents = sge_no;
+
+	dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
+		len, offset, rkey, sge_no);
+
+	sg = ctxt->rw_sg_table.sgl;
+	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
+		seg_len = min_t(unsigned int, len,
+				PAGE_SIZE - info->ri_pageoff);
+
+		head->arg.pages[info->ri_pageno] =
+			rqstp->rq_pages[info->ri_pageno];
+		if (!info->ri_pageoff)
+			head->count++;
+
+		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
+			    seg_len, info->ri_pageoff);
+		sg = sg_next(sg);
+
+		info->ri_pageoff += seg_len;
+		if (info->ri_pageoff == PAGE_SIZE) {
+			info->ri_pageno++;
+			info->ri_pageoff = 0;
+		}
+		len -= seg_len;
+
+		/* Safety check */
+		if (len &&
+		    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
+			goto out_overrun;
+	}
+
+	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
+			       cc->cc_rdma->sc_port_num,
+			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
+			       0, offset, rkey, DMA_FROM_DEVICE);
+	if (ret < 0)
+		goto out_initerr;
+
+	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+	cc->cc_sqecount += ret;
+	return 0;
+
+out_noctx:
+	dprintk("svcrdma: no R/W ctxs available\n");
+	return -ENOMEM;
+
+out_overrun:
+	dprintk("svcrdma: request overruns rq_pages\n");
+	return -EINVAL;
+
+out_initerr:
+	svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
+	pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+	return -EIO;
+}
+
+static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
+				     struct svc_rdma_read_info *info,
+				     __be32 *p)
+{
+	int ret;
+
+	info->ri_chunklen = 0;
+	while (*p++ != xdr_zero) {
+		u32 rs_handle, rs_length;
+		u64 rs_offset;
+
+		if (be32_to_cpup(p++) != info->ri_position)
+			break;
+		rs_handle = be32_to_cpup(p++);
+		rs_length = be32_to_cpup(p++);
+		p = xdr_decode_hyper(p, &rs_offset);
+
+		ret = svc_rdma_build_read_segment(info, rqstp,
+						  rs_handle, rs_length,
+						  rs_offset);
+		if (ret < 0)
+			break;
+
+		info->ri_chunklen += rs_length;
+	}
+
+	return ret;
+}
+
+/* If there is inline content following the Read chunk, append it to
+ * the page list immediately following the data payload. This has to
+ * be done after the reader function has determined how many pages
+ * were consumed for RDMA Read.
+ *
+ * On entry, ri_pageno and ri_pageoff point directly to the end of the
+ * page list. On exit, both have been updated to the new "next byte".
+ *
+ * Assumptions:
+ *	- Inline content fits entirely in rq_pages[0]
+ *	- Trailing content is only a handful of bytes
+ */
+static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
+			      struct svc_rdma_read_info *info)
+{
+	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	unsigned int tail_length, remaining;
+	u8 *srcp, *destp;
+
+	/* Assert that all inline content fits in page 0. This is an
+	 * implementation limit, not a protocol limit.
+	 */
+	if (head->arg.head[0].iov_len > PAGE_SIZE) {
+		pr_warn_once("svcrdma: too much trailing inline content\n");
+		return -EINVAL;
+	}
+
+	srcp = head->arg.head[0].iov_base;
+	srcp += info->ri_position;
+	tail_length = head->arg.head[0].iov_len - info->ri_position;
+	remaining = tail_length;
+
+	/* If there is room on the last page in the page list, try to
+	 * fit the trailing content there.
+	 */
+	if (info->ri_pageoff > 0) {
+		unsigned int len;
+
+		len = min_t(unsigned int, remaining,
+			    PAGE_SIZE - info->ri_pageoff);
+		destp = page_address(rqstp->rq_pages[info->ri_pageno]);
+		destp += info->ri_pageoff;
+
+		memcpy(destp, srcp, len);
+		srcp += len;
+		destp += len;
+		info->ri_pageoff += len;
+		remaining -= len;
+
+		if (info->ri_pageoff == PAGE_SIZE) {
+			info->ri_pageno++;
+			info->ri_pageoff = 0;
+		}
+	}
+
+	/* Otherwise, a fresh page is needed. */
+	if (remaining) {
+		head->arg.pages[info->ri_pageno] =
+				rqstp->rq_pages[info->ri_pageno];
+		head->count++;
+
+		destp = page_address(rqstp->rq_pages[info->ri_pageno]);
+		memcpy(destp, srcp, remaining);
+		info->ri_pageoff += remaining;
+	}
+
+	head->arg.page_len += tail_length;
+	head->arg.len += tail_length;
+	head->arg.buflen += tail_length;
+	return 0;
+}
+
+/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
+ * data lands in the page list of head->arg.pages.
+ *
+ * Currently NFSD does not look at the head->arg.tail[0] iovec.
+ * Therefore, XDR round-up of the Read chunk and trailing
+ * inline content must both be added at the end of the pagelist.
+ */
+static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
+					    struct svc_rdma_read_info *info,
+					    __be32 *p)
+{
+	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	int ret;
+
+	dprintk("svcrdma: Reading Read chunk at position %u\n",
+		info->ri_position);
+
+	info->ri_pageno = head->hdr_count;
+	info->ri_pageoff = 0;
+
+	ret = svc_rdma_build_read_chunk(rqstp, info, p);
+	if (ret < 0)
+		goto out;
+
+	/* Read chunk may need XDR round-up (see RFC 5666, s. 3.7).
+	 */
+	if (info->ri_chunklen & 3) {
+		u32 padlen = 4 - (info->ri_chunklen & 3);
+
+		info->ri_chunklen += padlen;
+
+		/* NB: data payload always starts on XDR alignment,
+		 * thus the pad can never contain a page boundary.
+		 */
+		info->ri_pageoff += padlen;
+		if (info->ri_pageoff == PAGE_SIZE) {
+			info->ri_pageno++;
+			info->ri_pageoff = 0;
+		}
+	}
+
+	head->arg.page_len = info->ri_chunklen;
+	head->arg.len += info->ri_chunklen;
+	head->arg.buflen += info->ri_chunklen;
+
+	if (info->ri_position < head->arg.head[0].iov_len) {
+		ret = svc_rdma_copy_tail(rqstp, info);
+		if (ret < 0)
+			goto out;
+	}
+	head->arg.head[0].iov_len = info->ri_position;
+
+out:
+	return ret;
+}
+
+/* Construct RDMA Reads to pull over a Position Zero Read chunk.
+ * The start of the data lands in the first page just after
+ * the Transport header, and the rest lands in the page list of
+ * head->arg.pages.
+ *
+ * Assumptions:
+ *	- A PZRC has an XDR-aligned length (no implicit round-up).
+ *	- There can be no trailing inline content (IOW, we assume
+ *	  a PZRC is never sent in an RDMA_MSG message, though it's
+ *	  allowed by spec).
+ */
+static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
+					struct svc_rdma_read_info *info,
+					__be32 *p)
+{
+	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
+	int ret;
+
+	dprintk("svcrdma: Reading Position Zero Read chunk\n");
+
+	info->ri_pageno = head->hdr_count - 1;
+	info->ri_pageoff = offset_in_page(head->byte_len);
+
+	ret = svc_rdma_build_read_chunk(rqstp, info, p);
+	if (ret < 0)
+		goto out;
+
+	head->arg.len += info->ri_chunklen;
+	head->arg.buflen += info->ri_chunklen;
+
+	if (head->arg.len <= head->sge[0].length) {
+		/* Transport header and RPC message fit entirely
+		 * in page where head iovec resides.
+		 */
+		head->arg.head[0].iov_len = info->ri_chunklen;
+	} else {
+		/* Transport header and part of RPC message reside
+		 * in the head iovec's page.
+		 */
+		head->arg.head[0].iov_len =
+				head->sge[0].length - head->byte_len;
+		head->arg.page_len =
+				info->ri_chunklen - head->arg.head[0].iov_len;
+	}
+
+out:
+	return ret;
+}
+
+/**
+ * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
+ * @rdma: controlling RDMA transport
+ * @rqstp: set of pages to use as Read sink buffers
+ * @head: pages under I/O collect here
+ * @p: pointer to start of Read chunk
+ *
+ * Returns:
+ *	%0 if all needed RDMA Reads were posted successfully,
+ *	%-EINVAL if client provided too many segments,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Assumptions:
+ * - All Read segments in @p have the same Position value.
+ */
+int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
+			     struct svc_rdma_op_ctxt *head, __be32 *p)
+{
+	struct svc_rdma_read_info *info;
+	struct page **page;
+	int ret;
+
+	/* The request (with page list) is constructed in
+	 * head->arg. Pages involved with RDMA Read I/O are
+	 * transferred there.
+	 */
+	head->hdr_count = head->count;
+	head->arg.head[0] = rqstp->rq_arg.head[0];
+	head->arg.tail[0] = rqstp->rq_arg.tail[0];
+	head->arg.pages = head->pages;
+	head->arg.page_base = 0;
+	head->arg.page_len = 0;
+	head->arg.len = rqstp->rq_arg.len;
+	head->arg.buflen = rqstp->rq_arg.buflen;
+
+	info = svc_rdma_read_info_alloc(rdma);
+	if (!info)
+		return -ENOMEM;
+	info->ri_readctxt = head;
+
+	info->ri_position = be32_to_cpup(p + 1);
+	if (info->ri_position)
+		ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
+	else
+		ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
+
+	/* Mark the start of the pages that can be used for the reply */
+	if (info->ri_pageoff > 0)
+		info->ri_pageno++;
+	rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
+	rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+	if (ret < 0)
+		goto out;
+
+	ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);
+
+out:
+	/* Read sink pages have been moved from rqstp->rq_pages to
+	 * head->arg.pages. Force svc_recv to refill those slots
+	 * in rq_pages.
+	 */
+	for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
+		*page = NULL;
+
+	if (ret < 0)
+		svc_rdma_read_info_free(info);
+	return ret;
+}

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux