[PATCH v1 18/19] svcrdma: Persistently allocate and DMA-map Send buffers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



While sending each RPC Reply, svc_rdma_sendto allocates and
DMA-maps a separate buffer where the RPC/RDMA transport header is
constructed. The buffer is unmapped and released in the Send
completion handler. This is a significant per-RPC overhead,
especially for small RPCs.

Instead, allocate and DMA-map a buffer once, when each
svc_rdma_send_ctxt is created, and cache it there. This buffer
and its mapping can be re-used for each RPC, saving the cost of
memory allocation and DMA mapping; only a DMA sync of the
transport header is needed before each Send.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/svc_rdma.h            |    8 +-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   51 +++-------
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   25 +----
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |  149 ++++++++++++++--------------
 4 files changed, 105 insertions(+), 128 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index a8bfc21..96b14a7 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -162,6 +162,7 @@ struct svc_rdma_send_ctxt {
 	struct list_head	sc_list;
 	struct ib_send_wr	sc_send_wr;
 	struct ib_cqe		sc_cqe;
+	void			*sc_xprt_buf;
 	int			sc_page_count;
 	int			sc_cur_sge_no;
 	struct page		*sc_pages[RPCSVC_MAXPAGES];
@@ -199,9 +200,12 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
 				   struct svc_rdma_send_ctxt *ctxt);
 extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
-extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
+				    struct svc_rdma_send_ctxt *ctxt,
+				    unsigned int len);
+extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *ctxt,
-				  __be32 *rdma_resp, unsigned int len);
+				  struct xdr_buf *xdr, __be32 *wr_lst);
 extern int svc_rdma_sendto(struct svc_rqst *);
 
 /* svc_rdma_transport.c */
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 40f5e4a..343e7ad 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -115,43 +115,21 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
  * the adapter has a small maximum SQ depth.
  */
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
-			      struct rpc_rqst *rqst)
+			      struct rpc_rqst *rqst,
+			      struct svc_rdma_send_ctxt *ctxt)
 {
-	struct svc_rdma_send_ctxt *ctxt;
 	int ret;
 
-	ctxt = svc_rdma_send_ctxt_get(rdma);
-	if (!ctxt) {
-		ret = -ENOMEM;
-		goto out_err;
-	}
-
-	/* rpcrdma_bc_send_request builds the transport header and
-	 * the backchannel RPC message in the same buffer. Thus only
-	 * one SGE is needed to send both.
-	 */
-	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
-				     rqst->rq_snd_buf.len);
+	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
 	if (ret < 0)
-		goto out_err;
+		return -EIO;
 
 	/* Bump page refcnt so Send completion doesn't release
 	 * the rq_buffer before all retransmits are complete.
 	 */
 	get_page(virt_to_page(rqst->rq_buffer));
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret)
-		goto out_unmap;
-
-out_err:
-	dprintk("svcrdma: %s returns %d\n", __func__, ret);
-	return ret;
-
-out_unmap:
-	svc_rdma_send_ctxt_put(rdma, ctxt);
-	ret = -EIO;
-	goto out_err;
+	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Server-side transport endpoint wants a whole page for its send
@@ -198,13 +176,15 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 {
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p;
 	int rc;
 
-	/* Space in the send buffer for an RPC/RDMA header is reserved
-	 * via xprt->tsh_size.
-	 */
-	p = rqst->rq_buffer;
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt)
+		goto drop_connection;
+
+	p = ctxt->sc_xprt_buf;
 	*p++ = rqst->rq_xid;
 	*p++ = rpcrdma_version;
 	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
@@ -212,14 +192,17 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p   = xdr_zero;
+	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);
 
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
 	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
 #endif
 
-	rc = svc_rdma_bc_sendto(rdma, rqst);
-	if (rc)
+	rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
+	if (rc) {
+		svc_rdma_send_ctxt_put(rdma, ctxt);
 		goto drop_connection;
+	}
 	return rc;
 
 drop_connection:
@@ -327,7 +310,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
-	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+	xprt->tsh_size = 0;
 	xprt->ops = &xprt_rdma_bc_procs;
 
 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 68648e6..09ce09b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -602,17 +602,15 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 				__be32 *rdma_argp, int status)
 {
 	struct svc_rdma_send_ctxt *ctxt;
-	__be32 *p, *err_msgp;
 	unsigned int length;
-	struct page *page;
+	__be32 *p;
 	int ret;
 
-	page = alloc_page(GFP_KERNEL);
-	if (!page)
+	ctxt = svc_rdma_send_ctxt_get(xprt);
+	if (!ctxt)
 		return;
-	err_msgp = page_address(page);
 
-	p = err_msgp;
+	p = ctxt->sc_xprt_buf;
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = xprt->sc_fc_credits;
@@ -628,19 +626,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 		*p++ = err_chunk;
 		trace_svcrdma_err_chunk(*rdma_argp);
 	}
