On Mon, May 07, 2018 at 03:27:21PM -0400, Chuck Lever wrote: > svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt > free list. This eliminates the overhead of calling kmalloc / kfree, > both of which grab a globally shared lock that disables interrupts. > To reduce contention further, separate the use of these objects in > the Receive and Send paths in svcrdma. > > Subsequent patches will take advantage of this separation by > allocating real resources which are then cached in these objects. > The allocations are freed when the transport is torn down. Out of curiosity, about how much memory does that end up being per svc_xprt? --b. > > I've renamed the structure so that static type checking can be used > to ensure that uses of op_ctxt and recv_ctxt are not confused. As an > additional clean up, structure fields are renamed to conform with > kernel coding conventions. > > As a final clean up, helpers related to recv_ctxt are moved closer > to the functions that use them. > > Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx> > --- > include/linux/sunrpc/svc_rdma.h | 24 ++ > net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 318 ++++++++++++++++++++++++++---- > net/sunrpc/xprtrdma/svc_rdma_rw.c | 84 ++++---- > net/sunrpc/xprtrdma/svc_rdma_sendto.c | 2 > net/sunrpc/xprtrdma/svc_rdma_transport.c | 142 +------------ > 5 files changed, 349 insertions(+), 221 deletions(-) > > diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h > index 88da0c9..37f759d 100644 > --- a/include/linux/sunrpc/svc_rdma.h > +++ b/include/linux/sunrpc/svc_rdma.h > @@ -128,6 +128,9 @@ struct svcxprt_rdma { > unsigned long sc_flags; > struct list_head sc_read_complete_q; > struct work_struct sc_work; > + > + spinlock_t sc_recv_lock; > + struct list_head sc_recv_ctxts; > }; > /* sc_flags */ > #define RDMAXPRT_CONN_PENDING 3 > @@ -142,6 +145,19 @@ struct svcxprt_rdma { > > #define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD > > +struct svc_rdma_recv_ctxt { > + struct list_head rc_list; > + struct ib_recv_wr rc_recv_wr; > + struct ib_cqe rc_cqe; > + struct xdr_buf rc_arg; > + u32 rc_byte_len; > + unsigned int rc_page_count; > + unsigned int rc_hdr_count; > + struct ib_sge rc_sges[1 + > + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE]; > + struct page *rc_pages[RPCSVC_MAXPAGES]; > +}; > + > /* Track DMA maps for this transport and context */ > static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma, > struct svc_rdma_op_ctxt *ctxt) > @@ -155,13 +171,19 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, > struct xdr_buf *rcvbuf); > > /* svc_rdma_recvfrom.c */ > +extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma); > +extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma); > +extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, > + struct svc_rdma_recv_ctxt *ctxt, > + int free_pages); > +extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma); > extern int svc_rdma_recvfrom(struct svc_rqst *); > > /* svc_rdma_rw.c */ > extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma); > extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, > struct svc_rqst *rqstp, > - struct svc_rdma_op_ctxt *head, __be32 *p); > + struct svc_rdma_recv_ctxt *head, __be32 *p); > extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, > __be32 *wr_ch, struct xdr_buf *xdr); > extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, > diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c > index 330d542..b7d9c55 100644 > --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c > +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c > @@ -1,6 +1,6 @@ > // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause > /* > - * Copyright (c) 2016, 2017 Oracle. All rights reserved. > + * Copyright (c) 2016-2018 Oracle. All rights reserved. > * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. > * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. > * > @@ -61,7 +61,7 @@ > * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's > * data payload from the client. svc_rdma_recvfrom sets up the > * RDMA Reads using pages in svc_rqst::rq_pages, which are > - * transferred to an svc_rdma_op_ctxt for the duration of the > + * transferred to an svc_rdma_recv_ctxt for the duration of the > * I/O. svc_rdma_recvfrom then returns zero, since the RPC message > * is still not yet ready. > * > @@ -70,18 +70,18 @@ > * svc_rdma_recvfrom again. This second call may use a different > * svc_rqst than the first one, thus any information that needs > * to be preserved across these two calls is kept in an > - * svc_rdma_op_ctxt. > + * svc_rdma_recv_ctxt. > * > * The second call to svc_rdma_recvfrom performs final assembly > * of the RPC Call message, using the RDMA Read sink pages kept in > - * the svc_rdma_op_ctxt. The xdr_buf is copied from the > - * svc_rdma_op_ctxt to the second svc_rqst. The second call returns > + * the svc_rdma_recv_ctxt. The xdr_buf is copied from the > + * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns > * the length of the completed RPC Call message. > * > * Page Management > * > * Pages under I/O must be transferred from the first svc_rqst to an > - * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns. > + * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns. > * > * The first svc_rqst supplies pages for RDMA Reads. These are moved > * from rqstp::rq_pages into ctxt::pages. The consumed elements of > @@ -89,7 +89,7 @@ > * svc_rdma_recvfrom call returns. > * > * During the second svc_rdma_recvfrom call, RDMA Read sink pages > - * are transferred from the svc_rdma_op_ctxt to the second svc_rqst > + * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst > * (see rdma_read_complete() below). > */ > > @@ -108,13 +108,247 @@ > > #define RPCDBG_FACILITY RPCDBG_SVCXPRT > > +static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc); > + > +static inline struct svc_rdma_recv_ctxt * > +svc_rdma_next_recv_ctxt(struct list_head *list) > +{ > + return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt, > + rc_list); > +} > + > +/** > + * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt > + * @rdma: svcxprt_rdma being torn down > + * > + */ > +void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma) > +{ > + struct svc_rdma_recv_ctxt *ctxt; > + > + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) { > + list_del(&ctxt->rc_list); > + kfree(ctxt); > + } > +} > + > +static struct svc_rdma_recv_ctxt * > +svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) > +{ > + struct svc_rdma_recv_ctxt *ctxt; > + > + spin_lock(&rdma->sc_recv_lock); > + ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts); > + if (!ctxt) > + goto out_empty; > + list_del(&ctxt->rc_list); > + spin_unlock(&rdma->sc_recv_lock); > + > +out: > + ctxt->rc_recv_wr.num_sge = 0; > + ctxt->rc_page_count = 0; > + return ctxt; > + > +out_empty: > + spin_unlock(&rdma->sc_recv_lock); > + > + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); > + if (!ctxt) > + return NULL; > + goto out; > +} > + > +static void svc_rdma_recv_ctxt_unmap(struct svcxprt_rdma *rdma, > + struct svc_rdma_recv_ctxt *ctxt) > +{ > + struct ib_device *device = rdma->sc_cm_id->device; > + int i; > + > + for (i = 0; i < ctxt->rc_recv_wr.num_sge; i++) > + ib_dma_unmap_page(device, > + ctxt->rc_sges[i].addr, > + ctxt->rc_sges[i].length, > + DMA_FROM_DEVICE); > +} > + > +/** > + * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list > + * @rdma: controlling svcxprt_rdma > + * @ctxt: object to return to the free list > + * @free_pages: Non-zero if rc_pages should be freed > + * > + */ > +void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, > + struct svc_rdma_recv_ctxt *ctxt, > + int free_pages) > +{ > + unsigned int i; > + > + if (free_pages) > + for (i = 0; i < ctxt->rc_page_count; i++) > + put_page(ctxt->rc_pages[i]); > + spin_lock(&rdma->sc_recv_lock); > + list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts); > + spin_unlock(&rdma->sc_recv_lock); > +} > + > +static int svc_rdma_post_recv(struct svcxprt_rdma *rdma) > +{ > + struct ib_device *device = rdma->sc_cm_id->device; > + struct svc_rdma_recv_ctxt *ctxt; > + struct ib_recv_wr *bad_recv_wr; > + int sge_no, buflen, ret; > + struct page *page; > + dma_addr_t pa; > + > + ctxt = svc_rdma_recv_ctxt_get(rdma); > + if (!ctxt) > + return -ENOMEM; > + > + buflen = 0; > + ctxt->rc_cqe.done = svc_rdma_wc_receive; > + for (sge_no = 0; buflen < rdma->sc_max_req_size; sge_no++) { > + if (sge_no >= rdma->sc_max_sge) { > + pr_err("svcrdma: Too many sges (%d)\n", sge_no); > + goto err_put_ctxt; > + } > + > + page = alloc_page(GFP_KERNEL); > + if (!page) > + goto err_put_ctxt; > + ctxt->rc_pages[sge_no] = page; > + ctxt->rc_page_count++; > + > + pa = ib_dma_map_page(device, ctxt->rc_pages[sge_no], > + 0, PAGE_SIZE, DMA_FROM_DEVICE); > + if (ib_dma_mapping_error(device, pa)) > + goto err_put_ctxt; > + ctxt->rc_sges[sge_no].addr = pa; > + ctxt->rc_sges[sge_no].length = PAGE_SIZE; > + ctxt->rc_sges[sge_no].lkey = rdma->sc_pd->local_dma_lkey; > + ctxt->rc_recv_wr.num_sge++; > + > + buflen += PAGE_SIZE; > + } > + ctxt->rc_recv_wr.next = NULL; > + ctxt->rc_recv_wr.sg_list = &ctxt->rc_sges[0]; > + ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe; > + > + svc_xprt_get(&rdma->sc_xprt); > + ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, &bad_recv_wr); > + trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret); > + if (ret) > + goto err_post; > + return 0; > + > +err_put_ctxt: > + svc_rdma_recv_ctxt_unmap(rdma, ctxt); > + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); > + return -ENOMEM; > +err_post: > + svc_rdma_recv_ctxt_unmap(rdma, ctxt); > + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); > + svc_xprt_put(&rdma->sc_xprt); > + return ret; > +} > + > +/** > + * svc_rdma_post_recvs - Post initial set of Recv WRs > + * @rdma: fresh svcxprt_rdma > + * > + * Returns true if successful, otherwise false. > + */ > +bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) > +{ > + unsigned int i; > + int ret; > + > + for (i = 0; i < rdma->sc_max_requests; i++) { > + ret = svc_rdma_post_recv(rdma); > + if (ret) { > + pr_err("svcrdma: failure posting recv buffers: %d\n", > + ret); > + return false; > + } > + } > + return true; > +} > + > +/** > + * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC > + * @cq: Completion Queue context > + * @wc: Work Completion object > + * > + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that > + * the Receive completion handler could be running. > + */ > +static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) > +{ > + struct svcxprt_rdma *rdma = cq->cq_context; > + struct ib_cqe *cqe = wc->wr_cqe; > + struct svc_rdma_recv_ctxt *ctxt; > + > + trace_svcrdma_wc_receive(wc); > + > + /* WARNING: Only wc->wr_cqe and wc->status are reliable */ > + ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe); > + svc_rdma_recv_ctxt_unmap(rdma, ctxt); > + > + if (wc->status != IB_WC_SUCCESS) > + goto flushed; > + > + if (svc_rdma_post_recv(rdma)) > + goto post_err; > + > + /* All wc fields are now known to be valid */ > + ctxt->rc_byte_len = wc->byte_len; > + spin_lock(&rdma->sc_rq_dto_lock); > + list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q); > + spin_unlock(&rdma->sc_rq_dto_lock); > + set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); > + if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags)) > + svc_xprt_enqueue(&rdma->sc_xprt); > + goto out; > + > +flushed: > + if (wc->status != IB_WC_WR_FLUSH_ERR) > + pr_err("svcrdma: Recv: %s (%u/0x%x)\n", > + ib_wc_status_msg(wc->status), > + wc->status, wc->vendor_err); > +post_err: > + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); > + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); > + svc_xprt_enqueue(&rdma->sc_xprt); > +out: > + svc_xprt_put(&rdma->sc_xprt); > +} > + > +/** > + * svc_rdma_flush_recv_queues - Drain pending Receive work > + * @rdma: svcxprt_rdma being shut down > + * > + */ > +void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma) > +{ > + struct svc_rdma_recv_ctxt *ctxt; > + > + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) { > + list_del(&ctxt->rc_list); > + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); > + } > + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) { > + list_del(&ctxt->rc_list); > + svc_rdma_recv_ctxt_put(rdma, ctxt, 1); > + } > +} > + > /* > * Replace the pages in the rq_argpages array with the pages from the SGE in > * the RDMA_RECV completion. The SGL should contain full pages up until the > * last one. > */ > static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, > - struct svc_rdma_op_ctxt *ctxt) > + struct svc_rdma_recv_ctxt *ctxt) > { > struct page *page; > int sge_no; > @@ -123,30 +357,30 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, > /* The reply path assumes the Call's transport header resides > * in rqstp->rq_pages[0]. > */ > - page = ctxt->pages[0]; > + page = ctxt->rc_pages[0]; > put_page(rqstp->rq_pages[0]); > rqstp->rq_pages[0] = page; > > /* Set up the XDR head */ > rqstp->rq_arg.head[0].iov_base = page_address(page); > rqstp->rq_arg.head[0].iov_len = > - min_t(size_t, ctxt->byte_len, ctxt->sge[0].length); > - rqstp->rq_arg.len = ctxt->byte_len; > - rqstp->rq_arg.buflen = ctxt->byte_len; > + min_t(size_t, ctxt->rc_byte_len, ctxt->rc_sges[0].length); > + rqstp->rq_arg.len = ctxt->rc_byte_len; > + rqstp->rq_arg.buflen = ctxt->rc_byte_len; > > /* Compute bytes past head in the SGL */ > - len = ctxt->byte_len - rqstp->rq_arg.head[0].iov_len; > + len = ctxt->rc_byte_len - rqstp->rq_arg.head[0].iov_len; > > /* If data remains, store it in the pagelist */ > rqstp->rq_arg.page_len = len; > rqstp->rq_arg.page_base = 0; > > sge_no = 1; > - while (len && sge_no < ctxt->count) { > - page = ctxt->pages[sge_no]; > + while (len && sge_no < ctxt->rc_recv_wr.num_sge) { > + page = ctxt->rc_pages[sge_no]; > put_page(rqstp->rq_pages[sge_no]); > rqstp->rq_pages[sge_no] = page; > - len -= min_t(u32, len, ctxt->sge[sge_no].length); > + len -= min_t(u32, len, ctxt->rc_sges[sge_no].length); > sge_no++; > } > rqstp->rq_respages = &rqstp->rq_pages[sge_no]; > @@ -154,11 +388,11 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, > > /* If not all pages were used from the SGL, free the remaining ones */ > len = sge_no; > - while (sge_no < ctxt->count) { > - page = ctxt->pages[sge_no++]; > + while (sge_no < ctxt->rc_recv_wr.num_sge) { > + page = ctxt->rc_pages[sge_no++]; > put_page(page); > } > - ctxt->count = len; > + ctxt->rc_page_count = len; > > /* Set up tail */ > rqstp->rq_arg.tail[0].iov_base = NULL; > @@ -364,29 +598,29 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) > } > > static void rdma_read_complete(struct svc_rqst *rqstp, > - struct svc_rdma_op_ctxt *head) > + struct svc_rdma_recv_ctxt *head) > { > int page_no; > > /* Copy RPC pages */ > - for (page_no = 0; page_no < head->count; page_no++) { > + for (page_no = 0; page_no < head->rc_page_count; page_no++) { > put_page(rqstp->rq_pages[page_no]); > - rqstp->rq_pages[page_no] = head->pages[page_no]; > + rqstp->rq_pages[page_no] = head->rc_pages[page_no]; > } > > /* Point rq_arg.pages past header */ > - rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count]; > - rqstp->rq_arg.page_len = head->arg.page_len; > + rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count]; > + rqstp->rq_arg.page_len = head->rc_arg.page_len; > > /* rq_respages starts after the last arg page */ > rqstp->rq_respages = &rqstp->rq_pages[page_no]; > rqstp->rq_next_page = rqstp->rq_respages + 1; > > /* Rebuild rq_arg head and tail. */ > - rqstp->rq_arg.head[0] = head->arg.head[0]; > - rqstp->rq_arg.tail[0] = head->arg.tail[0]; > - rqstp->rq_arg.len = head->arg.len; > - rqstp->rq_arg.buflen = head->arg.buflen; > + rqstp->rq_arg.head[0] = head->rc_arg.head[0]; > + rqstp->rq_arg.tail[0] = head->rc_arg.tail[0]; > + rqstp->rq_arg.len = head->rc_arg.len; > + rqstp->rq_arg.buflen = head->rc_arg.buflen; > } > > static void svc_rdma_send_error(struct svcxprt_rdma *xprt, > @@ -506,28 +740,26 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) > struct svc_xprt *xprt = rqstp->rq_xprt; > struct svcxprt_rdma *rdma_xprt = > container_of(xprt, struct svcxprt_rdma, sc_xprt); > - struct svc_rdma_op_ctxt *ctxt; > + struct svc_rdma_recv_ctxt *ctxt; > __be32 *p; > int ret; > > spin_lock(&rdma_xprt->sc_rq_dto_lock); > - if (!list_empty(&rdma_xprt->sc_read_complete_q)) { > - ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q, > - struct svc_rdma_op_ctxt, list); > - list_del(&ctxt->list); > + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q); > + if (ctxt) { > + list_del(&ctxt->rc_list); > spin_unlock(&rdma_xprt->sc_rq_dto_lock); > rdma_read_complete(rqstp, ctxt); > goto complete; > - } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { > - ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q, > - struct svc_rdma_op_ctxt, list); > - list_del(&ctxt->list); > - } else { > + } > + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q); > + if (!ctxt) { > /* No new incoming requests, terminate the loop */ > clear_bit(XPT_DATA, &xprt->xpt_flags); > spin_unlock(&rdma_xprt->sc_rq_dto_lock); > return 0; > } > + list_del(&ctxt->rc_list); > spin_unlock(&rdma_xprt->sc_rq_dto_lock); > > atomic_inc(&rdma_stat_recv); > @@ -545,7 +777,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) > if (svc_rdma_is_backchannel_reply(xprt, p)) { > ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p, > &rqstp->rq_arg); > - svc_rdma_put_context(ctxt, 0); > + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0); > return ret; > } > > @@ -554,7 +786,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) > goto out_readchunk; > > complete: > - svc_rdma_put_context(ctxt, 0); > + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0); > rqstp->rq_prot = IPPROTO_MAX; > svc_xprt_copy_addrs(rqstp, xprt); > return rqstp->rq_arg.len; > @@ -567,16 +799,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) > > out_err: > svc_rdma_send_error(rdma_xprt, p, ret); > - svc_rdma_put_context(ctxt, 0); > + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 0); > return 0; > > out_postfail: > if (ret == -EINVAL) > svc_rdma_send_error(rdma_xprt, p, ret); > - svc_rdma_put_context(ctxt, 1); > + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1); > return ret; > > out_drop: > - svc_rdma_put_context(ctxt, 1); > + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt, 1); > return 0; > } > diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c > index 887ceef..c080ce2 100644 > --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c > +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c > @@ -1,6 +1,6 @@ > // SPDX-License-Identifier: GPL-2.0 > /* > - * Copyright (c) 2016 Oracle. All rights reserved. > + * Copyright (c) 2016-2018 Oracle. All rights reserved. > * > * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. > */ > @@ -227,7 +227,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) > /* State for pulling a Read chunk. > */ > struct svc_rdma_read_info { > - struct svc_rdma_op_ctxt *ri_readctxt; > + struct svc_rdma_recv_ctxt *ri_readctxt; > unsigned int ri_position; > unsigned int ri_pageno; > unsigned int ri_pageoff; > @@ -282,10 +282,10 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) > pr_err("svcrdma: read ctx: %s (%u/0x%x)\n", > ib_wc_status_msg(wc->status), > wc->status, wc->vendor_err); > - svc_rdma_put_context(info->ri_readctxt, 1); > + svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt, 1); > } else { > spin_lock(&rdma->sc_rq_dto_lock); > - list_add_tail(&info->ri_readctxt->list, > + list_add_tail(&info->ri_readctxt->rc_list, > &rdma->sc_read_complete_q); > spin_unlock(&rdma->sc_rq_dto_lock); > > @@ -607,7 +607,7 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, > struct svc_rqst *rqstp, > u32 rkey, u32 len, u64 offset) > { > - struct svc_rdma_op_ctxt *head = info->ri_readctxt; > + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; > struct svc_rdma_chunk_ctxt *cc = &info->ri_cc; > struct svc_rdma_rw_ctxt *ctxt; > unsigned int sge_no, seg_len; > @@ -625,10 +625,10 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, > seg_len = min_t(unsigned int, len, > PAGE_SIZE - info->ri_pageoff); > > - head->arg.pages[info->ri_pageno] = > + head->rc_arg.pages[info->ri_pageno] = > rqstp->rq_pages[info->ri_pageno]; > if (!info->ri_pageoff) > - head->count++; > + head->rc_page_count++; > > sg_set_page(sg, rqstp->rq_pages[info->ri_pageno], > seg_len, info->ri_pageoff); > @@ -705,9 +705,9 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, > } > > /* Construct RDMA Reads to pull over a normal Read chunk. The chunk > - * data lands in the page list of head->arg.pages. > + * data lands in the page list of head->rc_arg.pages. > * > - * Currently NFSD does not look at the head->arg.tail[0] iovec. > + * Currently NFSD does not look at the head->rc_arg.tail[0] iovec. > * Therefore, XDR round-up of the Read chunk and trailing > * inline content must both be added at the end of the pagelist. > */ > @@ -715,10 +715,10 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, > struct svc_rdma_read_info *info, > __be32 *p) > { > - struct svc_rdma_op_ctxt *head = info->ri_readctxt; > + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; > int ret; > > - info->ri_pageno = head->hdr_count; > + info->ri_pageno = head->rc_hdr_count; > info->ri_pageoff = 0; > > ret = svc_rdma_build_read_chunk(rqstp, info, p); > @@ -732,11 +732,11 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, > * chunk is not included in either the pagelist or in > * the tail. > */ > - head->arg.tail[0].iov_base = > - head->arg.head[0].iov_base + info->ri_position; > - head->arg.tail[0].iov_len = > - head->arg.head[0].iov_len - info->ri_position; > - head->arg.head[0].iov_len = info->ri_position; > + head->rc_arg.tail[0].iov_base = > + head->rc_arg.head[0].iov_base + info->ri_position; > + head->rc_arg.tail[0].iov_len = > + head->rc_arg.head[0].iov_len - info->ri_position; > + head->rc_arg.head[0].iov_len = info->ri_position; > > /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). > * > @@ -749,9 +749,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, > */ > info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2; > > - head->arg.page_len = info->ri_chunklen; > - head->arg.len += info->ri_chunklen; > - head->arg.buflen += info->ri_chunklen; > + head->rc_arg.page_len = info->ri_chunklen; > + head->rc_arg.len += info->ri_chunklen; > + head->rc_arg.buflen += info->ri_chunklen; > > out: > return ret; > @@ -760,7 +760,7 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, > /* Construct RDMA Reads to pull over a Position Zero Read chunk. > * The start of the data lands in the first page just after > * the Transport header, and the rest lands in the page list of > - * head->arg.pages. > + * head->rc_arg.pages. > * > * Assumptions: > * - A PZRC has an XDR-aligned length (no implicit round-up). > @@ -772,11 +772,11 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, > struct svc_rdma_read_info *info, > __be32 *p) > { > - struct svc_rdma_op_ctxt *head = info->ri_readctxt; > + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; > int ret; > > - info->ri_pageno = head->hdr_count - 1; > - info->ri_pageoff = offset_in_page(head->byte_len); > + info->ri_pageno = head->rc_hdr_count - 1; > + info->ri_pageoff = offset_in_page(head->rc_byte_len); > > ret = svc_rdma_build_read_chunk(rqstp, info, p); > if (ret < 0) > @@ -784,22 +784,22 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, > > trace_svcrdma_encode_pzr(info->ri_chunklen); > > - head->arg.len += info->ri_chunklen; > - head->arg.buflen += info->ri_chunklen; > + head->rc_arg.len += info->ri_chunklen; > + head->rc_arg.buflen += info->ri_chunklen; > > - if (head->arg.buflen <= head->sge[0].length) { > + if (head->rc_arg.buflen <= head->rc_sges[0].length) { > /* Transport header and RPC message fit entirely > * in page where head iovec resides. > */ > - head->arg.head[0].iov_len = info->ri_chunklen; > + head->rc_arg.head[0].iov_len = info->ri_chunklen; > } else { > /* Transport header and part of RPC message reside > * in the head iovec's page. > */ > - head->arg.head[0].iov_len = > - head->sge[0].length - head->byte_len; > - head->arg.page_len = > - info->ri_chunklen - head->arg.head[0].iov_len; > + head->rc_arg.head[0].iov_len = > + head->rc_sges[0].length - head->rc_byte_len; > + head->rc_arg.page_len = > + info->ri_chunklen - head->rc_arg.head[0].iov_len; > } > > out: > @@ -824,24 +824,24 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, > * - All Read segments in @p have the same Position value. > */ > int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, > - struct svc_rdma_op_ctxt *head, __be32 *p) > + struct svc_rdma_recv_ctxt *head, __be32 *p) > { > struct svc_rdma_read_info *info; > struct page **page; > int ret; > > /* The request (with page list) is constructed in > - * head->arg. Pages involved with RDMA Read I/O are > + * head->rc_arg. Pages involved with RDMA Read I/O are > * transferred there. > */ > - head->hdr_count = head->count; > - head->arg.head[0] = rqstp->rq_arg.head[0]; > - head->arg.tail[0] = rqstp->rq_arg.tail[0]; > - head->arg.pages = head->pages; > - head->arg.page_base = 0; > - head->arg.page_len = 0; > - head->arg.len = rqstp->rq_arg.len; > - head->arg.buflen = rqstp->rq_arg.buflen; > + head->rc_hdr_count = head->rc_page_count; > + head->rc_arg.head[0] = rqstp->rq_arg.head[0]; > + head->rc_arg.tail[0] = rqstp->rq_arg.tail[0]; > + head->rc_arg.pages = head->rc_pages; > + head->rc_arg.page_base = 0; > + head->rc_arg.page_len = 0; > + head->rc_arg.len = rqstp->rq_arg.len; > + head->rc_arg.buflen = rqstp->rq_arg.buflen; > > info = svc_rdma_read_info_alloc(rdma); > if (!info) > @@ -867,7 +867,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, > > out: > /* Read sink pages have been moved from rqstp->rq_pages to > - * head->arg.pages. Force svc_recv to refill those slots > + * head->rc_arg.pages. Force svc_recv to refill those slots > * in rq_pages. > */ > for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++) > diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c > index fed28de..a397d9a 100644 > --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c > +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c > @@ -1,6 +1,6 @@ > // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause > /* > - * Copyright (c) 2016 Oracle. All rights reserved. > + * Copyright (c) 2016-2018 Oracle. All rights reserved. > * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. > * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. > * > diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c > index ca9001d..afd5e61 100644 > --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c > +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c > @@ -63,7 +63,6 @@ > > #define RPCDBG_FACILITY RPCDBG_SVCXPRT > > -static int svc_rdma_post_recv(struct svcxprt_rdma *xprt); > static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, > struct net *net); > static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, > @@ -175,11 +174,7 @@ static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) > { > unsigned int i; > > - /* Each RPC/RDMA credit can consume one Receive and > - * one Send WQE at the same time. > - */ > - i = xprt->sc_sq_depth + xprt->sc_rq_depth; > - > + i = xprt->sc_sq_depth; > while (i--) { > struct svc_rdma_op_ctxt *ctxt; > > @@ -298,54 +293,6 @@ static void qp_event_handler(struct ib_event *event, void *context) > } > > /** > - * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC > - * @cq: completion queue > - * @wc: completed WR > - * > - */ > -static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) > -{ > - struct svcxprt_rdma *xprt = cq->cq_context; > - struct ib_cqe *cqe = wc->wr_cqe; > - struct svc_rdma_op_ctxt *ctxt; > - > - trace_svcrdma_wc_receive(wc); > - > - /* WARNING: Only wc->wr_cqe and wc->status are reliable */ > - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); > - svc_rdma_unmap_dma(ctxt); > - > - if (wc->status != IB_WC_SUCCESS) > - goto flushed; > - > - /* All wc fields are now known to be valid */ > - ctxt->byte_len = wc->byte_len; > - spin_lock(&xprt->sc_rq_dto_lock); > - list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q); > - spin_unlock(&xprt->sc_rq_dto_lock); > - > - svc_rdma_post_recv(xprt); > - > - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); > - if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) > - goto out; > - goto out_enqueue; > - > -flushed: > - if (wc->status != IB_WC_WR_FLUSH_ERR) > - pr_err("svcrdma: Recv: %s (%u/0x%x)\n", > - ib_wc_status_msg(wc->status), > - wc->status, wc->vendor_err); > - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); > - svc_rdma_put_context(ctxt, 1); > - > -out_enqueue: > - svc_xprt_enqueue(&xprt->sc_xprt); > -out: > - svc_xprt_put(&xprt->sc_xprt); > -} > - > -/** > * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC > * @cq: completion queue > * @wc: completed WR > @@ -392,12 +339,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, > INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); > INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); > INIT_LIST_HEAD(&cma_xprt->sc_ctxts); > + INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts); > INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); > init_waitqueue_head(&cma_xprt->sc_send_wait); > > spin_lock_init(&cma_xprt->sc_lock); > spin_lock_init(&cma_xprt->sc_rq_dto_lock); > spin_lock_init(&cma_xprt->sc_ctxt_lock); > + spin_lock_init(&cma_xprt->sc_recv_lock); > spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); > > /* > @@ -411,63 +360,6 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, > return cma_xprt; > } > > -static int > -svc_rdma_post_recv(struct svcxprt_rdma *xprt) > -{ > - struct ib_recv_wr recv_wr, *bad_recv_wr; > - struct svc_rdma_op_ctxt *ctxt; > - struct page *page; > - dma_addr_t pa; > - int sge_no; > - int buflen; > - int ret; > - > - ctxt = svc_rdma_get_context(xprt); > - buflen = 0; > - ctxt->direction = DMA_FROM_DEVICE; > - ctxt->cqe.done = svc_rdma_wc_receive; > - for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) { > - if (sge_no >= xprt->sc_max_sge) { > - pr_err("svcrdma: Too many sges (%d)\n", sge_no); > - goto err_put_ctxt; > - } > - page = alloc_page(GFP_KERNEL); > - if (!page) > - goto err_put_ctxt; > - ctxt->pages[sge_no] = page; > - pa = ib_dma_map_page(xprt->sc_cm_id->device, > - page, 0, PAGE_SIZE, > - DMA_FROM_DEVICE); > - if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa)) > - goto err_put_ctxt; > - svc_rdma_count_mappings(xprt, ctxt); > - ctxt->sge[sge_no].addr = pa; > - ctxt->sge[sge_no].length = PAGE_SIZE; > - ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; > - ctxt->count = sge_no + 1; > - buflen += PAGE_SIZE; > - } > - recv_wr.next = NULL; > - recv_wr.sg_list = &ctxt->sge[0]; > - recv_wr.num_sge = ctxt->count; > - recv_wr.wr_cqe = &ctxt->cqe; > - > - svc_xprt_get(&xprt->sc_xprt); > - ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); > - trace_svcrdma_post_recv(&recv_wr, ret); > - if (ret) { > - svc_rdma_unmap_dma(ctxt); > - svc_rdma_put_context(ctxt, 1); > - svc_xprt_put(&xprt->sc_xprt); > - } > - return ret; > - > - err_put_ctxt: > - svc_rdma_unmap_dma(ctxt); > - svc_rdma_put_context(ctxt, 1); > - return -ENOMEM; > -} > - > static void > svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt, > struct rdma_conn_param *param) > @@ -699,7 +591,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) > struct ib_qp_init_attr qp_attr; > struct ib_device *dev; > struct sockaddr *sap; > - unsigned int i, ctxts; > + unsigned int ctxts; > int ret = 0; > > listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); > @@ -804,14 +696,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) > !rdma_ib_or_roce(dev, newxprt->sc_port_num)) > goto errout; > > - /* Post receive buffers */ > - for (i = 0; i < newxprt->sc_max_requests; i++) { > - ret = svc_rdma_post_recv(newxprt); > - if (ret) { > - dprintk("svcrdma: failure posting receive buffers\n"); > - goto errout; > - } > - } > + if (!svc_rdma_post_recvs(newxprt)) > + goto errout; > > /* Swap out the handler */ > newxprt->sc_cm_id->event_handler = rdma_cma_handler; > @@ -908,20 +794,7 @@ static void __svc_rdma_free(struct work_struct *work) > pr_err("svcrdma: sc_xprt still in use? (%d)\n", > kref_read(&xprt->xpt_ref)); > > - while (!list_empty(&rdma->sc_read_complete_q)) { > - struct svc_rdma_op_ctxt *ctxt; > - ctxt = list_first_entry(&rdma->sc_read_complete_q, > - struct svc_rdma_op_ctxt, list); > - list_del(&ctxt->list); > - svc_rdma_put_context(ctxt, 1); > - } > - while (!list_empty(&rdma->sc_rq_dto_q)) { > - struct svc_rdma_op_ctxt *ctxt; > - ctxt = list_first_entry(&rdma->sc_rq_dto_q, > - struct svc_rdma_op_ctxt, list); > - list_del(&ctxt->list); > - svc_rdma_put_context(ctxt, 1); > - } > + svc_rdma_flush_recv_queues(rdma); > > /* Warn if we leaked a resource or under-referenced */ > if (rdma->sc_ctxt_used != 0) > @@ -936,6 +809,7 @@ static void __svc_rdma_free(struct work_struct *work) > > svc_rdma_destroy_rw_ctxts(rdma); > svc_rdma_destroy_ctxts(rdma); > + svc_rdma_recv_ctxts_destroy(rdma); > > /* Destroy the QP if present (not a listener) */ > if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) -- To unsubscribe from this list: send the line "unsubscribe linux-rdma" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html