> On Aug 31, 2021, at 4:40 PM, J. Bruce Fields <bfields@xxxxxxxxxxxx> wrote: > > How does this deal with compounds with multiple writes? Explained in the cover letter. > We don't need to handle those efficiently, but we do need to handle > them. > > --b. > > On Tue, Aug 31, 2021 at 03:05:46PM -0400, Chuck Lever wrote: >> This enables the XDR decoder to figure out how the payload sink >> buffer needs to be aligned before setting up the RDMA Reads. >> Then re-alignment of large RDMA Read payloads can be avoided. >> >> Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx> >> --- >> include/linux/sunrpc/svc_rdma.h | 6 + >> include/trace/events/rpcrdma.h | 26 ++++++ >> net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 17 +++- >> net/sunrpc/xprtrdma/svc_rdma_rw.c | 139 ++++++++++++++++++++++++++++--- >> 4 files changed, 169 insertions(+), 19 deletions(-) >> >> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h >> index f660244cc8ba..8d80a759a909 100644 >> --- a/include/linux/sunrpc/svc_rdma.h >> +++ b/include/linux/sunrpc/svc_rdma.h >> @@ -192,6 +192,12 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, >> extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, >> struct svc_rqst *rqstp, >> struct svc_rdma_recv_ctxt *head); >> +extern void svc_rdma_prepare_read_chunk(struct svc_rqst *rqstp, >> + struct svc_rdma_recv_ctxt *head); >> +extern int svc_rdma_pull_read_chunk(struct svcxprt_rdma *rdma, >> + struct svc_rqst *rqstp, >> + struct svc_rdma_recv_ctxt *ctxt, >> + unsigned int offset, unsigned int length); >> >> /* svc_rdma_sendto.c */ >> extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma); >> diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h >> index 5954ce036173..30440cca321a 100644 >> --- a/include/trace/events/rpcrdma.h >> +++ b/include/trace/events/rpcrdma.h >> @@ -2136,6 +2136,32 @@ TRACE_EVENT(svcrdma_sq_post_err, >> ) >> ); >> >> +TRACE_EVENT(svcrdma_arg_payload, >> + TP_PROTO( >> + const struct svc_rqst *rqstp, >> + unsigned int offset, >> + unsigned int length >> + ), >> + >> + TP_ARGS(rqstp, offset, length), >> + >> + TP_STRUCT__entry( >> + __field(u32, xid) >> + __field(u32, offset) >> + __field(u32, length) >> + ), >> + >> + TP_fast_assign( >> + __entry->xid = __be32_to_cpu(rqstp->rq_xid); >> + __entry->offset = offset_in_page(offset); >> + __entry->length = length; >> + ), >> + >> + TP_printk("xid=0x%08x offset=%u length=%u", >> + __entry->xid, __entry->offset, __entry->length >> + ) >> +); >> + >> #endif /* _TRACE_RPCRDMA_H */ >> >> #include <trace/define_trace.h> >> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c >> index 08a620b370ae..cd9c0fb1a470 100644 >> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c >> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c >> @@ -838,12 +838,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) >> >> svc_rdma_get_inv_rkey(rdma_xprt, ctxt); >> >> - if (!pcl_is_empty(&ctxt->rc_read_pcl) || >> - !pcl_is_empty(&ctxt->rc_call_pcl)) { >> + if (!pcl_is_empty(&ctxt->rc_call_pcl) || >> + ctxt->rc_read_pcl.cl_count > 1) { >> ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); >> if (ret < 0) >> goto out_readfail; >> - } >> + } else if (ctxt->rc_read_pcl.cl_count == 1) >> + svc_rdma_prepare_read_chunk(rqstp, ctxt); >> >> rqstp->rq_xprt_ctxt = ctxt; >> rqstp->rq_prot = IPPROTO_MAX; >> @@ -887,5 +888,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) >> int svc_rdma_argument_payload(struct svc_rqst *rqstp, unsigned int offset, >> unsigned int length) >> { >> - return 0; >> + struct svc_rdma_recv_ctxt *ctxt = rqstp->rq_xprt_ctxt; >> + struct svc_xprt *xprt = rqstp->rq_xprt; >> + struct svcxprt_rdma *rdma = >> + container_of(xprt, struct svcxprt_rdma, sc_xprt); >> + >> + if (!pcl_is_empty(&ctxt->rc_call_pcl) || >> + ctxt->rc_read_pcl.cl_count != 1) >> + return 0; >> + return svc_rdma_pull_read_chunk(rdma, rqstp, ctxt, offset, length); >> } >> diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c >> index 29b7d477891c..5f03dfd2fa03 100644 >> --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c >> +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c >> @@ -707,19 +707,24 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, >> >> len = segment->rs_length; >> sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT; >> + >> + trace_printk("pageoff=%u len=%u sges=%u\n", >> + info->ri_pageoff, len, sge_no); >> + >> ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no); >> if (!ctxt) >> return -ENOMEM; >> ctxt->rw_nents = sge_no; >> + head->rc_page_count += sge_no; >> >> sg = ctxt->rw_sg_table.sgl; >> for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) { >> seg_len = min_t(unsigned int, len, >> PAGE_SIZE - info->ri_pageoff); >> >> - if (!info->ri_pageoff) >> - head->rc_page_count++; >> - >> + trace_printk(" page=%p seg_len=%u offset=%u\n", >> + rqstp->rq_pages[info->ri_pageno], seg_len, >> + info->ri_pageoff); >> sg_set_page(sg, rqstp->rq_pages[info->ri_pageno], >> seg_len, info->ri_pageoff); >> sg = sg_next(sg); >> @@ -804,15 +809,14 @@ static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info, >> unsigned int page_no, numpages; >> >> numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT; >> + head->rc_page_count += numpages; >> + >> for (page_no = 0; page_no < numpages; page_no++) { >> unsigned int page_len; >> >> page_len = min_t(unsigned int, remaining, >> PAGE_SIZE - info->ri_pageoff); >> >> - if (!info->ri_pageoff) >> - head->rc_page_count++; >> - >> dst = page_address(rqstp->rq_pages[info->ri_pageno]); >> memcpy(dst + info->ri_pageno, src + offset, page_len); >> >> @@ -1092,15 +1096,8 @@ static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info) >> * @rqstp: set of pages to use as Read sink buffers >> * @head: pages under I/O collect here >> * >> - * The RPC/RDMA protocol assumes that the upper layer's XDR decoders >> - * pull each Read chunk as they decode an incoming RPC message. >> - * >> - * On Linux, however, the server needs to have a fully-constructed RPC >> - * message in rqstp->rq_arg when there is a positive return code from >> - * ->xpo_recvfrom. So the Read list is safety-checked immediately when >> - * it is received, then here the whole Read list is pulled all at once. >> - * The ingress RPC message is fully reconstructed once all associated >> - * RDMA Reads have completed. >> + * Handle complex Read chunk cases fully before svc_rdma_recvfrom() >> + * returns. >> * >> * Return values: >> * %1: all needed RDMA Reads were posted successfully, >> @@ -1159,3 +1156,115 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, >> svc_rdma_read_info_free(info); >> return ret; >> } >> + >> +/** >> + * svc_rdma_prepare_read_chunk - Prepare rq_arg for Read chunk >> + * @rqstp: set of pages to use as Read sink buffers >> + * @head: pages under I/O collect here >> + * >> + * The Read chunk will be pulled when the upper layer's XDR >> + * decoder calls svc_decode_argument_payload(). In the meantime, >> + * fake up rq_arg.page_len and .len to reflect the size of the >> + * yet-to-be-pulled payload. >> + */ >> +void svc_rdma_prepare_read_chunk(struct svc_rqst *rqstp, >> + struct svc_rdma_recv_ctxt *head) >> +{ >> + struct svc_rdma_chunk *chunk = pcl_first_chunk(&head->rc_read_pcl); >> + unsigned int length = xdr_align_size(chunk->ch_length); >> + struct xdr_buf *buf = &rqstp->rq_arg; >> + >> + buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position; >> + buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position; >> + buf->head[0].iov_len = chunk->ch_position; >> + >> + buf->page_len = length; >> + buf->len += length; >> + buf->buflen += length; >> + >> + /* >> + * rq_respages starts after the last arg page. Note that we >> + * don't know the offset yet, so add an extra page as slack. >> + */ >> + length += PAGE_SIZE * 2 - 1; >> + rqstp->rq_respages = &rqstp->rq_pages[length >> PAGE_SHIFT]; >> + rqstp->rq_next_page = rqstp->rq_respages + 1; >> +} >> + >> +/** >> + * svc_rdma_pull_read_chunk - Pull one Read chunk from the client >> + * @rdma: controlling RDMA transport >> + * @rqstp: set of pages to use as Read sink buffers >> + * @head: pages under I/O collect here >> + * @offset: offset of payload in file's page cache >> + * @length: size of payload, in bytes >> + * >> + * Once the upper layer's XDR decoder has decoded the length of >> + * the payload and it's offset, we can be clever about setting up >> + * the RDMA Read sink buffer so that the VFS does not have to >> + * re-align the payload once it is received. >> + * >> + * Caveat: To keep things simple, this is an optimization that is >> + * used only when there is a single Read chunk in the Read >> + * list. >> + * >> + * Return values: >> + * %0: all needed RDMA Reads were posted successfully, >> + * %-EINVAL: client provided the wrong chunk size, >> + * %-ENOMEM: rdma_rw context pool was exhausted, >> + * %-ENOTCONN: posting failed (connection is lost), >> + * %-EIO: rdma_rw initialization failed (DMA mapping, etc). >> + */ >> +int svc_rdma_pull_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, >> + struct svc_rdma_recv_ctxt *head, >> + unsigned int offset, unsigned int length) >> +{ >> + struct svc_rdma_read_info *info; >> + struct svc_rdma_chunk_ctxt *cc; >> + struct svc_rdma_chunk *chunk; >> + int ret; >> + >> + trace_svcrdma_arg_payload(rqstp, offset, length); >> + >> + /* Sanity: the Requester must have provided enough >> + * bytes to fill the XDR opaque. >> + */ >> + chunk = pcl_first_chunk(&head->rc_read_pcl); >> + if (length > chunk->ch_length) >> + return -EINVAL; >> + >> + info = svc_rdma_read_info_alloc(rdma); >> + if (!info) >> + return -ENOMEM; >> + cc = &info->ri_cc; >> + info->ri_rqst = rqstp; >> + info->ri_readctxt = head; >> + info->ri_pageno = 0; >> + info->ri_pageoff = offset_in_page(offset); >> + info->ri_totalbytes = 0; >> + >> + ret = svc_rdma_build_read_chunk(info, chunk); >> + if (ret < 0) >> + goto out_err; >> + rqstp->rq_arg.pages = &info->ri_rqst->rq_pages[0]; >> + rqstp->rq_arg.page_base = offset_in_page(offset); >> + rqstp->rq_arg.buflen += offset_in_page(offset); >> + >> + trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount); >> + init_completion(&cc->cc_done); >> + ret = svc_rdma_post_chunk_ctxt(cc); >> + if (ret < 0) >> + goto out_err; >> + >> + ret = 0; >> + wait_for_completion(&cc->cc_done); >> + if (cc->cc_status != IB_WC_SUCCESS) >> + ret = -EIO; >> + >> + /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */ >> + head->rc_page_count = 0; >> + >> +out_err: >> + svc_rdma_read_info_free(info); >> + return ret; >> +} >> -- Chuck Lever