-	length = (unsigned long)p - (unsigned long)err_msgp;
-
-	/* Map transport header; no RPC message payload */
-	ctxt = svc_rdma_send_ctxt_get(xprt);
-	if (!ctxt)
-		return;
-
-	ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
-	if (ret) {
-		dprintk("svcrdma: Error %d mapping send for protocol error\n",
-			ret);
-		return;
-	}
+	length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
+	svc_rdma_sync_reply_hdr(xprt, ctxt, length);
 
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index edfeca4..4a3efae 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -127,6 +127,8 @@
 svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
 {
 	struct svc_rdma_send_ctxt *ctxt;
+	dma_addr_t addr;
+	void *buffer;
 	size_t size;
 	int i;
 
@@ -134,16 +136,33 @@
 	size += rdma->sc_max_send_sges * sizeof(struct ib_sge);
 	ctxt = kmalloc(size, GFP_KERNEL);
 	if (!ctxt)
-		return NULL;
+		goto fail0;
+	buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
+	if (!buffer)
+		goto fail1;
+	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
+				 rdma->sc_max_req_size, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
+		goto fail2;
 
-	ctxt->sc_cqe.done = svc_rdma_wc_send;
 	ctxt->sc_send_wr.next = NULL;
 	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
 	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+	ctxt->sc_cqe.done = svc_rdma_wc_send;
+	ctxt->sc_xprt_buf = buffer;
+	ctxt->sc_sges[0].addr = addr;
+
 	for (i = 0; i < rdma->sc_max_send_sges; i++)
 		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
 	return ctxt;
+
+fail2:
+	kfree(buffer);
+fail1:
+	kfree(ctxt);
+fail0:
+	return NULL;
 }
 
 /**
@@ -157,6 +176,11 @@ void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
 
 	while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
 		list_del(&ctxt->sc_list);
+		ib_dma_unmap_single(rdma->sc_pd->device,
+				    ctxt->sc_sges[0].addr,
+				    rdma->sc_max_req_size,
+				    DMA_TO_DEVICE);
+		kfree(ctxt->sc_xprt_buf);
 		kfree(ctxt);
 	}
 }
@@ -181,6 +205,7 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
 
 out:
 	ctxt->sc_send_wr.num_sge = 0;
+	ctxt->sc_cur_sge_no = 0;
 	ctxt->sc_page_count = 0;
 	return ctxt;
 
@@ -205,7 +230,10 @@ void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
 	struct ib_device *device = rdma->sc_cm_id->device;
 	unsigned int i;
 
-	for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+	/* The first SGE contains the transport header, which
+	 * remains mapped until @ctxt is destroyed.
+	 */
+	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
 		ib_dma_unmap_page(device,
 				  ctxt->sc_sges[i].addr,
 				  ctxt->sc_sges[i].length,
@@ -519,35 +547,37 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
 }
 
 /**
- * svc_rdma_map_reply_hdr - DMA map the transport header buffer
+ * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
  * @rdma: controlling transport
- * @ctxt: op_ctxt for the Send WR
- * @rdma_resp: buffer containing transport header
+ * @ctxt: send_ctxt for the Send WR
  * @len: length of transport header
  *
- * Returns:
- *	%0 if the header is DMA mapped,
- *	%-EIO if DMA mapping failed.
  */
-int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-			   struct svc_rdma_send_ctxt *ctxt,
-			   __be32 *rdma_resp,
-			   unsigned int len)
+void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
+			     struct svc_rdma_send_ctxt *ctxt,
+			     unsigned int len)
 {
-	ctxt->sc_pages[0] = virt_to_page(rdma_resp);
-	ctxt->sc_page_count++;
-	ctxt->sc_cur_sge_no = 0;
-	return svc_rdma_dma_map_page(rdma, ctxt, ctxt->sc_pages[0], 0, len);
+	ctxt->sc_sges[0].length = len;
+	ctxt->sc_send_wr.num_sge++;
+	ib_dma_sync_single_for_device(rdma->sc_pd->device,
+				      ctxt->sc_sges[0].addr, len,
+				      DMA_TO_DEVICE);
 }
 
-/* Load the xdr_buf into the ctxt's sge array, and DMA map each
+/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
+ * @rdma: controlling transport
+ * @ctxt: send_ctxt for the Send WR
+ * @xdr: prepared xdr_buf containing RPC message
+ * @wr_lst: pointer to Call header's Write list, or NULL
+ *
+ * Load the xdr_buf into the ctxt's sge array, and DMA map each
  * element as it is added.
  *
  * Returns zero on success, or a negative errno on failure.
  */
