The Linux NFS/RDMA server implementation currently supports only a
single Write chunk per RPC/RDMA request. Requests with more than one
Write chunk are rare, so there has never been a strong need to
support more. However, we are aware of at least one existing NFS
client implementation that can generate such requests, so let's add
the missing support.

Allocate a data structure at Receive time to keep track of the set
of READ payloads and the Write chunks that will carry them.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/svc_rdma.h         |  3 ++-
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 30 +++++++++++++++++++++++-------
 net/sunrpc/xprtrdma/svc_rdma_rw.c       |  4 ++--
 net/sunrpc/xprtrdma/svc_rdma_sendto.c   | 40 ++++++++++++++++++----------------
 4 files changed, 49 insertions(+), 28 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 9af9d4dff330..37e4c597dc71 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -145,9 +145,10 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
-	struct svc_rdma_payload	rc_read_payload;
+	struct svc_rdma_payload	*rc_read_payloads;
 	__be32			*rc_reply_chunk;
 	unsigned int		rc_num_write_chunks;
+	unsigned int		rc_cur_payload;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 95b88f68f8ca..2c3ab554c6ec 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,8 +193,9 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 
 out:
	ctxt->rc_page_count = 0;
-	ctxt->rc_read_payload.rp_length = 0;
 	ctxt->rc_num_write_chunks = 0;
+	ctxt->rc_cur_payload = 0;
+	ctxt->rc_read_payloads = NULL;
 	return ctxt;
 
 out_empty:
@@ -217,7 +218,8 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 
 	for (i = 0; i < ctxt->rc_page_count; i++)
 		put_page(ctxt->rc_pages[i]);
-
+	kfree(ctxt->rc_read_payloads);
+	ctxt->rc_read_payloads = NULL;
 	if (!ctxt->rc_temp)
 		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
 	else
@@ -474,13 +476,13 @@ static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
  */
 static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
 {
-	u32 chcount = 0;
-	__be32 *p;
+	u32 i, segcount, chcount = 0;
+	__be32 *p, *saved;
 
 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
 	if (!p)
 		return false;
-	rctxt->rc_read_payload.rp_chunk = p;
+	saved = p;
 	while (*p != xdr_zero) {
 		if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
 			return false;
@@ -491,8 +493,22 @@ static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
 	}
 	rctxt->rc_num_write_chunks = chcount;
 	if (!chcount)
-		rctxt->rc_read_payload.rp_chunk = NULL;
-	return chcount < 2;
+		return true;
+
+	rctxt->rc_read_payloads = kcalloc(chcount,
+					  sizeof(struct svc_rdma_payload),
+					  GFP_KERNEL);
+	if (!rctxt->rc_read_payloads)
+		return false;
+
+	i = 0;
+	p = saved;
+	while (*p != xdr_zero) {
+		rctxt->rc_read_payloads[i++].rp_chunk = p++;
+		segcount = be32_to_cpup(p++);
+		p += segcount * rpcrdma_segment_maxsz;
+	}
+	return true;
 }
 
 /* Sanity check the Reply chunk.
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index 8ad137c7e6a0..5f326c18b47c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -625,7 +625,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	/* Send the page list in the Reply chunk only if the
-	 * client did not provide Write chunks.
+	 * Upper Layer placed no READ payloads in Write chunks.
 	 */
-	if (!rctxt->rc_num_write_chunks && xdr->page_len) {
+	if (!rctxt->rc_cur_payload && xdr->page_len) {
 		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
 					   xdr->page_len);
 		if (ret < 0)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index b6dd5ae2ad76..9fe7b0d1e335 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -447,10 +447,11 @@ static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
  * @rctxt: Reply context with information about the RPC Call
  * @sctxt: Send context for the RPC Reply
  *
- * The client provides a Write chunk list in the Call message. Fill
- * in the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
+ * The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
  *
  * Assumptions:
- *  - Client has provided only one Write chunk
+ *  - Each READ payload consumes one Write chunk, in list order
@@ -465,11 +466,12 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
 			   struct svc_rdma_send_ctxt *sctxt)
 {
 	ssize_t len, ret;
+	unsigned int i;
 
 	len = 0;
-	if (rctxt->rc_num_write_chunks) {
+	for (i = 0; i < rctxt->rc_num_write_chunks; i++) {
 		ret = svc_rdma_encode_write_chunk(sctxt,
-						  &rctxt->rc_read_payload);
+						  &rctxt->rc_read_payloads[i]);
 		if (ret < 0)
 			return ret;
 		len += ret;
@@ -564,7 +566,7 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 				    const struct svc_rdma_recv_ctxt *rctxt,
 				    struct xdr_buf *xdr)
 {
-	bool read_payload_present = rctxt && rctxt->rc_num_write_chunks;
+	bool read_payload_present = rctxt && rctxt->rc_cur_payload;
 	int elements;
 
 	/* For small messages, copying bytes is cheaper than DMA mapping.
@@ -628,7 +630,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 
 	tailbase = xdr->tail[0].iov_base;
 	taillen = xdr->tail[0].iov_len;
-	if (rctxt && rctxt->rc_num_write_chunks) {
+	if (rctxt && rctxt->rc_cur_payload) {
 		u32 xdrpad;
 
 		xdrpad = xdr_pad_size(xdr->page_len);
@@ -708,12 +710,12 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	if (ret < 0)
 		return ret;
 
-	/* If a Write chunk is present, the xdr_buf's page list
+	/* If Write chunks are present, the xdr_buf's page list
 	 * is not included inline. However the Upper Layer may
 	 * have added XDR padding in the tail buffer, and that
 	 * should not be included inline.
 	 */
-	if (rctxt && rctxt->rc_num_write_chunks) {
+	if (rctxt && rctxt->rc_cur_payload) {
 		base = xdr->tail[0].iov_base;
 		len = xdr->tail[0].iov_len;
 		xdr_pad = xdr_pad_size(xdr->page_len);
@@ -951,21 +953,23 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	struct xdr_buf uninitialized_var(subbuf);
 	struct svcxprt_rdma *rdma;
+	unsigned int i;
 
 	if (!rctxt->rc_num_write_chunks || !length)
 		return 0;
 
-	/* XXX: Just one READ payload slot for now, since our
-	 * transport implementation currently supports only one
-	 * Write chunk.
-	 */
-	rctxt->rc_read_payload.rp_offset = offset;
-	rctxt->rc_read_payload.rp_length = length;
+	if (rctxt->rc_cur_payload >= rctxt->rc_num_write_chunks)
+		return -ENOENT;
+	i = rctxt->rc_cur_payload++;
+
+	rctxt->rc_read_payloads[i].rp_offset = offset;
+	rctxt->rc_read_payloads[i].rp_length = length;
 
 	if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
 		return -EMSGSIZE;
 
 	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-	return svc_rdma_send_write_chunk(rdma, rctxt->rc_read_payload.rp_chunk,
+	return svc_rdma_send_write_chunk(rdma,
+					 rctxt->rc_read_payloads[i].rp_chunk,
 					 &subbuf);
 }
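
P.S. For anyone reviewing the new two-pass walk in
xdr_check_write_list(), here is a stand-alone user-space sketch of
the same pointer arithmetic over a synthetic Write list. The names
below (struct payload, SEGMENT_WORDS, ntohl()) are illustrative
stand-ins for svc_rdma_payload, rpcrdma_segment_maxsz, and
be32_to_cpup(); this is not the kernel API, only the RFC 8166 wire
layout and the two passes the patch performs.

/* Sketch only: each Write list entry is a one-word discriminator,
 * a segment count, then <count> RDMA segments of four XDR words
 * each (handle, length, offset high, offset low). A zero word
 * terminates the list.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>		/* htonl(), ntohl() */

#define SEGMENT_WORDS 4		/* stands in for rpcrdma_segment_maxsz */

struct payload {
	uint32_t *chunk;	/* stands in for rp_chunk */
};

int main(void)
{
	/* A synthetic Write list: two chunks of one segment each. */
	uint32_t list[] = {
		htonl(1), htonl(1), htonl(0x111), htonl(64), htonl(0), htonl(0),
		htonl(1), htonl(1), htonl(0x222), htonl(64), htonl(0), htonl(0),
		htonl(0),	/* list terminator (xdr_zero) */
	};
	struct payload *payloads;
	uint32_t *p, segcount;
	unsigned int i, nchunks = 0;

	/* Pass 1: count the chunks so the payload array can be sized.
	 * The kernel does this while sanity-checking each chunk.
	 */
	for (p = list; ntohl(*p) != 0; nchunks++) {
		p++;			/* skip the discriminator */
		segcount = ntohl(*p++);
		p += segcount * SEGMENT_WORDS;
	}

	payloads = calloc(nchunks, sizeof(*payloads));
	if (!payloads)
		return 1;

	/* Pass 2: record where each chunk starts, as the patch does
	 * with rc_read_payloads[i].rp_chunk.
	 */
	for (i = 0, p = list; ntohl(*p) != 0; i++) {
		payloads[i].chunk = p++;
		segcount = ntohl(*p++);
		p += segcount * SEGMENT_WORDS;
	}

	printf("found %u Write chunk(s)\n", nchunks);
	for (i = 0; i < nchunks; i++)
		printf("chunk %u starts at word %td\n",
		       i, payloads[i].chunk - list);

	free(payloads);
	return 0;
}

The kernel version saves the stream position after the first decode
("saved") so the second pass can re-walk the already-validated words
directly, without re-decoding the XDR stream.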