[PATCH v1 14/16] svcrdma: Refactor chunk list encoders

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Same idea as the receive-side changes I did a while back: use
xdr_stream helpers rather than open-coding the XDR chunk list
encoders. This builds the Reply transport header from beginning to
end without backtracking.

As additional clean-ups, fill in documenting comments for the XDR
encoders and sprinkle some trace points in the new encoding
functions.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/svc_rdma.h            |    2 
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |   15 +-
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   34 +++
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |  279 +++++++++++++++++-----------
 4 files changed, 209 insertions(+), 121 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c506732886b3..d001aac13c2f 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -149,6 +149,8 @@ struct svc_rdma_send_ctxt {
 	struct list_head	sc_list;
 	struct ib_send_wr	sc_send_wr;
 	struct ib_cqe		sc_cqe;
+	struct xdr_buf		sc_hdrbuf;
+	struct xdr_stream	sc_stream;
 	void			*sc_xprt_buf;
 	int			sc_page_count;
 	int			sc_cur_sge_no;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index ce1a7a706f36..9830748c58d2 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -181,7 +181,9 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
 	if (!ctxt)
 		goto drop_connection;
 
-	p = ctxt->sc_xprt_buf;
+	p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
+	if (!p)
+		goto put_ctxt;
 	*p++ = rqst->rq_xid;
 	*p++ = rpcrdma_version;
 	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
@@ -189,7 +191,7 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p   = xdr_zero;
-	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);
+	svc_rdma_sync_reply_hdr(rdma, ctxt, ctxt->sc_hdrbuf.len);
 
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
 	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
@@ -197,12 +199,13 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
 
 	rqst->rq_xtime = ktime_get();
 	rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
-	if (rc) {
-		svc_rdma_send_ctxt_put(rdma, ctxt);
-		goto drop_connection;
-	}
+	if (rc)
+		goto put_ctxt;
 	return 0;
 
+put_ctxt:
+	svc_rdma_send_ctxt_put(rdma, ctxt);
+
 drop_connection:
 	dprintk("svcrdma: failed to send bc call\n");
 	return -ENOTCONN;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f9a3c413491c..447060dbd6fd 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -698,7 +698,6 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 				__be32 *rdma_argp, int status)
 {
 	struct svc_rdma_send_ctxt *ctxt;
-	unsigned int length;
 	__be32 *p;
 	int ret;
 
@@ -706,29 +705,48 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 	if (!ctxt)
 		return;
 
-	p = ctxt->sc_xprt_buf;
+	p = xdr_reserve_space(&ctxt->sc_stream,
+			      rpcrdma_fixed_maxsz * sizeof(*p));
+	if (!p)
+		goto put_ctxt;
+
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = xprt->sc_fc_credits;
-	*p++ = rdma_error;
+	*p = rdma_error;
+
 	switch (status) {
 	case -EPROTONOSUPPORT:
+		p = xdr_reserve_space(&ctxt->sc_stream,
+				      3 * sizeof(*p));
+		if (!p)
+			goto put_ctxt;
+
 		*p++ = err_vers;
 		*p++ = rpcrdma_version;
-		*p++ = rpcrdma_version;
+		*p = rpcrdma_version;
 		trace_svcrdma_err_vers(*rdma_argp);
 		break;
 	default:
-		*p++ = err_chunk;
+		p = xdr_reserve_space(&ctxt->sc_stream,
+				      sizeof(*p));
+		if (!p)
+			goto put_ctxt;
+
+		*p = err_chunk;
 		trace_svcrdma_err_chunk(*rdma_argp);
 	}
-	length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
-	svc_rdma_sync_reply_hdr(xprt, ctxt, length);
+
+	svc_rdma_sync_reply_hdr(xprt, ctxt, ctxt->sc_hdrbuf.len);
 
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
 	if (ret)
-		svc_rdma_send_ctxt_put(xprt, ctxt);
+		goto put_ctxt;
+	return;
+
+put_ctxt:
+	svc_rdma_send_ctxt_put(xprt, ctxt);
 }
 
 /* By convention, backchannel calls arrive via rdma_msg type
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1035df3a3b5c..f557e1cc5694 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -151,6 +151,8 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
 	ctxt->sc_cqe.done = svc_rdma_wc_send;
 	ctxt->sc_xprt_buf = buffer;
+	xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
+		     rdma->sc_max_req_size);
 	ctxt->sc_sges[0].addr = addr;
 
 	for (i = 0; i < rdma->sc_max_send_sges; i++)
@@ -204,6 +206,10 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
 	spin_unlock(&rdma->sc_send_lock);
 
 out:
+	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+			ctxt->sc_xprt_buf, NULL);
+
 	ctxt->sc_send_wr.num_sge = 0;
 	ctxt->sc_cur_sge_no = 0;
 	ctxt->sc_page_count = 0;
@@ -322,131 +328,171 @@ int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
 	return ret;
 }
 
-/* Returns length of transport header, in bytes.
+/**
+ * svc_rdma_encode_read_list - Encode RPC Reply's Read chunk list
+ * @sctxt: Send context for the RPC Reply
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply Read list
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
+static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
 {
-	unsigned int nsegs;
-	__be32 *p;
-
-	p = rdma_resp;
-
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p += rpcrdma_fixed_maxsz + 1;
-
-	/* Skip Write list. */
-	while (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
+	/* RPC-over-RDMA version 1 replies never have a Read list. */
+	return xdr_stream_encode_item_absent(&sctxt->sc_stream);
+}
 