-static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-				  struct svc_rdma_send_ctxt *ctxt,
-				  struct xdr_buf *xdr, __be32 *wr_lst)
+int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+			   struct svc_rdma_send_ctxt *ctxt,
+			   struct xdr_buf *xdr, __be32 *wr_lst)
 {
 	unsigned int len, remaining;
 	unsigned long page_off;
@@ -624,7 +654,7 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
 
 	ctxt->sc_page_count += pages;
 	for (i = 0; i < pages; i++) {
-		ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
+		ctxt->sc_pages[i] = rqstp->rq_respages[i];
 		rqstp->rq_respages[i] = NULL;
 	}
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
@@ -649,27 +679,18 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  * - The Reply's transport header will never be larger than a page.
  */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
-				   __be32 *rdma_argp, __be32 *rdma_resp,
+				   struct svc_rdma_send_ctxt *ctxt,
+				   __be32 *rdma_argp,
 				   struct svc_rqst *rqstp,
 				   __be32 *wr_lst, __be32 *rp_ch)
 {
-	struct svc_rdma_send_ctxt *ctxt;
 	int ret;
 
-	ctxt = svc_rdma_send_ctxt_get(rdma);
-	if (!ctxt)
-		return -ENOMEM;
-
-	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
-				     svc_rdma_reply_hdr_len(rdma_resp));
-	if (ret < 0)
-		goto err;
-
 	if (!rp_ch) {
 		ret = svc_rdma_map_reply_msg(rdma, ctxt,
 					     &rqstp->rq_res, wr_lst);
 		if (ret < 0)
-			goto err;
+			return ret;
 	}
 
 	svc_rdma_save_io_pages(rqstp, ctxt);
@@ -683,15 +704,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 	}
 	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
 		ctxt->sc_send_wr.num_sge);
-	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret)
-		goto err;
-
-	return 0;
-
-err:
-	svc_rdma_send_ctxt_put(rdma, ctxt);
-	return ret;
+	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Given the client-provided Write and Reply chunks, the server was not
@@ -702,40 +715,29 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
  * Remote Invalidation is skipped for simplicity.
  */
 static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
-				   __be32 *rdma_resp, struct svc_rqst *rqstp)
+				   struct svc_rdma_send_ctxt *ctxt,
+				   struct svc_rqst *rqstp)
 {
-	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p;
 	int ret;
 
-	ctxt = svc_rdma_send_ctxt_get(rdma);
-	if (!ctxt)
-		return -ENOMEM;
-
-	/* Replace the original transport header with an
-	 * RDMA_ERROR response. XID etc are preserved.
-	 */
-	trace_svcrdma_err_chunk(*rdma_resp);
-	p = rdma_resp + 3;
+	p = ctxt->sc_xprt_buf;
+	trace_svcrdma_err_chunk(*p);
+	p += 3;
 	*p++ = rdma_error;
 	*p   = err_chunk;
-
-	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
-	if (ret < 0)
-		goto err;
+	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
 
 	svc_rdma_save_io_pages(rqstp, ctxt);
 
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret)
-		goto err;
+	if (ret) {
+		svc_rdma_send_ctxt_put(rdma, ctxt);
+		return ret;
+	}
 
 	return 0;
-
-err:
-	svc_rdma_send_ctxt_put(rdma, ctxt);
-	return ret;
 }
 
 void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
@@ -762,7 +764,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
 	struct xdr_buf *xdr = &rqstp->rq_res;
-	struct page *res_page;
+	struct svc_rdma_send_ctxt *sctxt;
 	int ret;
 
 	rdma_argp = rctxt->rc_recv_buf;
@@ -775,10 +777,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	 * critical section.
 	 */
 	ret = -ENOMEM;
-	res_page = alloc_page(GFP_KERNEL);
-	if (!res_page)
+	sctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!sctxt)
 		goto err0;
-	rdma_resp = page_address(res_page);
+	rdma_resp = sctxt->sc_xprt_buf;
 
 	p = rdma_resp;
 	*p++ = *rdma_argp;
@@ -805,10 +807,11 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
 	}
 
-	ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
 				      wr_lst, rp_ch);
 	if (ret < 0)
-		goto err0;
+		goto err1;
 	ret = 0;
 
 out:
@@ -820,14 +823,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	if (ret != -E2BIG && ret != -EINVAL)
 		goto err1;
 
-	ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
+	ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
 	if (ret < 0)
-		goto err0;
+		goto err1;
 	ret = 0;
 	goto out;
 
  err1:
-	put_page(res_page);
+	svc_rdma_send_ctxt_put(rdma, sctxt);
  err0:
 	trace_svcrdma_send_failed(rqstp, ret);
 	set_bit(XPT_CLOSE, &xprt->xpt_flags);

--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux