> On Aug 31, 2021, at 3:05 PM, Chuck Lever <chuck.lever@xxxxxxxxxx> wrote:
>
> This enables the XDR decoder to figure out how the payload sink
> buffer needs to be aligned before setting up the RDMA Reads.
> Then re-alignment of large RDMA Read payloads can be avoided.

Should say "can be avoided" but isn't yet. All of these patches are
pre-requisites to that behavior.

> Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
> ---
>  include/linux/sunrpc/svc_rdma.h         |    6 +
>  include/trace/events/rpcrdma.h          |   26 ++++++
>  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   17 +++-
>  net/sunrpc/xprtrdma/svc_rdma_rw.c       |  139 ++++++++++++++++++++++++++++---
>  4 files changed, 169 insertions(+), 19 deletions(-)
>
> diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> index f660244cc8ba..8d80a759a909 100644
> --- a/include/linux/sunrpc/svc_rdma.h
> +++ b/include/linux/sunrpc/svc_rdma.h
> @@ -192,6 +192,12 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
>                                        struct svc_rqst *rqstp,
>                                        struct svc_rdma_recv_ctxt *head);
>  extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
>                                        struct svc_rqst *rqstp,
>                                        struct svc_rdma_recv_ctxt *head);
> +extern void svc_rdma_prepare_read_chunk(struct svc_rqst *rqstp,
> +                                        struct svc_rdma_recv_ctxt *head);
> +extern int svc_rdma_pull_read_chunk(struct svcxprt_rdma *rdma,
> +                                    struct svc_rqst *rqstp,
> +                                    struct svc_rdma_recv_ctxt *ctxt,
> +                                    unsigned int offset, unsigned int length);
>
>  /* svc_rdma_sendto.c */
>  extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
> diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
> index 5954ce036173..30440cca321a 100644
> --- a/include/trace/events/rpcrdma.h
> +++ b/include/trace/events/rpcrdma.h
> @@ -2136,6 +2136,32 @@ TRACE_EVENT(svcrdma_sq_post_err,
>          )
>  );
>
> +TRACE_EVENT(svcrdma_arg_payload,
> +        TP_PROTO(
> +                const struct svc_rqst *rqstp,
> +                unsigned int offset,
> +                unsigned int length
> +        ),
> +
> +        TP_ARGS(rqstp, offset, length),
> +
> +        TP_STRUCT__entry(
> +                __field(u32, xid)
> +                __field(u32, offset)
> +                __field(u32, length)
> +        ),
> +
> +        TP_fast_assign(
> +                __entry->xid = __be32_to_cpu(rqstp->rq_xid);
> +                __entry->offset = offset_in_page(offset);
> +                __entry->length = length;
> +        ),
> +
> +        TP_printk("xid=0x%08x offset=%u length=%u",
> +                __entry->xid, __entry->offset, __entry->length
> +        )
> +);
> +
>  #endif /* _TRACE_RPCRDMA_H */
>
>  #include <trace/define_trace.h>
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> index 08a620b370ae..cd9c0fb1a470 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> @@ -838,12 +838,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
>
>          svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
>
> -        if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
> -            !pcl_is_empty(&ctxt->rc_call_pcl)) {
> +        if (!pcl_is_empty(&ctxt->rc_call_pcl) ||
> +            ctxt->rc_read_pcl.cl_count > 1) {
>                  ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
>                  if (ret < 0)
>                          goto out_readfail;
> -        }
> +        } else if (ctxt->rc_read_pcl.cl_count == 1)
> +                svc_rdma_prepare_read_chunk(rqstp, ctxt);
>
>          rqstp->rq_xprt_ctxt = ctxt;
>          rqstp->rq_prot = IPPROTO_MAX;
> @@ -887,5 +888,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
>  int svc_rdma_argument_payload(struct svc_rqst *rqstp, unsigned int offset,
>                                unsigned int length)
>  {
> -        return 0;
> +        struct svc_rdma_recv_ctxt *ctxt = rqstp->rq_xprt_ctxt;
> +        struct svc_xprt *xprt = rqstp->rq_xprt;
> +        struct svcxprt_rdma *rdma =
> +                container_of(xprt, struct svcxprt_rdma, sc_xprt);
> +
> +        if (!pcl_is_empty(&ctxt->rc_call_pcl) ||
> +            ctxt->rc_read_pcl.cl_count != 1)
> +                return 0;
> +        return svc_rdma_pull_read_chunk(rdma, rqstp, ctxt, offset, length);
>  }
> diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
> index 29b7d477891c..5f03dfd2fa03 100644
> --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
> +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
> @@ -707,19 +707,24 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
>
>          len = segment->rs_length;
>          sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
> +
> +        trace_printk("pageoff=%u len=%u sges=%u\n",
> +                     info->ri_pageoff, len, sge_no);
> +
>          ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
>          if (!ctxt)
>                  return -ENOMEM;
>          ctxt->rw_nents = sge_no;
> +        head->rc_page_count += sge_no;
>
>          sg = ctxt->rw_sg_table.sgl;
>          for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
>                  seg_len = min_t(unsigned int, len,
>                                  PAGE_SIZE - info->ri_pageoff);
>
> -                if (!info->ri_pageoff)
> -                        head->rc_page_count++;
> -
> +                trace_printk(" page=%p seg_len=%u offset=%u\n",
> +                             rqstp->rq_pages[info->ri_pageno], seg_len,
> +                             info->ri_pageoff);
>                  sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
>                              seg_len, info->ri_pageoff);
>                  sg = sg_next(sg);
> @@ -804,15 +809,14 @@ static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
>          unsigned int page_no, numpages;
>
>          numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
> +        head->rc_page_count += numpages;
> +
>          for (page_no = 0; page_no < numpages; page_no++) {
>                  unsigned int page_len;
>
>                  page_len = min_t(unsigned int, remaining,
>                                   PAGE_SIZE - info->ri_pageoff);
>
> -                if (!info->ri_pageoff)
> -                        head->rc_page_count++;
> -
>                  dst = page_address(rqstp->rq_pages[info->ri_pageno]);
>                  memcpy(dst + info->ri_pageno, src + offset, page_len);
>
> @@ -1092,15 +1096,8 @@ static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
>   * @rqstp: set of pages to use as Read sink buffers
>   * @head: pages under I/O collect here
>   *
> - * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
> - * pull each Read chunk as they decode an incoming RPC message.
> - *
> - * On Linux, however, the server needs to have a fully-constructed RPC
> - * message in rqstp->rq_arg when there is a positive return code from
> - * ->xpo_recvfrom. So the Read list is safety-checked immediately when
> - * it is received, then here the whole Read list is pulled all at once.
> - * The ingress RPC message is fully reconstructed once all associated
> - * RDMA Reads have completed.
> + * Handle complex Read chunk cases fully before svc_rdma_recvfrom()
> + * returns.
>   *
>   * Return values:
>   * %1: all needed RDMA Reads were posted successfully,
> @@ -1159,3 +1156,115 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
>          svc_rdma_read_info_free(info);
>          return ret;
>  }
> +
> +/**
> + * svc_rdma_prepare_read_chunk - Prepare rq_arg for Read chunk
> + * @rqstp: set of pages to use as Read sink buffers
> + * @head: pages under I/O collect here
> + *
> + * The Read chunk will be pulled when the upper layer's XDR
> + * decoder calls svc_decode_argument_payload(). In the meantime,
> + * fake up rq_arg.page_len and .len to reflect the size of the
> + * yet-to-be-pulled payload.
> + */
> +void svc_rdma_prepare_read_chunk(struct svc_rqst *rqstp,
> +                                 struct svc_rdma_recv_ctxt *head)
> +{
> +        struct svc_rdma_chunk *chunk = pcl_first_chunk(&head->rc_read_pcl);
> +        unsigned int length = xdr_align_size(chunk->ch_length);
> +        struct xdr_buf *buf = &rqstp->rq_arg;
> +
> +        buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
> +        buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
> +        buf->head[0].iov_len = chunk->ch_position;
> +
> +        buf->page_len = length;
> +        buf->len += length;
> +        buf->buflen += length;
> +
> +        /*
> +         * rq_respages starts after the last arg page. Note that we
> +         * don't know the offset yet, so add an extra page as slack.
> +         */
> +        length += PAGE_SIZE * 2 - 1;
> +        rqstp->rq_respages = &rqstp->rq_pages[length >> PAGE_SHIFT];
> +        rqstp->rq_next_page = rqstp->rq_respages + 1;
> +}
> +
> +/**
> + * svc_rdma_pull_read_chunk - Pull one Read chunk from the client
> + * @rdma: controlling RDMA transport
> + * @rqstp: set of pages to use as Read sink buffers
> + * @head: pages under I/O collect here
> + * @offset: offset of payload in file's page cache
> + * @length: size of payload, in bytes
> + *
> + * Once the upper layer's XDR decoder has decoded the length of
> + * the payload and its offset, we can be clever about setting up
> + * the RDMA Read sink buffer so that the VFS does not have to
> + * re-align the payload once it is received.
> + *
> + * Caveat: To keep things simple, this is an optimization that is
> + * used only when there is a single Read chunk in the Read
> + * list.
> + *
> + * Return values:
> + * %0: all needed RDMA Reads were posted successfully,
> + * %-EINVAL: client provided the wrong chunk size,
> + * %-ENOMEM: rdma_rw context pool was exhausted,
> + * %-ENOTCONN: posting failed (connection is lost),
> + * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
> + */
> +int svc_rdma_pull_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
> +                             struct svc_rdma_recv_ctxt *head,
> +                             unsigned int offset, unsigned int length)
> +{
> +        struct svc_rdma_read_info *info;
> +        struct svc_rdma_chunk_ctxt *cc;
> +        struct svc_rdma_chunk *chunk;
> +        int ret;
> +
> +        trace_svcrdma_arg_payload(rqstp, offset, length);
> +
> +        /* Sanity: the Requester must have provided enough
> +         * bytes to fill the XDR opaque.
> +         */
> +        chunk = pcl_first_chunk(&head->rc_read_pcl);
> +        if (length > chunk->ch_length)
> +                return -EINVAL;
> +
> +        info = svc_rdma_read_info_alloc(rdma);
> +        if (!info)
> +                return -ENOMEM;
> +        cc = &info->ri_cc;
> +        info->ri_rqst = rqstp;
> +        info->ri_readctxt = head;
> +        info->ri_pageno = 0;
> +        info->ri_pageoff = offset_in_page(offset);
> +        info->ri_totalbytes = 0;
> +
> +        ret = svc_rdma_build_read_chunk(info, chunk);
> +        if (ret < 0)
> +                goto out_err;
> +        rqstp->rq_arg.pages = &info->ri_rqst->rq_pages[0];
> +        rqstp->rq_arg.page_base = offset_in_page(offset);
> +        rqstp->rq_arg.buflen += offset_in_page(offset);
> +
> +        trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
> +        init_completion(&cc->cc_done);
> +        ret = svc_rdma_post_chunk_ctxt(cc);
> +        if (ret < 0)
> +                goto out_err;
> +
> +        ret = 0;
> +        wait_for_completion(&cc->cc_done);
> +        if (cc->cc_status != IB_WC_SUCCESS)
> +                ret = -EIO;
> +
> +        /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
> +        head->rc_page_count = 0;
> +
> +out_err:
> +        svc_rdma_read_info_free(info);
> +        return ret;
> +}
>

--
Chuck Lever
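
The decoder-side caller that this hook anticipates is not shown in the
series, so here is a minimal sketch of how an NFSD WRITE decoder might
announce its payload. svc_decode_argument_payload() is the name used in
the kernel-doc above; its signature, and the use of xdr_stream_subsegment()
to carve the payload out of rq_arg, are assumptions for illustration only:

/*
 * Hypothetical sketch, not part of this patch: announce a WRITE
 * payload so the transport can pull the Read chunk into sink pages
 * whose alignment matches where the payload will land in the file.
 */
static bool example_decode_write_payload(struct svc_rqst *rqstp,
                                         struct xdr_stream *xdr,
                                         struct xdr_buf *payload,
                                         unsigned int offset,
                                         u32 payload_len)
{
        /* @offset: byte offset of the payload within the file; the
         * transport cares only about offset_in_page(offset). */
        if (svc_decode_argument_payload(rqstp, offset, payload_len) < 0)
                return false;

        /* Carve the payload out of rq_arg without copying it. */
        return xdr_stream_subsegment(xdr, payload, payload_len);
}

Passing the payload's file offset is what lets svc_rdma_pull_read_chunk()
start the sink buffer at offset_in_page(offset), so the received payload
is already positioned the way the VFS expects it.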