-	/* Skip Reply chunk. */
-	if (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
+/**
+ * svc_rdma_encode_write_segment - Encode one Write segment
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: remaining bytes of the payload left in the Write chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Write segment
+ *   %-EMSGSIZE on XDR buffer overflow
+ */
+static ssize_t svc_rdma_encode_write_segment(__be32 *src,
+					     struct svc_rdma_send_ctxt *sctxt,
+					     unsigned int *remaining)
+{
+	__be32 *p;
+	const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
+	u32 handle, length;
+	u64 offset;
+
+	p = xdr_reserve_space(&sctxt->sc_stream, len);
+	if (!p)
+		return -EMSGSIZE;
+
+	handle = be32_to_cpup(src++);
+	length = be32_to_cpup(src++);
+	xdr_decode_hyper(src, &offset);
+
+	*p++ = cpu_to_be32(handle);
+	if (*remaining < length) {
+		/* segment only partly filled */
+		length = *remaining;
+		*remaining = 0;
+	} else {
+		/* entire segment was consumed */
+		*remaining -= length;
 	}
+	*p++ = cpu_to_be32(length);
+	xdr_encode_hyper(p, offset);
 
-	return (unsigned long)p - (unsigned long)rdma_resp;
+	trace_svcrdma_encode_wseg(handle, length, offset);
+	return len;
 }
 
-/* One Write chunk is copied from Call transport header to Reply
- * transport header. Each segment's length field is updated to
- * reflect number of bytes consumed in the segment.
- *
- * Returns number of segments in this chunk.
+/**
+ * svc_rdma_encode_write_chunk - Encode one Write chunk
+ * @src: matching Write chunk in the RPC Call header
+ * @sctxt: Send context for the RPC Reply
+ * @remaining: size in bytes of the payload in the Write chunk
+ *
+ * Copy a Write chunk from the Call transport header to the
+ * Reply transport header. Update each segment's length field
+ * to reflect the number of bytes written in that segment.
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Write chunk
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
+					   struct svc_rdma_send_ctxt *sctxt,
 					   unsigned int remaining)
 {
 	unsigned int i, nsegs;
-	u32 seg_len;
+	ssize_t len, ret;
 
-	/* Write list discriminator */
-	*dst++ = *src++;
+	len = 0;
+	trace_svcrdma_encode_write_chunk(remaining);
 
-	/* number of segments in this chunk */
-	nsegs = be32_to_cpup(src);
-	*dst++ = *src++;
+	src++;
+	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
+	if (ret < 0)
+		return -EMSGSIZE;
+	len += ret;
 
-	for (i = nsegs; i; i--) {
-		/* segment's RDMA handle */
-		*dst++ = *src++;
-
-		/* bytes returned in this segment */
-		seg_len = be32_to_cpu(*src);
-		if (remaining >= seg_len) {
-			/* entire segment was consumed */
-			*dst = *src;
-			remaining -= seg_len;
-		} else {
-			/* segment only partly filled */
-			*dst = cpu_to_be32(remaining);
-			remaining = 0;
-		}
-		dst++; src++;
+	nsegs = be32_to_cpup(src++);
+	ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+	if (ret < 0)
+		return -EMSGSIZE;
+	len += ret;
 
-		/* segment's RDMA offset */
-		*dst++ = *src++;
-		*dst++ = *src++;
+	for (i = nsegs; i; i--) {
+		ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+		if (ret < 0)
+			return -EMSGSIZE;
+		src += rpcrdma_segment_maxsz;
+		len += ret;
 	}
 
-	return nsegs;
+	return len;
 }
 
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
+/**
+ * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the first Write chunk
+ *
+ * The client provides a Write chunk list in the Call message. Fill
+ * in the segments in the first Write chunk in the Reply's transport
  * header with the number of bytes consumed in each segment.
  * Remaining chunks are returned unused.
  *
  * Assumptions:
  *  - Client has provided only one Write chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply's Write list
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
-					   unsigned int consumed)
+static ssize_t svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
+					  struct svc_rdma_send_ctxt *sctxt,
+					  unsigned int length)
 {
-	unsigned int nsegs;
-	__be32 *p, *q;
-
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
-	q = wr_ch;
-	while (*q != xdr_zero) {
-		nsegs = xdr_encode_write_chunk(p, q, consumed);
-		q += 2 + nsegs * rpcrdma_segment_maxsz;
-		p += 2 + nsegs * rpcrdma_segment_maxsz;
-		consumed = 0;
-	}
+	ssize_t len, ret;
 
-	/* Terminate Write list */
-	*p++ = xdr_zero;
+	ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length);
+	if (ret < 0)
+		return ret;
+	len = ret;
 
