[PATCH 12/20] svcrdma: Use parsed chunk lists to encode Reply transport headers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Refactor: Instead of re-parsing the ingress RPC Call transport
header when constructing the egress RPC Reply transport header, use
the new parsed Write list and Reply chunk, which are version-
agnostic and already XDR decoded.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/trace/events/rpcrdma.h        |   37 +++++++++++-
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |  105 ++++++++++++++-------------------
 2 files changed, 80 insertions(+), 62 deletions(-)

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 72b941aef43b..5218e0f9596a 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1447,9 +1447,44 @@ DECLARE_EVENT_CLASS(svcrdma_segment_event,
 				TP_ARGS(handle, length, offset))
 
 DEFINE_SEGMENT_EVENT(send_rseg);
-DEFINE_SEGMENT_EVENT(encode_wseg);
 DEFINE_SEGMENT_EVENT(send_wseg);
 
+TRACE_EVENT(svcrdma_encode_wseg,
+	TP_PROTO(
+		const struct svc_rdma_send_ctxt *ctxt,
+		u32 segno,
+		u32 handle,
+		u32 length,
+		u64 offset
+	),
+
+	TP_ARGS(ctxt, segno, handle, length, offset),
+
+	TP_STRUCT__entry(
+		__field(u32, cq_id)
+		__field(int, completion_id)
+		__field(u32, segno)
+		__field(u32, handle)
+		__field(u32, length)
+		__field(u64, offset)
+	),
+
+	TP_fast_assign(
+		__entry->cq_id = ctxt->sc_cid.ci_queue_id;
+		__entry->completion_id = ctxt->sc_cid.ci_completion_id;
+		__entry->segno = segno;
+		__entry->handle = handle;
+		__entry->length = length;
+		__entry->offset = offset;
+	),
+
+	TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x",
+		__entry->cq_id, __entry->completion_id,
+		__entry->segno, __entry->length,
+		(unsigned long long)__entry->offset, __entry->handle
+	)
+);
+
 TRACE_EVENT(svcrdma_decode_rseg,
 	TP_PROTO(
 		const struct rpc_rdma_cid *cid,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index f697e79757a6..fd8d62b1e640 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -358,49 +358,42 @@ static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
 
 /**
  * svc_rdma_encode_write_segment - Encode one Write segment
- * @src: matching Write chunk in the RPC Call header
  * @sctxt: Send context for the RPC Reply
+ * @chunk: Write chunk to push
  * @remaining: remaining bytes of the payload left in the Write chunk
+ * @segno: which segment in the chunk
  *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
- *   that was consumed by the Write segment
+ *   that was consumed by the Write segment, and updates @remaining
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t svc_rdma_encode_write_segment(__be32 *src,
-					     struct svc_rdma_send_ctxt *sctxt,
-					     unsigned int *remaining)
+static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
+					     const struct svc_rdma_chunk *chunk,
+					     u32 *remaining, unsigned int segno)
 {
+	const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
+	const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
+	u32 length;
 	__be32 *p;
-	const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
-	u32 handle, length;
-	u64 offset;
 
 	p = xdr_reserve_space(&sctxt->sc_stream, len);
 	if (!p)
 		return -EMSGSIZE;
 
-	xdr_decode_rdma_segment(src, &handle, &length, &offset);
-
-	if (*remaining < length) {
-		/* segment only partly filled */
-		length = *remaining;
-		*remaining = 0;
-	} else {
-		/* entire segment was consumed */
-		*remaining -= length;
-	}
-	xdr_encode_rdma_segment(p, handle, length, offset);
-
-	trace_svcrdma_encode_wseg(handle, length, offset);
+	length = min_t(u32, *remaining, segment->rs_length);
+	*remaining -= length;
+	xdr_encode_rdma_segment(p, segment->rs_handle, length,
+				segment->rs_offset);
+	trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
+				  segment->rs_offset);
 	return len;
 }
 
 /**
  * svc_rdma_encode_write_chunk - Encode one Write chunk
- * @src: matching Write chunk in the RPC Call header
  * @sctxt: Send context for the RPC Reply
- * @remaining: size in bytes of the payload in the Write chunk
+ * @chunk: Write chunk to push
  *
  * Copy a Write chunk from the Call transport header to the
  * Reply transport header. Update each segment's length field
@@ -411,33 +404,30 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
  *   that was consumed by the Write chunk
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
-					   struct svc_rdma_send_ctxt *sctxt,
-					   unsigned int remaining)
+static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
+					   const struct svc_rdma_chunk *chunk)
 {
-	unsigned int i, nsegs;
+	u32 remaining = chunk->ch_payload_length;
+	unsigned int segno;
 	ssize_t len, ret;
 
-	len = 0;
 	trace_svcrdma_encode_write_chunk(remaining);
 
-	src++;
+	len = 0;
 	ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
 	if (ret < 0)
-		return -EMSGSIZE;
+		return ret;
 	len += ret;
 
-	nsegs = be32_to_cpup(src++);
-	ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+	ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
 	if (ret < 0)
-		return -EMSGSIZE;
+		return ret;
 	len += ret;
 
-	for (i = nsegs; i; i--) {
-		ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+	for (segno = 0; segno < chunk->ch_segcount; segno++) {
+		ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
 		if (ret < 0)
-			return -EMSGSIZE;
-		src += rpcrdma_segment_maxsz;
+			return ret;
 		len += ret;
 	}
 
@@ -449,34 +439,23 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
  * @rctxt: Reply context with information about the RPC Call
  * @sctxt: Send context for the RPC Reply
  *
- * The client provides a Write chunk list in the Call message. Fill
- * in the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- *  - Client has provided only one Write chunk
- *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
  *   that was consumed by the Reply's Write list
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t
-svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
-			   struct svc_rdma_send_ctxt *sctxt)
+static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
+					  struct svc_rdma_send_ctxt *sctxt)
 {
 	struct svc_rdma_chunk *chunk;
 	ssize_t len, ret;
 
 	len = 0;
-	if (rctxt->rc_write_list) {
-		chunk = pcl_first_chunk(&rctxt->rc_write_pcl);
-		ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
-						  chunk->ch_payload_length);
+	pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+		ret = svc_rdma_encode_write_chunk(sctxt, chunk);
 		if (ret < 0)
 			return ret;
-		len = ret;
+		len += ret;
 	}
 
 	/* Terminate the Write list */
@@ -493,24 +472,28 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
  * @sctxt: Send context for the RPC Reply
  * @length: size in bytes of the payload in the Reply chunk
  *
- * Assumptions:
- * - Reply can always fit in the client-provided Reply chunk
- *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
  *   that was consumed by the Reply's Reply chunk
  *   %-EMSGSIZE on XDR buffer overflow
+ *   %-E2BIG if the RPC message is larger than the Reply chunk
  */
 static ssize_t
-svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
 			    struct svc_rdma_send_ctxt *sctxt,
 			    unsigned int length)
 {
-	if (!rctxt->rc_reply_chunk)
+	struct svc_rdma_chunk *chunk;
+
+	if (pcl_is_empty(&rctxt->rc_reply_pcl))
 		return xdr_stream_encode_item_absent(&sctxt->sc_stream);
 
-	return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
-					   length);
+	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+	if (length > chunk->ch_length)
+		return -E2BIG;
+
+	chunk->ch_payload_length = length;
+	return svc_rdma_encode_write_chunk(sctxt, chunk);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -928,7 +911,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg;
+	*p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
 
 	if (svc_rdma_encode_read_list(sctxt) < 0)
 		goto err0;





[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux