The plan is to replace the local bespoke code that constructs and posts RDMA Read and Write Work Requests with calls to the rdma_rw API. The rdma_rw API is code shared with other RDMA-enabled ULPs; it manages the gory details of buffer registration and posting Work Requests.

Some design notes:

 o svc_xprt reference counting is modified, since one rdma_rw_ctx generates one completion, no matter how many Write WRs are posted. To accommodate the new reference counting scheme, a new version of svc_rdma_send() is introduced.

 o The structure of RPC-over-RDMA transport headers is flexible, allowing multiple segments per Reply with arbitrary alignment. Thus I did not take the further step of chaining Write WRs with the Send WR containing the RPC Reply message. The Write and Send WRs continue to be built by separate pieces of code.

 o The current code builds the transport header as it is constructing Write WRs. I've replaced that with marshaling of transport header data items in a separate step. This is because the exact structure of client-provided segments may not align with the components of the server's reply xdr_buf, or the pages in the page list. Thus parts of each client-provided segment may be written at different points in the send path.

 o Since the Write list and Reply chunk marshaling code is being replaced, I took the opportunity to replace some of the C structure-based XDR encoding code with more portable code that instead uses pointer arithmetic.

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx> --- include/linux/sunrpc/svc_rdma.h | 22 + net/sunrpc/xprtrdma/Makefile | 2 net/sunrpc/xprtrdma/svc_rdma_marshal.c | 114 ++++ net/sunrpc/xprtrdma/svc_rdma_rw.c | 785 ++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/svc_rdma_sendto.c | 2 net/sunrpc/xprtrdma/svc_rdma_transport.c | 4 6 files changed, 925 insertions(+), 4 deletions(-) create mode 100644 net/sunrpc/xprtrdma/svc_rdma_rw.c diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index f066349..5fc9f6e 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -145,12 +145,15 @@ struct svcxprt_rdma { u32 sc_max_requests; /* Max requests */ u32 sc_max_bc_requests;/* Backward credits */ int sc_max_req_size; /* Size of each RQ WR buf */ + u8 sc_port_num; struct ib_pd *sc_pd; spinlock_t sc_ctxt_lock; struct list_head sc_ctxts; int sc_ctxt_used; + spinlock_t sc_rw_ctxt_lock; + struct list_head sc_rw_ctxts; spinlock_t sc_map_lock; struct list_head sc_maps; @@ -209,10 +212,15 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *, struct rpcrdma_msg *, enum rpcrdma_errcode, __be32 *); -extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int); +extern void svc_rdma_old_encode_write_list(struct rpcrdma_msg *rmsgp, + int chunks); extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int); extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int, __be32, __be64, u32); +extern void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, + unsigned int consumed); +extern void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, + unsigned int consumed); extern unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp); /* svc_rdma_recvfrom.c */ @@ -224,6 +232,18 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *, struct svc_rdma_op_ctxt *, int *, u32 
*, u32, u32, u64, bool); +/* svc_rdma_rw.c */ +extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma); +extern int svc_rdma_recv_read_list(struct svcxprt_rdma *rdma, __be32 *ch, + struct svc_rdma_op_ctxt *head, + struct svc_rqst *rqstp); +extern int svc_rdma_send_write_list(struct svcxprt_rdma *rdma, + __be32 *wr_ch, __be32 *rdma_resp, + struct xdr_buf *xdr); +extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, + __be32 *wr_lst, __be32 *rp_ch, + __be32 *rdma_resp, struct xdr_buf *xdr); + /* svc_rdma_sendto.c */ extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *, struct svc_rdma_req_map *, bool); diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index ef19fa4..c1ae814 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \ fmr_ops.o frwr_ops.o \ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ - module.o + svc_rdma_rw.o module.o rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c index 1c4aabf..bf3ca7e 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c @@ -217,7 +217,7 @@ unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp) return (unsigned long)p - (unsigned long)rdma_resp; } -void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks) +void svc_rdma_old_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks) { struct rpcrdma_write_array *ary; @@ -255,3 +255,115 @@ void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary, seg->rs_offset = rs_offset; seg->rs_length = cpu_to_be32(write_len); } + +/* One Write chunk is copied from Call transport header to Reply + * transport header. Each segment's length field is updated to + * reflect number of bytes consumed in the segment. 
+ * + * Returns number of segments in this chunk. + */ +static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, + unsigned int remaining) +{ + unsigned int i, nsegs; + u32 seg_len; + + /* Write list discriminator */ + *dst++ = *src++; + + /* number of segments in this chunk */ + nsegs = be32_to_cpup(src); + *dst++ = *src++; + + for (i = nsegs; i; i--) { + /* segment's RDMA handle */ + *dst++ = *src++; + + /* bytes returned in this segment */ + seg_len = be32_to_cpu(*src); + if (remaining >= seg_len) { + /* entire segment was consumed */ + *dst = *src; + remaining -= seg_len; + } else { + /* segment only partly filled */ + *dst = cpu_to_be32(remaining); + remaining = 0; + } + dst++; src++; + + /* segment's RDMA offset */ + *dst++ = *src++; + *dst++ = *src++; + } + + return nsegs; +} + +/** + * svc_rdma_xdr_encode_write_list - Encode Reply's Write list + * @rdma_resp: Reply's transport header + * @wr_ch: Write list in Call's transport header + * @consumed: total Write chunk bytes consumed in Reply + * + * The client provided a Write list in the Call message. Fill in + * the segments in the first Write chunk in the Reply's transport + * header with the number of bytes consumed in each segment. + * Remaining chunks are returned unused. + * + * Assumptions: + * - Server can consume only one Write chunk. + */ +void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, + unsigned int consumed) +{ + unsigned int nsegs; + __be32 *p, *q; + + /* RPC-over-RDMA V1 replies never have a Read list. 
*/ + p = rdma_resp + rpcrdma_fixed_maxsz + 1; + + q = wr_ch; + while (*q != xdr_zero) { + nsegs = xdr_encode_write_chunk(p, q, consumed); + q += 2 + nsegs * rpcrdma_segment_maxsz; + p += 2 + nsegs * rpcrdma_segment_maxsz; + consumed = 0; + } + + /* Terminate Write list */ + *p++ = xdr_zero; + + /* Reply chunk discriminator; may be replaced later */ + *p = xdr_zero; +} + +/** + * svc_rdma_xdr_encode_reply_chunk - Encode Reply's Reply chunk + * @rdma_resp: Reply's transport header + * @rp_ch: Reply chunk in Call's transport header + * @consumed: total Reply chunk bytes consumed in Reply + * + * The client provided a Reply chunk in the Call message. Fill in + * the segments in the Reply chunk in the Reply message with the + * number of bytes consumed in each segment. + * + * Assumptions: + * - Reply chunk is smaller than or equal in size to Reply + */ +void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, + unsigned int consumed) +{ + __be32 *p; + + /* Find the Reply chunk in the Reply's xprt header. + * RPC-over-RDMA V1 replies never have a Read list. + */ + p = rdma_resp + rpcrdma_fixed_maxsz + 1; + + /* Skip past Write list */ + while (*p++ != xdr_zero) + p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; + + xdr_encode_write_chunk(p, rp_ch, consumed); +} diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c new file mode 100644 index 0000000..1e76227 --- /dev/null +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -0,0 +1,785 @@ +/* + * Copyright (c) 2016 Oracle. All rights reserved. + * + * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. + */ + +#include <linux/sunrpc/rpc_rdma.h> +#include <linux/sunrpc/svc_rdma.h> +#include <linux/sunrpc/debug.h> + +#include <rdma/rw.h> + +#define RPCDBG_FACILITY RPCDBG_SVCXPRT + +/* Each R/W context contains state for one chain of RDMA Read or + * Write Work Requests (one RDMA segment to be read from or written + * back to the client). 
+ * + * Each WR chain handles a single contiguous server-side buffer, + * because some registration modes (eg. FRWR) do not support a + * discontiguous scatterlist. + * + * Each WR chain handles only one R_key. Each RPC-over-RDMA segment + * from a client may contain a unique R_key, so each WR chain moves + * one segment (or less) at a time. + * + * The scatterlist makes this data structure just over 8KB in size + * on 4KB-page platforms. As the size of this structure increases + * past one page, it becomes more likely that allocating one of these + * will fail. Therefore, these contexts are created on demand, but + * cached and reused until the controlling svcxprt_rdma is destroyed. + */ +struct svc_rdma_rw_ctxt { + struct list_head rw_list; + struct ib_cqe rw_cqe; + struct svcxprt_rdma *rw_rdma; + int rw_nents; + int rw_wrcount; + enum dma_data_direction rw_dir; + struct svc_rdma_op_ctxt *rw_readctxt; + struct rdma_rw_ctx rw_ctx; + struct scatterlist rw_sg[RPCSVC_MAXPAGES]; +}; + +static struct svc_rdma_rw_ctxt * +svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_rw_ctxt *ctxt; + + svc_xprt_get(&rdma->sc_xprt); + + spin_lock(&rdma->sc_rw_ctxt_lock); + if (list_empty(&rdma->sc_rw_ctxts)) + goto out_empty; + + ctxt = list_first_entry(&rdma->sc_rw_ctxts, + struct svc_rdma_rw_ctxt, rw_list); + list_del_init(&ctxt->rw_list); + spin_unlock(&rdma->sc_rw_ctxt_lock); + +out: + ctxt->rw_dir = DMA_NONE; + return ctxt; + +out_empty: + spin_unlock(&rdma->sc_rw_ctxt_lock); + + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); + if (!ctxt) { + svc_xprt_put(&rdma->sc_xprt); + return NULL; + } + + ctxt->rw_rdma = rdma; + INIT_LIST_HEAD(&ctxt->rw_list); + sg_init_table(ctxt->rw_sg, ARRAY_SIZE(ctxt->rw_sg)); + goto out; +} + +static void svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt) +{ + struct svcxprt_rdma *rdma = ctxt->rw_rdma; + + if (ctxt->rw_dir != DMA_NONE) + rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, + ctxt->rw_sg, ctxt->rw_nents, 
+ ctxt->rw_dir); + + spin_lock(&rdma->sc_rw_ctxt_lock); + list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts); + spin_unlock(&rdma->sc_rw_ctxt_lock); + + svc_xprt_put(&rdma->sc_xprt); +} + +/** + * svc_rdma_destroy_rw_ctxts - Free write contexts + * @rdma: xprt about to be destroyed + * + */ +void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_rw_ctxt *ctxt; + + while (!list_empty(&rdma->sc_rw_ctxts)) { + ctxt = list_first_entry(&rdma->sc_rw_ctxts, + struct svc_rdma_rw_ctxt, rw_list); + list_del(&ctxt->rw_list); + kfree(ctxt); + } +} + +/** + * svc_rdma_wc_write_ctx - Handle completion of an RDMA Write ctx + * @cq: controlling Completion Queue + * @wc: Work Completion + * + * Invoked in soft IRQ context. + * + * Assumptions: + * - Write completion is not responsible for freeing pages under + * I/O. + */ +static void svc_rdma_wc_write_ctx(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_rw_ctxt *ctxt = + container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe); + struct svcxprt_rdma *rdma = ctxt->rw_rdma; + + atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + if (wc->status != IB_WC_SUCCESS) + goto flush; + +out: + svc_rdma_put_rw_ctxt(ctxt); + return; + +flush: + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: write ctx: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + goto out; +} + +/** + * svc_rdma_wc_read_ctx - Handle completion of an RDMA Read ctx + * @cq: controlling Completion Queue + * @wc: Work Completion + * + * Invoked in soft IRQ context. 
+ */ +static void svc_rdma_wc_read_ctx(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_rw_ctxt *ctxt = + container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe); + struct svcxprt_rdma *rdma = ctxt->rw_rdma; + struct svc_rdma_op_ctxt *head; + + atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + if (wc->status != IB_WC_SUCCESS) + goto flush; + + head = ctxt->rw_readctxt; + if (!head) + goto out; + + spin_lock(&rdma->sc_rq_dto_lock); + list_add_tail(&head->list, &rdma->sc_read_complete_q); + spin_unlock(&rdma->sc_rq_dto_lock); + set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); + svc_xprt_enqueue(&rdma->sc_xprt); + +out: + svc_rdma_put_rw_ctxt(ctxt); + return; + +flush: + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: read ctx: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + goto out; +} + +/* This function sleeps when the transport's Send Queue is congested. + * + * Assumptions: + * - If ib_post_send() succeeds, only one completion is expected, + * even if one or more WRs are flushed. This is true when posting + * an rdma_rw_ctx or when posting a single signaled WR. + */ +static int svc_rdma_post_send(struct svcxprt_rdma *rdma, + struct ib_send_wr *first_wr, + int num_wrs) +{ + struct svc_xprt *xprt = &rdma->sc_xprt; + struct ib_send_wr *bad_wr; + int ret; + + do { + if ((atomic_sub_return(num_wrs, &rdma->sc_sq_avail) > 0)) { + ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); + if (ret) + break; + return 0; + } + + atomic_inc(&rdma_stat_sq_starve); + atomic_add(num_wrs, &rdma->sc_sq_avail); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > num_wrs); + } while (1); + + pr_err("svcrdma: post_send rc=%d; SQ avail=%d/%u\n", + ret, atomic_read(&rdma->sc_sq_avail), rdma->sc_sq_depth); + set_bit(XPT_CLOSE, &xprt->xpt_flags); + + /* If even one was posted, there will be a completion. 
*/ + if (bad_wr != first_wr) + return 0; + + atomic_add(num_wrs, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + return -ENOTCONN; +} + +static int svc_rdma_send_write_ctx(struct svcxprt_rdma *rdma, + struct svc_rdma_rw_ctxt *ctxt, + u64 offset, u32 rkey) +{ + struct ib_send_wr *first_wr; + int ret; + + ret = rdma_rw_ctx_init(&ctxt->rw_ctx, + rdma->sc_qp, rdma->sc_port_num, + ctxt->rw_sg, ctxt->rw_nents, + 0, offset, rkey, DMA_TO_DEVICE); + if (ret < 0) + goto out_init; + + ctxt->rw_wrcount = ret; + ctxt->rw_dir = DMA_TO_DEVICE; + ctxt->rw_cqe.done = svc_rdma_wc_write_ctx; + first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, + rdma->sc_qp, rdma->sc_port_num, + &ctxt->rw_cqe, NULL); + atomic_add(ret, &rdma_stat_write); + return svc_rdma_post_send(rdma, first_wr, ret); + +out_init: + pr_err("svcrdma: rdma_rw_ctx_init failed: %d\n", ret); + return -EIO; +} + +/* Common information for sending a Write chunk. + * - Tracks progress of writing one chunk + * - Stores arguments for the SGL constructor function + */ +struct svc_rdma_write_info { + struct svcxprt_rdma *wi_rdma; + + /* write state of this chunk */ + unsigned int wi_bytes_consumed; + unsigned int wi_seg_off; + unsigned int wi_seg_no; + unsigned int wi_nsegs; + __be32 *wi_segs; + + /* SGL constructor arguments */ + struct xdr_buf *wi_xdr; + unsigned char *wi_base; + unsigned int wi_next_off; +}; + +static void svc_rdma_init_write_info(struct svcxprt_rdma *rdma, __be32 *chunk, + struct svc_rdma_write_info *info) +{ + info->wi_rdma = rdma; + info->wi_bytes_consumed = 0; + info->wi_seg_off = 0; + info->wi_seg_no = 0; + info->wi_nsegs = be32_to_cpup(chunk + 1); + info->wi_segs = chunk + 2; +} + +/* Build and DMA-map an SGL that covers one kvec in an xdr_buf + */ +static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info, + unsigned int len, + struct svc_rdma_rw_ctxt *ctxt) +{ + sg_set_buf(&ctxt->rw_sg[0], info->wi_base, len); + info->wi_base += len; + + ctxt->rw_nents = 1; +} + +/* Build and DMA-map an SGL that 
covers the pagelist of an xdr_buf + */ +static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info, + unsigned int remaining, + struct svc_rdma_rw_ctxt *ctxt) +{ + unsigned int sge_no, sge_bytes, page_off, page_no; + struct scatterlist *sg = ctxt->rw_sg; + struct xdr_buf *xdr = info->wi_xdr; + + page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT; + page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK; + info->wi_next_off += remaining; + + sge_no = 0; + do { + sge_bytes = min_t(unsigned int, remaining, + PAGE_SIZE - page_off); + + sg_set_page(&sg[sge_no++], xdr->pages[page_no], + sge_bytes, page_off); + + remaining -= sge_bytes; + page_no++; + page_off = 0; + } while (remaining); + + ctxt->rw_nents = sge_no; +} + +/* Post Write WRs to send a portion of an xdr_buf containing + * an RPC Reply. + */ +static int +svc_rdma_send_writes(struct svc_rdma_write_info *info, + void (*constructor)(struct svc_rdma_write_info *info, + unsigned int len, + struct svc_rdma_rw_ctxt *ctxt), + unsigned int total) +{ + struct svcxprt_rdma *rdma = info->wi_rdma; + unsigned int remaining, seg_no, seg_off; + struct svc_rdma_rw_ctxt *ctxt; + __be32 *seg; + int ret; + + if (total == 0) + return 0; + + remaining = total; + seg_no = info->wi_seg_no; + seg_off = info->wi_seg_off; + seg = info->wi_segs + seg_no * rpcrdma_segment_maxsz; + do { + unsigned int write_len; + u32 rs_length, rs_handle; + u64 rs_offset; + + if (seg_no >= info->wi_nsegs) + goto out_overflow; + + ctxt = svc_rdma_get_rw_ctxt(rdma); + if (!ctxt) + goto out_noctx; + + rs_handle = be32_to_cpu(*seg++); + rs_length = be32_to_cpu(*seg++); + seg = xdr_decode_hyper(seg, &rs_offset); + + write_len = min(remaining, rs_length - seg_off); + constructor(info, write_len, ctxt); + ret = svc_rdma_send_write_ctx(rdma, ctxt, rs_offset + seg_off, + rs_handle); + if (ret < 0) + goto out_senderr; + + if (write_len == rs_length - seg_off) { + seg_no++; + seg_off = 0; + } else { + seg_off += write_len; + } + remaining 
-= write_len; + } while (remaining); + + info->wi_bytes_consumed += total; + info->wi_seg_no = seg_no; + info->wi_seg_off = seg_off; + return 0; + +out_overflow: + dprintk("svcrdma: inadequate space in Write chunk (%u)\n", + info->wi_nsegs); + return -E2BIG; + +out_noctx: + dprintk("svcrdma: no R/W ctxs available\n"); + return -ENOMEM; + +out_senderr: + svc_rdma_put_rw_ctxt(ctxt); + pr_err("svcrdma: failed to write pagelist (%d)\n", ret); + return ret; +} + +/* Send one of an xdr_buf's kvecs by itself. To send a Reply + * chunk, the whole RPC Reply is written back to the client. + * This function writes either the head or tail of the xdr_buf + * containing the Reply. + */ +static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info, + struct kvec *vec) +{ + info->wi_base = vec->iov_base; + + return svc_rdma_send_writes(info, svc_rdma_vec_to_sg, + vec->iov_len); +} + +/* Send an xdr_buf's page list by itself. A Write chunk is + * just the page list. a Reply chunk is the head, page list, + * and tail. This function is shared between the two types + * of chunk. + */ +static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, + struct xdr_buf *xdr) +{ + info->wi_xdr = xdr; + info->wi_next_off = 0; + + return svc_rdma_send_writes(info, svc_rdma_pagelist_to_sg, + xdr->page_len); +} + +/** + * svc_rdma_send_write_list - Write all chunks in the Write list + * @rdma: controlling RDMA transport + * @wr_ch: Write list provided by client + * @rdma_resp: buffer containing transport header under construction + * @xdr: xdr_buf carrying an RPC Reply + * + * Returns: + * %0 if all needed RDMA Writes were posted successfully, + * %-E2BIG if the payload was larger than the Write chunk, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 
+ * + * Assumptions: + * - Only one Write chunk, and it's the xdr_buf's entire pagelist + */ +int svc_rdma_send_write_list(struct svcxprt_rdma *rdma, + __be32 *wr_ch, __be32 *rdma_resp, + struct xdr_buf *xdr) +{ + struct svc_rdma_write_info info; + int ret; + + svc_rdma_init_write_info(rdma, wr_ch, &info); + + ret = svc_rdma_send_xdr_pagelist(&info, xdr); + + svc_rdma_xdr_encode_write_list(rdma_resp, wr_ch, + info.wi_bytes_consumed); + return ret; +} + +/** + * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk + * @rdma: controlling RDMA transport + * @wr_lst: Write list provided by client + * @rp_ch: Reply chunk provided by client + * @rdma_resp: buffer containing transport header for Reply + * @xdr: xdr_buf carrying an RPC Reply + * + * Returns: + * %0 if all needed RDMA Writes were posted successfully, + * %-E2BIG if the payload was larger than the Reply chunk, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + * + * Assumptions: + * - The Reply chunk always carries the whole xdr_buf + */ +int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *wr_lst, + __be32 *rp_ch, __be32 *rdma_resp, + struct xdr_buf *xdr) +{ + struct svc_rdma_write_info info; + int ret; + + svc_rdma_init_write_info(rdma, rp_ch, &info); + + ret = svc_rdma_send_xdr_kvec(&info, &xdr->head[0]); + if (ret < 0) + goto out; + + /* When Write list entries are present, server has already + * transmitted the pagelist payload via a Write chunk. Thus + * we can skip the pagelist here. + */ + if (!wr_lst) { + ret = svc_rdma_send_xdr_pagelist(&info, xdr); + if (ret < 0) + goto out; + } + + ret = svc_rdma_send_xdr_kvec(&info, &xdr->tail[0]); + +out: + svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, + info.wi_bytes_consumed); + return ret; +} + +/* Pull one Read chunk (segment) from the client. + * + * Returns zero if one or more RDMA Reads have been posted. 
Otherwise, + * returns a negative errno if there is a Read list present but RDMA + * Reads could not be posted. + * + * For incoming Reads, @rqstp provides a page list containing sink pages. + * As pages are prepared for I/O, they are transferred to @head. After + * all Reads in the list have completed, svc_rdma_recvfrom builds an + * xdr_buf from the page list in @head. + * + * On entry, *page_no and *page_offset point into the rqstp's page list. + * On return, *page_no and *page_offset are updated to point to the next + * position in the page list. + */ +static int svc_rdma_recv_read_segment(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *head, + struct svc_rqst *rqstp, + unsigned int *page_no, + unsigned int *page_offset, + u32 rkey, u32 len, u64 offset, + bool last) +{ + struct svc_rdma_rw_ctxt *ctxt; + struct ib_send_wr *first_wr; + unsigned int pg_no, seg_no; + u32 pg_off; + int ret; + + dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x\n", + len, offset, rkey); + + ctxt = svc_rdma_get_rw_ctxt(rdma); + if (!ctxt) + return -ENOMEM; + + pg_off = *page_offset; + pg_no = *page_no; + ctxt->rw_nents = PAGE_ALIGN(*page_offset + len) >> PAGE_SHIFT; + for (seg_no = 0; seg_no < ctxt->rw_nents; seg_no++) { + unsigned int seg_len = min_t(unsigned int, len, + PAGE_SIZE - pg_off); + + head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; + head->arg.page_len += seg_len; + head->arg.len += seg_len; + if (!pg_off) + head->count++; + + sg_set_page(&ctxt->rw_sg[seg_no], rqstp->rq_arg.pages[pg_no], + seg_len, pg_off); + + rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no + 1]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + + pg_off += seg_len; + if (pg_off == PAGE_SIZE) { + pg_off = 0; + pg_no++; + } + len -= seg_len; + } + + ret = rdma_rw_ctx_init(&ctxt->rw_ctx, + rdma->sc_qp, rdma->sc_port_num, + ctxt->rw_sg, ctxt->rw_nents, + 0, offset, rkey, DMA_FROM_DEVICE); + if (ret < 0) + goto out_init; + + ctxt->rw_wrcount = ret; + ctxt->rw_dir = DMA_FROM_DEVICE; + 
ctxt->rw_cqe.done = svc_rdma_wc_read_ctx; + ctxt->rw_readctxt = last ? head : NULL; + first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, + rdma->sc_qp, rdma->sc_port_num, + &ctxt->rw_cqe, NULL); + atomic_add(ret, &rdma_stat_read); + ret = svc_rdma_post_send(rdma, first_wr, ret); + if (ret) + goto out_send; + + *page_no = pg_no; + *page_offset = pg_off; + return 0; + +out_init: + pr_err("svcrdma: rdma_rw_ctx_init failed: %d\n", ret); + ret = -EIO; +out_send: + svc_rdma_put_rw_ctxt(ctxt); + return ret; +} + +/* If there was additional inline content, append it to the end of arg.pages. + * Tail copy has to be done after the reader function has determined how many + * pages were consumed for RDMA Read. + */ +static int svc_rdma_copy_tail(struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *head, u32 position, + unsigned int page_offset, unsigned int page_no) +{ + char *srcp, *destp; + u32 byte_count; + + srcp = head->arg.head[0].iov_base + position; + byte_count = head->arg.head[0].iov_len - position; + if (byte_count > PAGE_SIZE) { + dprintk("svcrdma: large tail unsupported\n"); + return 0; + } + + /* Fit as much of the tail on the current page as possible */ + if (page_offset != PAGE_SIZE) { + destp = page_address(rqstp->rq_arg.pages[page_no]); + destp += page_offset; + while (byte_count--) { + *destp++ = *srcp++; + page_offset++; + if (page_offset == PAGE_SIZE && byte_count) + goto more; + } + goto done; + } + +more: + /* Fit the rest on the next page */ + page_no++; + destp = page_address(rqstp->rq_arg.pages[page_no]); + while (byte_count--) + *destp++ = *srcp++; + + rqstp->rq_respages = &rqstp->rq_arg.pages[page_no + 1]; + rqstp->rq_next_page = rqstp->rq_respages + 1; + +done: + byte_count = head->arg.head[0].iov_len - position; + head->arg.page_len += byte_count; + head->arg.len += byte_count; + head->arg.buflen += byte_count; + return 1; +} + +static unsigned int svc_rdma_read_chunk_count(__be32 *p) +{ + unsigned int nsegs; + + for (nsegs = 0; *p != xdr_zero; p += 
rpcrdma_readchunk_maxsz) + nsegs++; + return nsegs; +} + +/** + * svc_rdma_recv_read_list - Pull read chunks from the client + * @rdma: controlling RDMA transport + * @ch: pointer to Read list in the incoming transport header + * @head: pages under I/O collect here + * @rqstp: set of pages to use as Read sink buffers + * + * Returns: + * %0 if there is no Read list, + * %1 if all needed RDMA Reads were posted successfully, + * %-EINVAL if the Read chunk data is too large, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + * + * Assumptions: + * - Clients can send multiple Read chunks in a Read list, but + * the chunks must all have the same value in their Position + * field. + */ +int svc_rdma_recv_read_list(struct svcxprt_rdma *rdma, __be32 *ch, + struct svc_rdma_op_ctxt *head, + struct svc_rqst *rqstp) +{ + unsigned int page_no, page_offset; + u32 position; + __be32 *p; + bool last; + int ret; + + p = ch; + + /* Sanity check */ + if (svc_rdma_read_chunk_count(p++) > RPCSVC_MAXPAGES) + return -EINVAL; + + /* "head" keeps all the pages that comprise the request. + */ + head->arg.head[0] = rqstp->rq_arg.head[0]; + head->arg.tail[0] = rqstp->rq_arg.tail[0]; + head->hdr_count = head->count; + head->arg.page_base = 0; + head->arg.page_len = 0; + head->arg.len = rqstp->rq_arg.len; + head->arg.buflen = rqstp->rq_arg.buflen; + + /* RDMA_NOMSG: RDMA Read data should land just after Receive data. + */ + position = be32_to_cpu(*p++); + if (position == 0) { + head->arg.pages = &head->pages[0]; + page_offset = head->byte_len; + } else { + head->arg.pages = &head->pages[head->count]; + page_offset = 0; + } + + /* This server implementation supports only one Read chunk (of one + * or more segments) per message. The list walk is terminated once + * "position" changes. 
+ */ + page_no = 0; + last = false; + while (!last) { + u32 rs_handle, rs_length; + u64 rs_offset; + + rs_handle = be32_to_cpu(*p++), + rs_length = be32_to_cpu(*p++); + p = xdr_decode_hyper(p, &rs_offset); + + /* Examine next read segment */ + if (*p == xdr_zero || + ((*p != xdr_zero) && (be32_to_cpu(*(p + 1)) != position))) + last = true; + + ret = svc_rdma_recv_read_segment(rdma, head, rqstp, + &page_no, &page_offset, + rs_handle, rs_length, + rs_offset, last); + if (ret < 0) + goto out; + + p += 2; + } + + /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */ + if (page_offset & 3) { + u32 pad = 4 - (page_offset & 3); + + head->arg.tail[0].iov_len += pad; + head->arg.len += pad; + head->arg.buflen += pad; + page_offset += pad; + } + + ret = 1; + if (position && position < head->arg.head[0].iov_len) + ret = svc_rdma_copy_tail(rqstp, head, position, + page_offset, page_no); + head->arg.head[0].iov_len = position; + head->position = position; + + out: + /* Detach arg pages. svc_recv will replenish them */ + for (page_no = 0; + &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++) + rqstp->rq_pages[page_no] = NULL; + return ret; +} diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index b4028bc3..e4b8800 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -406,7 +406,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt, } } /* Update the req with the number of chunks actually used */ - svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no); + svc_rdma_old_encode_write_list(rdma_resp, chunk_no); return rqstp->rq_res.page_len; diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index b84cd53..90fabad 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -560,6 +560,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, 
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_frmr_q); INIT_LIST_HEAD(&cma_xprt->sc_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_maps); init_waitqueue_head(&cma_xprt->sc_send_wait); @@ -567,6 +568,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, spin_lock_init(&cma_xprt->sc_rq_dto_lock); spin_lock_init(&cma_xprt->sc_frmr_q_lock); spin_lock_init(&cma_xprt->sc_ctxt_lock); + spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); spin_lock_init(&cma_xprt->sc_map_lock); /* @@ -998,6 +1000,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) newxprt, newxprt->sc_cm_id); dev = newxprt->sc_cm_id->device; + newxprt->sc_port_num = newxprt->sc_cm_id->port_num; /* Qualify the transport resource defaults with the * capabilities of this particular device */ @@ -1247,6 +1250,7 @@ static void __svc_rdma_free(struct work_struct *work) } rdma_dealloc_frmr_q(rdma); + svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_destroy_ctxts(rdma); svc_rdma_destroy_maps(rdma); -- To unsubscribe from this list: send the line "unsubscribe linux-rdma" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html