The Linux NFS/RDMA server implementation currently supports only a
single Write chunk per RPC/RDMA request. Requests with more than one
are so rare that there has never been a strong need to support more.
However, we are aware of at least one existing NFS client
implementation that can generate such requests, so let's dig in.

Allocate a data structure at Receive time to keep track of the set of
READ payloads and the Write chunks.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
(An illustrative, stand-alone sketch of the Write list layout that the
new parsing code walks is appended after the patch.)

 include/linux/sunrpc/svc_rdma.h            | 15 +++-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |  2 -
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    | 31 +++++++--
 net/sunrpc/xprtrdma/svc_rdma_rw.c          |  2 -
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      | 94 +++++++++++++---------------
 5 files changed, 80 insertions(+), 64 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c1c4563066d9..85e6b281a39b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -124,6 +124,12 @@ enum {
 
 #define RPCSVC_MAXPAYLOAD_RDMA	RPCSVC_MAXPAYLOAD
 
+struct svc_rdma_payload {
+	__be32			*ra_chunk;
+	unsigned int		ra_offset;
+	unsigned int		ra_length;
+};
+
 struct svc_rdma_recv_ctxt {
 	struct llist_node	rc_node;
 	struct list_head	rc_list;
@@ -137,10 +143,10 @@ struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
-	__be32			*rc_write_list;
+	struct svc_rdma_payload	*rc_read_payloads;
 	__be32			*rc_reply_chunk;
-	unsigned int		rc_read_payload_offset;
-	unsigned int		rc_read_payload_length;
+	unsigned int		rc_num_write_chunks;
+	unsigned int		rc_cur_payload;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
@@ -193,7 +199,8 @@ extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 				    unsigned int len);
 extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *ctxt,
-				  struct xdr_buf *xdr, __be32 *wr_lst);
+				  struct xdr_buf *xdr,
+				  unsigned int num_read_payloads);
 extern int svc_rdma_sendto(struct svc_rqst *);
 extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 				 unsigned int length);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 908e78bb87c6..3b1baf15a1b7 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -117,7 +117,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 {
 	int ret;
 
-	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
+	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, 0);
 	if (ret < 0)
 		return -EIO;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 91abe08f7d75..85b8dd8ae772 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,7 +193,9 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 
 out:
 	ctxt->rc_page_count = 0;
-	ctxt->rc_read_payload_length = 0;
+	ctxt->rc_num_write_chunks = 0;
+	ctxt->rc_cur_payload = 0;
+	ctxt->rc_read_payloads = NULL;
 	return ctxt;
 
 out_empty:
@@ -216,7 +218,8 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 
 	for (i = 0; i < ctxt->rc_page_count; i++)
 		put_page(ctxt->rc_pages[i]);
-
+	kfree(ctxt->rc_read_payloads);
+	ctxt->rc_read_payloads = NULL;
 	if (!ctxt->rc_temp)
 		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
 	else
@@ -452,9 +455,10 @@ static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
 static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
 				    struct svc_rdma_recv_ctxt *ctxt)
 {
-	u32 chcount;
+	u32 chcount, segcount;
+	__be32 *saved = p;
+	int i;
 
-	ctxt->rc_write_list = p;
 	chcount = 0;
 	while (*p++ != xdr_zero) {
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
@@ -463,8 +467,22 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
 		if (chcount++ > 1)
 			return NULL;
 	}
+	ctxt->rc_num_write_chunks = chcount;
 	if (!chcount)
-		ctxt->rc_write_list = NULL;
+		return p;
+
+	ctxt->rc_read_payloads = kcalloc(chcount, sizeof(struct svc_rdma_payload),
+					 GFP_KERNEL);
+	if (!ctxt->rc_read_payloads)
+		return NULL;
+
+	i = 0;
+	p = saved;
+	while (*p++ != xdr_zero) {
+		ctxt->rc_read_payloads[i++].ra_chunk = p - 1;
+		segcount = be32_to_cpup(p++);
+		p += segcount * rpcrdma_segment_maxsz;
+	}
 	return p;
 }
 
@@ -484,8 +502,9 @@ static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end,
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
 		if (!p)
 			return NULL;
-	} else
+	} else {
 		ctxt->rc_reply_chunk = NULL;
+	}
 	return p;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index ca9d414bef9d..740ea4ee251d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -574,7 +574,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	/* Send the page list in the Reply chunk only if the
 	 * client did not provide Write chunks.
 	 */
-	if (!rctxt->rc_write_list && xdr->page_len) {
+	if (!rctxt->rc_num_write_chunks && xdr->page_len) {
 		ret = svc_rdma_send_xdr_pagelist(info, xdr,
 						 xdr->head[0].iov_len,
 						 xdr->page_len);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7349a3f9aa5d..378a24b666bb 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -366,10 +366,10 @@ static __be32 *xdr_encode_read_list(__be32 *p)
  * transport header. Each segment's length field is updated to
  * reflect number of bytes consumed in the segment.
  *
- * Returns number of segments in this chunk.
+ * Returns a pointer to the position to encode the next chunk.
  */
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
-					   unsigned int remaining)
+static __be32 *xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+				      unsigned int length)
 {
 	unsigned int i, nsegs;
 	u32 seg_len;
@@ -386,15 +386,15 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
 		*dst++ = *src++;
 
 		/* bytes returned in this segment */
-		seg_len = be32_to_cpu(*src);
-		if (remaining >= seg_len) {
+		seg_len = be32_to_cpup(src);
+		if (length >= seg_len) {
 			/* entire segment was consumed */
 			*dst = *src;
-			remaining -= seg_len;
+			length -= seg_len;
 		} else {
 			/* segment only partly filled */
-			*dst = cpu_to_be32(remaining);
-			remaining = 0;
+			*dst = cpu_to_be32(length);
+			length = 0;
 		}
 		dst++;
 		src++;
@@ -403,38 +403,25 @@
 		*dst++ = *src++;
 	}
 
-	return nsegs;
+	return dst;
 }
 
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- *  - Client has provided only one Write chunk
+/* The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
  */
 static __be32 *xdr_encode_write_list(__be32 *p,
 				     const struct svc_rdma_recv_ctxt *rctxt)
 {
-	unsigned int consumed, nsegs;
-	__be32 *q;
-
-	q = rctxt->rc_write_list;
-	if (!q)
-		goto out;
-
-	consumed = rctxt->rc_read_payload_length;
-	while (*q != xdr_zero) {
-		nsegs = xdr_encode_write_chunk(p, q, consumed);
-		q += 2 + nsegs * rpcrdma_segment_maxsz;
-		p += 2 + nsegs * rpcrdma_segment_maxsz;
-		consumed = 0;
-	}
+	unsigned int i;
 
-	/* Terminate Write list */
-out:
-	*p++ = xdr_zero;
+	for (i = 0; i < rctxt->rc_num_write_chunks; i++)
+		p = xdr_encode_write_chunk(p,
+					   rctxt->rc_read_payloads[i].ra_chunk,
+					   rctxt->rc_read_payloads[i].ra_length);
+	*p++ = xdr_zero;	/* Terminate Write list */
 
 	return p;
 }
 
@@ -519,7 +506,7 @@ void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 				    struct svc_rdma_send_ctxt *ctxt,
 				    struct xdr_buf *xdr,
-				    __be32 *wr_lst)
+				    unsigned int num_write_chunks)
 {
 	int elements;
 
@@ -535,7 +522,7 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 	elements = 1;
 
 	/* xdr->pages */
-	if (!wr_lst) {
+	if (!num_write_chunks) {
 		unsigned int remaining;
 		unsigned long pageoff;
 
@@ -563,7 +550,8 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  */
 static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 				      struct svc_rdma_send_ctxt *ctxt,
-				      struct xdr_buf *xdr, __be32 *wr_lst)
+				      struct xdr_buf *xdr,
+				      unsigned int num_write_chunks)
 {
 	unsigned char *dst, *tailbase;
 	unsigned int taillen;
@@ -576,7 +564,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 
 	tailbase = xdr->tail[0].iov_base;
 	taillen = xdr->tail[0].iov_len;
-	if (wr_lst) {
+	if (num_write_chunks) {
 		u32 xdrpad;
 
 		xdrpad = xdr_padsize(xdr->page_len);
@@ -619,7 +607,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
  * @rdma: controlling transport
 * @ctxt: send_ctxt for the Send WR
 * @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
+ * @num_read_payloads: count of separate READ payloads to send
  *
  * Load the xdr_buf into the ctxt's sge array, and DMA map each
  * element as it is added.
@@ -628,7 +616,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
  */
 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 			   struct svc_rdma_send_ctxt *ctxt,
-			   struct xdr_buf *xdr, __be32 *wr_lst)
+			   struct xdr_buf *xdr, unsigned int num_read_payloads)
 {
 	unsigned int len, remaining;
 	unsigned long page_off;
@@ -637,8 +625,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	u32 xdr_pad;
 	int ret;
 
-	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, wr_lst))
-		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, num_read_payloads))
+		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, num_read_payloads);
 
 	++ctxt->sc_cur_sge_no;
 	ret = svc_rdma_dma_map_buf(rdma, ctxt,
@@ -647,12 +635,12 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	if (ret < 0)
 		return ret;
 
-	/* If a Write chunk is present, the xdr_buf's page list
+	/* If Write chunks are present, the xdr_buf's page list
 	 * is not included inline. However the Upper Layer may
 	 * have added XDR padding in the tail buffer, and that
 	 * should not be included inline.
 	 */
-	if (wr_lst) {
+	if (num_read_payloads) {
 		base = xdr->tail[0].iov_base;
 		len = xdr->tail[0].iov_len;
 		xdr_pad = xdr_padsize(xdr->page_len);
@@ -741,7 +729,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 
 	if (!rctxt->rc_reply_chunk) {
 		ret = svc_rdma_map_reply_msg(rdma, sctxt, &rqstp->rq_res,
-					     rctxt->rc_write_list);
+					     rctxt->rc_cur_payload);
 		if (ret < 0)
 			return ret;
 	}
@@ -885,18 +873,20 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	struct svcxprt_rdma *rdma;
+	unsigned int i;
 
-	if (!rctxt->rc_write_list)
+	if (!rctxt->rc_num_write_chunks)
 		return 0;
 
-	/* XXX: Just one READ payload slot for now, since our
-	 * transport implementation currently supports only one
-	 * Write chunk.
-	 */
-	rctxt->rc_read_payload_offset = offset;
-	rctxt->rc_read_payload_length = length;
+	if (rctxt->rc_cur_payload >= rctxt->rc_num_write_chunks)
+		return -ENOENT;
+	i = rctxt->rc_cur_payload++;
+
+	rctxt->rc_read_payloads[i].ra_offset = offset;
+	rctxt->rc_read_payloads[i].ra_length = length;
 
 	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
+	return svc_rdma_send_write_chunk(rdma,
+					 rctxt->rc_read_payloads[i].ra_chunk,
 					 &rqstp->rq_res, offset, length);
 }
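
The sketch below is not part of the patch. It is a stand-alone, user-space
model of the Write list layout that xdr_check_write_list() walks after this
change: each list entry is a one-word "present" discriminator, a segment
count, and then <count> segments of rpcrdma_segment_maxsz (4) 32-bit XDR
words. The helper and type names (scan_write_list, payload_slot) are invented
for illustration and do not exist in the kernel; the point is the same
two-pass approach the patch uses, namely count the Write chunks, allocate one
tracking slot per chunk, and record where each chunk begins so a later READ
payload can be paired with it.

/* Illustrative sketch only -- models the two-pass Write list parse. */
#include <arpa/inet.h>	/* htonl(), ntohl() */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

enum { RPCRDMA_SEGMENT_MAXSZ = 4 };	/* handle, length, offset (2 words) */

struct payload_slot {
	uint32_t *chunk;	/* points at the chunk's discriminator word */
};

/* Walk a Write list twice: first count the chunks, then record a
 * pointer to the start of each one. Returns the number of chunks,
 * or -1 on allocation failure.
 */
static int scan_write_list(uint32_t *p, struct payload_slot **slots)
{
	uint32_t *saved = p;
	int chcount = 0, i = 0;

	/* Pass 1: count the chunks */
	while (*p++ != 0) {
		uint32_t segcount = ntohl(*p++);

		p += segcount * RPCRDMA_SEGMENT_MAXSZ;
		chcount++;
	}

	/* Pass 2: one tracking slot per chunk */
	*slots = calloc(chcount, sizeof(**slots));
	if (chcount && !*slots)
		return -1;
	p = saved;
	while (*p++ != 0) {
		uint32_t segcount;

		(*slots)[i++].chunk = p - 1;	/* discriminator word */
		segcount = ntohl(*p++);
		p += segcount * RPCRDMA_SEGMENT_MAXSZ;
	}
	return chcount;
}

int main(void)
{
	/* Two Write chunks: one with 1 segment, one with 2 segments,
	 * followed by the list terminator (xdr_zero).
	 */
	uint32_t list[] = {
		htonl(1), htonl(1),			/* chunk 0: 1 segment */
		htonl(0x1234), htonl(4096), 0, 0,
		htonl(1), htonl(2),			/* chunk 1: 2 segments */
		htonl(0x5678), htonl(8192), 0, 0,
		htonl(0x9abc), htonl(8192), 0, 0,
		htonl(0),				/* end of Write list */
	};
	struct payload_slot *slots;
	int i, n;

	n = scan_write_list(list, &slots);
	if (n < 0)
		return 1;
	for (i = 0; i < n; i++)
		printf("Write chunk %d starts at XDR word %td\n",
		       i, slots[i].chunk - list);
	free(slots);
	return 0;
}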