-	/* Reply chunk discriminator; may be replaced later */
-	*p = xdr_zero;
+	/* Terminate the Write list */
+	ret = xdr_stream_encode_item_absent(&sctxt->sc_stream);
+	if (ret < 0)
+		return ret;
+
+	return len + ret;
 }
 
-/* The client provided a Reply chunk in the Call message. Fill in
- * the segments in the Reply chunk in the Reply message with the
- * number of bytes consumed in each segment.
+/**
+ * svc_rdma_encode_reply_chunk - Encode RPC Reply's Reply chunk
+ * @rctxt: Reply context with information about the RPC Call
+ * @sctxt: Send context for the RPC Reply
+ * @length: size in bytes of the payload in the Reply chunk
  *
  * Assumptions:
- * - Reply can always fit in the provided Reply chunk
+ * - Reply can always fit in the client-provided Reply chunk
+ *
+ * Return values:
+ *   On success, returns length in bytes of the Reply XDR buffer
+ *   that was consumed by the Reply's Reply chunk
+ *   %-EMSGSIZE on XDR buffer overflow
  */
-static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
-					    unsigned int consumed)
+static ssize_t svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+					   struct svc_rdma_send_ctxt *sctxt,
+					   unsigned int length)
 {
-	__be32 *p;
-
-	/* Find the Reply chunk in the Reply's xprt header.
-	 * RPC-over-RDMA V1 replies never have a Read list.
-	 */
-	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
-
-	/* Skip past Write list */
-	while (*p++ != xdr_zero)
-		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
-
-	xdr_encode_write_chunk(p, rp_ch, consumed);
+	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
+					   length);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -765,14 +811,26 @@ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
 				   struct svc_rdma_send_ctxt *ctxt,
 				   struct svc_rqst *rqstp)
 {
+	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
+	__be32 *rdma_argp = rctxt->rc_recv_buf;
 	__be32 *p;
 
-	p = ctxt->sc_xprt_buf;
-	trace_svcrdma_err_chunk(*p);
-	p += 3;
+	rpcrdma_set_xdrlen(&ctxt->sc_hdrbuf, 0);
+	xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
+			ctxt->sc_xprt_buf, NULL);
+
+	p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_ERR);
+	if (!p)
+		return -ENOMSG;
+
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
+	*p++ = rdma->sc_fc_credits;
 	*p++ = rdma_error;
 	*p   = err_chunk;
-	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
+	trace_svcrdma_err_chunk(*rdma_argp);
+
+	svc_rdma_sync_reply_hdr(rdma, ctxt, ctxt->sc_hdrbuf.len);
 
 	svc_rdma_save_io_pages(rqstp, ctxt);
 
@@ -803,7 +861,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	__be32 *rp_ch = rctxt->rc_reply_chunk;
 	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct svc_rdma_send_ctxt *sctxt;
-	__be32 *p, *rdma_resp;
+	__be32 *p;
 	int ret;
 
 	/* Create the RDMA response header. xprt->xpt_mutex,
@@ -816,19 +874,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	sctxt = svc_rdma_send_ctxt_get(rdma);
 	if (!sctxt)
 		goto err0;
-	rdma_resp = sctxt->sc_xprt_buf;
 
-	p = rdma_resp;
+	p = xdr_reserve_space(&sctxt->sc_stream,
+			      rpcrdma_fixed_maxsz * sizeof(*p));
+	if (!p)
+		goto err0;
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
-
-	/* Start with empty chunks */
-	*p++ = xdr_zero;
-	*p++ = xdr_zero;
-	*p   = xdr_zero;
+	*p   = rp_ch ? rdma_nomsg : rdma_msg;
 
+	if (svc_rdma_encode_read_list(sctxt) < 0)
+		goto err0;
 	if (wr_lst) {
 		/* XXX: Presume the client sent only one Write chunk */
 		unsigned long offset;
@@ -845,16 +902,24 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 						length);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
+		if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0)
+			goto err0;
+	} else {
+		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+			goto err0;
 	}
 	if (rp_ch) {
 		ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res);
 		if (ret < 0)
 			goto err2;
-		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
+		if (svc_rdma_encode_reply_chunk(rctxt, sctxt, ret) < 0)
+			goto err0;
+	} else {
+		if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
+			goto err0;
 	}
 
-	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
+	svc_rdma_sync_reply_hdr(rdma, sctxt, sctxt->sc_hdrbuf.len);
 	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp);
 	if (ret < 0)
 		goto err1;




[Index of Archives]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Photo]     [Yosemite News]     [Yosemite Photos]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux