RE: [PATCH V3] svcrdma: refactor marshalling logic

> 
> Hi Steve
> 
> I am testing this patch. I have found that when the server initiates an RDMA_READ on an
> ocrdma device, posting the RDMA_READ fails because the FENCE bit is not set for
> non-iWARP devices that use FRMRs. As a result, every server-initiated RDMA_READ
> operation fails with a completion error.
> This bug was present in v1 and v2 as well.
>

Why would the FENCE bit not be required for mlx4, mthca, cxgb4, and yet be required for ocrdma?

 
> Check inline for the exact location of the change.
> 
> The rest looks okay from my side; iozone passes with this patch, of course only after
> adding the FENCE indicator.
> 
> -Regards
>  Devesh
> 
> > -----Original Message-----
> > From: linux-rdma-owner@xxxxxxxxxxxxxxx [mailto:linux-rdma-
> > owner@xxxxxxxxxxxxxxx] On Behalf Of Steve Wise
> > Sent: Thursday, May 29, 2014 10:26 PM
> > To: bfields@xxxxxxxxxxxx
> > Cc: linux-nfs@xxxxxxxxxxxxxxx; linux-rdma@xxxxxxxxxxxxxxx;
> > tom@xxxxxxxxxxxxxxxxxxxxx
> > Subject: [PATCH V3] svcrdma: refactor marshalling logic
> >
> > This patch refactors the NFSRDMA server marshalling logic to remove the
> > intermediary map structures.  It also fixes an existing bug where the
> > NFSRDMA server was not minding the device fast register page list length
> > limitations.
> >
> > I've also made a git repo available with these patches on top of 3.15-rc7:
> >
> > git://git.linux-nfs.org/projects/swise/linux.git svcrdma-refactor-v3
> >
> > Changes since V2:
> >
> > - fixed logic bug in rdma_read_chunk_frmr() and rdma_read_chunk_lcl()
> >
> > - in rdma_read_chunks(), set the reader function pointer only once since
> >   it doesn't change
> >
> > - squashed the patch back into one patch since the previous split wasn't
> >   bisectable
> >
> > Changes since V1:
> >
> > - fixed regression for devices that don't support FRMRs (see
> >   rdma_read_chunk_lcl())
> >
> > - split patch up for closer review.  However I request it be squashed
> >   before merging as they are not bisectable, and I think these changes
> >   should all be a single commit anyway.
> >
> > Please review, and test if you can.  I'd like this to hit 3.16.
> >
> > Signed-off-by: Tom Tucker <tom@xxxxxxxxxxxxxxxxxxxxx>
> > Signed-off-by: Steve Wise <swise@xxxxxxxxxxxxxxxxxxxxx>
> > ---
> >
> >  include/linux/sunrpc/svc_rdma.h          |    3
> >  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c  |  643 +++++++++++++-----------------
> >  net/sunrpc/xprtrdma/svc_rdma_sendto.c    |  230 +----------
> >  net/sunrpc/xprtrdma/svc_rdma_transport.c |   62 ++-
> >  4 files changed, 332 insertions(+), 606 deletions(-)
> >
> > diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
> > index 0b8e3e6..5cf99a0 100644
> > --- a/include/linux/sunrpc/svc_rdma.h
> > +++ b/include/linux/sunrpc/svc_rdma.h
> > @@ -115,14 +115,13 @@ struct svc_rdma_fastreg_mr {
> >  	struct list_head frmr_list;
> >  };
> >  struct svc_rdma_req_map {
> > -	struct svc_rdma_fastreg_mr *frmr;
> >  	unsigned long count;
> >  	union {
> >  		struct kvec sge[RPCSVC_MAXPAGES];
> >  		struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
> > +		unsigned long lkey[RPCSVC_MAXPAGES];
> >  	};
> >  };
> > -#define RDMACTXT_F_FAST_UNREG	1
> >  #define RDMACTXT_F_LAST_CTXT	2
> >
> >  #define	SVCRDMA_DEVCAP_FAST_REG		1	/* fast mr registration */
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > index 8d904e4..52d9f2c 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
> > @@ -1,4 +1,5 @@
> >  /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> >   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> >   *
> >   * This software is available to you under a choice of one of two
> > @@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >
> >  	/* Set up the XDR head */
> >  	rqstp->rq_arg.head[0].iov_base = page_address(page);
> > -	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
> > +	rqstp->rq_arg.head[0].iov_len =
> > +		min_t(size_t, byte_count, ctxt->sge[0].length);
> >  	rqstp->rq_arg.len = byte_count;
> >  	rqstp->rq_arg.buflen = byte_count;
> >
> > @@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >  		page = ctxt->pages[sge_no];
> >  		put_page(rqstp->rq_pages[sge_no]);
> >  		rqstp->rq_pages[sge_no] = page;
> > -		bc -= min(bc, ctxt->sge[sge_no].length);
> > +		bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
> >  		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
> >  		sge_no++;
> >  	}
> > @@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
> >  	rqstp->rq_arg.tail[0].iov_len = 0;
> >  }
> >
> > -/* Encode a read-chunk-list as an array of IB SGE
> > - *
> > - * Assumptions:
> > - * - chunk[0]->position points to pages[0] at an offset of 0
> > - * - pages[] is not physically or virtually contiguous and consists of
> > - *   PAGE_SIZE elements.
> > - *
> > - * Output:
> > - * - sge array pointing into pages[] array.
> > - * - chunk_sge array specifying sge index and count for each
> > - *   chunk in the read list
> > - *
> > - */
> > -static int map_read_chunks(struct svcxprt_rdma *xprt,
> > -			   struct svc_rqst *rqstp,
> > -			   struct svc_rdma_op_ctxt *head,
> > -			   struct rpcrdma_msg *rmsgp,
> > -			   struct svc_rdma_req_map *rpl_map,
> > -			   struct svc_rdma_req_map *chl_map,
> > -			   int ch_count,
> > -			   int byte_count)
> > +static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> >  {
> > -	int sge_no;
> > -	int sge_bytes;
> > -	int page_off;
> > -	int page_no;
> > -	int ch_bytes;
> > -	int ch_no;
> > -	struct rpcrdma_read_chunk *ch;
> > +	if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > +	    RDMA_TRANSPORT_IWARP)
> > +		return 1;
> > +	else
> > +		return min_t(int, sge_count, xprt->sc_max_sge);
> > +}
> >
> > -	sge_no = 0;
> > -	page_no = 0;
> > -	page_off = 0;
> > -	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > -	ch_no = 0;
> > -	ch_bytes = ntohl(ch->rc_target.rs_length);
> > -	head->arg.head[0] = rqstp->rq_arg.head[0];
> > -	head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > -	head->arg.pages = &head->pages[head->count];
> > -	head->hdr_count = head->count; /* save count of hdr pages */
> > -	head->arg.page_base = 0;
> > -	head->arg.page_len = ch_bytes;
> > -	head->arg.len = rqstp->rq_arg.len + ch_bytes;
> > -	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
> > -	head->count++;
> > -	chl_map->ch[0].start = 0;
> > -	while (byte_count) {
> > -		rpl_map->sge[sge_no].iov_base =
> > -			page_address(rqstp->rq_arg.pages[page_no]) + page_off;
> > -		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
> > -		rpl_map->sge[sge_no].iov_len = sge_bytes;
> > -		/*
> > -		 * Don't bump head->count here because the same page
> > -		 * may be used by multiple SGE.
> > -		 */
> > -		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > -		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
> > +typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
> > +			      struct svc_rqst *rqstp,
> > +			      struct svc_rdma_op_ctxt *head,
> > +			      int *page_no,
> > +			      u32 *page_offset,
> > +			      u32 rs_handle,
> > +			      u32 rs_length,
> > +			      u64 rs_offset,
> > +			      int last);
> > +
> > +/* Issue an RDMA_READ using the local lkey to map the data sink */
> > +static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
> > +			       struct svc_rqst *rqstp,
> > +			       struct svc_rdma_op_ctxt *head,
> > +			       int *page_no,
> > +			       u32 *page_offset,
> > +			       u32 rs_handle,
> > +			       u32 rs_length,
> > +			       u64 rs_offset,
> > +			       int last)
> > +{
> > +	struct ib_send_wr read_wr;
> > +	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
> > +	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > +	int ret, read, pno;
> > +	u32 pg_off = *page_offset;
> > +	u32 pg_no = *page_no;
> > +
> > +	ctxt->direction = DMA_FROM_DEVICE;
> > +	ctxt->read_hdr = head;
> > +	pages_needed =
> > +		min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
> > +	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> > +
> > +	for (pno = 0; pno < pages_needed; pno++) {
> > +		int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > +
> > +		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > +		head->arg.page_len += len;
> > +		head->arg.len += len;
> > +		if (!pg_off)
> > +			head->count++;
> > +		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> >  		rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +		ctxt->sge[pno].addr =
> > +			ib_dma_map_page(xprt->sc_cm_id->device,
> > +					head->arg.pages[pg_no], pg_off,
> > +					PAGE_SIZE - pg_off,
> > +					DMA_FROM_DEVICE);
> > +		ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > +					   ctxt->sge[pno].addr);
> > +		if (ret)
> > +			goto err;
> > +		atomic_inc(&xprt->sc_dma_used);
> >
> > -		byte_count -= sge_bytes;
> > -		ch_bytes -= sge_bytes;
> > -		sge_no++;
> > -		/*
> > -		 * If all bytes for this chunk have been mapped to an
> > -		 * SGE, move to the next SGE
> > -		 */
> > -		if (ch_bytes == 0) {
> > -			chl_map->ch[ch_no].count =
> > -				sge_no - chl_map->ch[ch_no].start;
> > -			ch_no++;
> > -			ch++;
> > -			chl_map->ch[ch_no].start = sge_no;
> > -			ch_bytes = ntohl(ch->rc_target.rs_length);
> > -			/* If bytes remaining account for next chunk */
> > -			if (byte_count) {
> > -				head->arg.page_len += ch_bytes;
> > -				head->arg.len += ch_bytes;
> > -				head->arg.buflen += ch_bytes;
> > -			}
> > +		/* The lkey here is either a local dma lkey or a dma_mr lkey */
> > +		ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
> > +		ctxt->sge[pno].length = len;
> > +		ctxt->count++;
> > +
> > +		/* adjust offset and wrap to next page if needed */
> > +		pg_off += len;
> > +		if (pg_off == PAGE_SIZE) {
> > +			pg_off = 0;
> > +			pg_no++;
> >  		}
> > -		/*
> > -		 * If this SGE consumed all of the page, move to the
> > -		 * next page
> > -		 */
> > -		if ((sge_bytes + page_off) == PAGE_SIZE) {
> > -			page_no++;
> > -			page_off = 0;
> > -			/*
> > -			 * If there are still bytes left to map, bump
> > -			 * the page count
> > -			 */
> > -			if (byte_count)
> > -				head->count++;
> > -		} else
> > -			page_off += sge_bytes;
> > +		rs_length -= len;
> >  	}
> > -	BUG_ON(byte_count != 0);
> > -	return sge_no;
> > +
> > +	if (last && rs_length == 0)
> > +		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +	else
> > +		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +
> > +	memset(&read_wr, 0, sizeof(read_wr));
> > +	read_wr.wr_id = (unsigned long)ctxt;
> > +	read_wr.opcode = IB_WR_RDMA_READ;
> > +	ctxt->wr_op = read_wr.opcode;
> > +	read_wr.send_flags = IB_SEND_SIGNALED;
> > +	read_wr.wr.rdma.rkey = rs_handle;
> > +	read_wr.wr.rdma.remote_addr = rs_offset;
> > +	read_wr.sg_list = ctxt->sge;
> > +	read_wr.num_sge = pages_needed;
> > +
> > +	ret = svc_rdma_send(xprt, &read_wr);
> > +	if (ret) {
> > +		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > +		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +		goto err;
> > +	}
> > +
> > +	/* return current location in page array */
> > +	*page_no = pg_no;
> > +	*page_offset = pg_off;
> > +	ret = read;
> > +	atomic_inc(&rdma_stat_read);
> > +	return ret;
> > + err:
> > +	svc_rdma_unmap_dma(ctxt);
> > +	svc_rdma_put_context(ctxt, 0);
> > +	return ret;
> >  }
> >
> > -/* Map a read-chunk-list to an XDR and fast register the page-list.
> > - *
> > - * Assumptions:
> > - * - chunk[0]	position points to pages[0] at an offset of 0
> > - * - pages[]	will be made physically contiguous by creating a one-off memory
> > - *		region using the fastreg verb.
> > - * - byte_count is # of bytes in read-chunk-list
> > - * - ch_count	is # of chunks in read-chunk-list
> > - *
> > - * Output:
> > - * - sge array pointing into pages[] array.
> > - * - chunk_sge array specifying sge index and count for each
> > - *   chunk in the read list
> > - */
> > -static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
> > +/* Issue an RDMA_READ using an FRMR to map the data sink */
> > +static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
> >  				struct svc_rqst *rqstp,
> >  				struct svc_rdma_op_ctxt *head,
> > -				struct rpcrdma_msg *rmsgp,
> > -				struct svc_rdma_req_map *rpl_map,
> > -				struct svc_rdma_req_map *chl_map,
> > -				int ch_count,
> > -				int byte_count)
> > +				int *page_no,
> > +				u32 *page_offset,
> > +				u32 rs_handle,
> > +				u32 rs_length,
> > +				u64 rs_offset,
> > +				int last)
> >  {
> > -	int page_no;
> > -	int ch_no;
> > -	u32 offset;
> > -	struct rpcrdma_read_chunk *ch;
> > -	struct svc_rdma_fastreg_mr *frmr;
> > -	int ret = 0;
> > +	struct ib_send_wr read_wr;
> > +	struct ib_send_wr inv_wr;
> > +	struct ib_send_wr fastreg_wr;
> > +	u8 key;
> > +	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
> > +	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
> > +	struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
> > +	int ret, read, pno;
> > +	u32 pg_off = *page_offset;
> > +	u32 pg_no = *page_no;
> >
> > -	frmr = svc_rdma_get_frmr(xprt);
> >  	if (IS_ERR(frmr))
> >  		return -ENOMEM;
> >
> > -	head->frmr = frmr;
> > -	head->arg.head[0] = rqstp->rq_arg.head[0];
> > -	head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > -	head->arg.pages = &head->pages[head->count];
> > -	head->hdr_count = head->count; /* save count of hdr pages */
> > -	head->arg.page_base = 0;
> > -	head->arg.page_len = byte_count;
> > -	head->arg.len = rqstp->rq_arg.len + byte_count;
> > -	head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
> > +	ctxt->direction = DMA_FROM_DEVICE;
> > +	ctxt->frmr = frmr;
> > +	pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
> > +	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
> >
> > -	/* Fast register the page list */
> > -	frmr->kva = page_address(rqstp->rq_arg.pages[0]);
> > +	frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
> >  	frmr->direction = DMA_FROM_DEVICE;
> >  	frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
> > -	frmr->map_len = byte_count;
> > -	frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
> > -	for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
> > -		frmr->page_list->page_list[page_no] =
> > +	frmr->map_len = pages_needed << PAGE_SHIFT;
> > +	frmr->page_list_len = pages_needed;
> > +
> > +	for (pno = 0; pno < pages_needed; pno++) {
> > +		int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
> > +
> > +		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
> > +		head->arg.page_len += len;
> > +		head->arg.len += len;
> > +		if (!pg_off)
> > +			head->count++;
> > +		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
> > +		rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +		frmr->page_list->page_list[pno] =
> >  			ib_dma_map_page(xprt->sc_cm_id->device,
> > -					rqstp->rq_arg.pages[page_no], 0,
> > +					head->arg.pages[pg_no], 0,
> >  					PAGE_SIZE, DMA_FROM_DEVICE);
> > -		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -					 frmr->page_list->page_list[page_no]))
> > -			goto fatal_err;
> > +		ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
> > +					   frmr->page_list->page_list[pno]);
> > +		if (ret)
> > +			goto err;
> >  		atomic_inc(&xprt->sc_dma_used);
> > -		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
> > -	}
> > -	head->count += page_no;
> > -
> > -	/* rq_respages points one past arg pages */
> > -	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
> > -	rqstp->rq_next_page = rqstp->rq_respages + 1;
> >
> > -	/* Create the reply and chunk maps */
> > -	offset = 0;
> > -	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > -	for (ch_no = 0; ch_no < ch_count; ch_no++) {
> > -		int len = ntohl(ch->rc_target.rs_length);
> > -		rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
> > -		rpl_map->sge[ch_no].iov_len = len;
> > -		chl_map->ch[ch_no].count = 1;
> > -		chl_map->ch[ch_no].start = ch_no;
> > -		offset += len;
> > -		ch++;
> > +		/* adjust offset and wrap to next page if needed */
> > +		pg_off += len;
> > +		if (pg_off == PAGE_SIZE) {
> > +			pg_off = 0;
> > +			pg_no++;
> > +		}
> > +		rs_length -= len;
> >  	}
> >
> > -	ret = svc_rdma_fastreg(xprt, frmr);
> > -	if (ret)
> > -		goto fatal_err;
> > -
> > -	return ch_no;
> > -
> > - fatal_err:
> > -	printk("svcrdma: error fast registering xdr for xprt %p", xprt);
> > -	svc_rdma_put_frmr(xprt, frmr);
> > -	return -EIO;
> > -}
> > -
> > -static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
> > -			     struct svc_rdma_op_ctxt *ctxt,
> > -			     struct svc_rdma_fastreg_mr *frmr,
> > -			     struct kvec *vec,
> > -			     u64 *sgl_offset,
> > -			     int count)
> > -{
> > -	int i;
> > -	unsigned long off;
> > +	if (last && rs_length == 0)
> > +		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > +	else
> > +		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> >
> > -	ctxt->count = count;
> > -	ctxt->direction = DMA_FROM_DEVICE;
> > -	for (i = 0; i < count; i++) {
> > -		ctxt->sge[i].length = 0; /* in case map fails */
> > -		if (!frmr) {
> > -			BUG_ON(!virt_to_page(vec[i].iov_base));
> > -			off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
> > -			ctxt->sge[i].addr =
> > -				ib_dma_map_page(xprt->sc_cm_id->device,
> > -						virt_to_page(vec[i].iov_base),
> > -						off,
> > -						vec[i].iov_len,
> > -						DMA_FROM_DEVICE);
> > -			if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -						 ctxt->sge[i].addr))
> > -				return -EINVAL;
> > -			ctxt->sge[i].lkey = xprt->sc_dma_lkey;
> > -			atomic_inc(&xprt->sc_dma_used);
> > -		} else {
> > -			ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
> > -			ctxt->sge[i].lkey = frmr->mr->lkey;
> > -		}
> > -		ctxt->sge[i].length = vec[i].iov_len;
> > -		*sgl_offset = *sgl_offset + vec[i].iov_len;
> > +	/* Bump the key */
> > +	key = (u8)(frmr->mr->lkey & 0x000000FF);
> > +	ib_update_fast_reg_key(frmr->mr, ++key);
> > +
> > +	ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
> > +	ctxt->sge[0].lkey = frmr->mr->lkey;
> > +	ctxt->sge[0].length = read;
> > +	ctxt->count = 1;
> > +	ctxt->read_hdr = head;
> > +
> > +	/* Prepare FASTREG WR */
> > +	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
> > +	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
> > +	fastreg_wr.send_flags = IB_SEND_SIGNALED;
> > +	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
> > +	fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
> > +	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
> > +	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
> > +	fastreg_wr.wr.fast_reg.length = frmr->map_len;
> > +	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
> > +	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
> > +	fastreg_wr.next = &read_wr;
> > +
> > +	/* Prepare RDMA_READ */
> > +	memset(&read_wr, 0, sizeof(read_wr));
> > +	read_wr.send_flags = IB_SEND_SIGNALED;
> > +	read_wr.wr.rdma.rkey = rs_handle;
> > +	read_wr.wr.rdma.remote_addr = rs_offset;
> > +	read_wr.sg_list = ctxt->sge;
> > +	read_wr.num_sge = 1;
> > +	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
> > +		read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
> > +		read_wr.wr_id = (unsigned long)ctxt;
> > +		read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
> > +	} else {
> > +		read_wr.opcode = IB_WR_RDMA_READ;
> > +		read_wr.next = &inv_wr;
> > +		/* Prepare invalidate */
> > +		memset(&inv_wr, 0, sizeof(inv_wr));
> > +		inv_wr.wr_id = (unsigned long)ctxt;
> > +		inv_wr.opcode = IB_WR_LOCAL_INV;
> > +		inv_wr.send_flags = IB_SEND_SIGNALED;
> 
> Change this to inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
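> For clarity, the whole else branch would then look roughly like this (untested sketch
> against your v3 code; only the send_flags line changes, so that the LOCAL_INV does not
> start until the preceding RDMA_READ on this QP has completed):
> 
> 		read_wr.opcode = IB_WR_RDMA_READ;
> 		read_wr.next = &inv_wr;
> 		/* Prepare invalidate */
> 		memset(&inv_wr, 0, sizeof(inv_wr));
> 		inv_wr.wr_id = (unsigned long)ctxt;
> 		inv_wr.opcode = IB_WR_LOCAL_INV;
> 		/* Fence the invalidate behind the RDMA_READ */
> 		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
> 		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;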
> 
> > +		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
> > +	}
> > +	ctxt->wr_op = read_wr.opcode;
> > +
> > +	/* Post the chain */
> > +	ret = svc_rdma_send(xprt, &fastreg_wr);
> > +	if (ret) {
> > +		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
> > +		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +		goto err;
> >  	}
> > -	return 0;
> > -}
> >
> > -static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
> > -{
> > -	if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
> > -	     RDMA_TRANSPORT_IWARP) &&
> > -	    sge_count > 1)
> > -		return 1;
> > -	else
> > -		return min_t(int, sge_count, xprt->sc_max_sge);
> > +	/* return current location in page array */
> > +	*page_no = pg_no;
> > +	*page_offset = pg_off;
> > +	ret = read;
> > +	atomic_inc(&rdma_stat_read);
> > +	return ret;
> > + err:
> > +	svc_rdma_unmap_dma(ctxt);
> > +	svc_rdma_put_context(ctxt, 0);
> > +	svc_rdma_put_frmr(xprt, frmr);
> > +	return ret;
> >  }
> >
> > -/*
> > - * Use RDMA_READ to read data from the advertised client buffer into the
> > - * XDR stream starting at rq_arg.head[0].iov_base.
> > - * Each chunk in the array
> > - * contains the following fields:
> > - * discrim      - '1', This isn't used for data placement
> > - * position     - The xdr stream offset (the same for every chunk)
> > - * handle       - RMR for client memory region
> > - * length       - data transfer length
> > - * offset       - 64 bit tagged offset in remote memory region
> > - *
> > - * On our side, we need to read into a pagelist. The first page immediately
> > - * follows the RPC header.
> > - *
> > - * This function returns:
> > - * 0 - No error and no read-list found.
> > - *
> > - * 1 - Successful read-list processing. The data is not yet in
> > - * the pagelist and therefore the RPC request must be deferred. The
> > - * I/O completion will enqueue the transport again and
> > - * svc_rdma_recvfrom will complete the request.
> > - *
> > - * <0 - Error processing/posting read-list.
> > - *
> > - * NOTE: The ctxt must not be touched after the last WR has been posted
> > - * because the I/O completion processing may occur on another
> > - * processor and free / modify the context. Ne touche pas!
> > - */
> > -static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> > -			 struct rpcrdma_msg *rmsgp,
> > -			 struct svc_rqst *rqstp,
> > -			 struct svc_rdma_op_ctxt *hdr_ctxt)
> > +static int rdma_read_chunks(struct svcxprt_rdma *xprt,
> > +			    struct rpcrdma_msg *rmsgp,
> > +			    struct svc_rqst *rqstp,
> > +			    struct svc_rdma_op_ctxt *head)
> >  {
> > -	struct ib_send_wr read_wr;
> > -	struct ib_send_wr inv_wr;
> > -	int err = 0;
> > -	int ch_no;
> > -	int ch_count;
> > -	int byte_count;
> > -	int sge_count;
> > -	u64 sgl_offset;
> > +	int page_no, ch_count, ret;
> >  	struct rpcrdma_read_chunk *ch;
> > -	struct svc_rdma_op_ctxt *ctxt = NULL;
> > -	struct svc_rdma_req_map *rpl_map;
> > -	struct svc_rdma_req_map *chl_map;
> > +	u32 page_offset, byte_count;
> > +	u64 rs_offset;
> > +	rdma_reader_fn reader;
> >
> >  	/* If no read list is present, return 0 */
> >  	ch = svc_rdma_get_read_chunk(rmsgp);
> > @@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
> >  	if (ch_count > RPCSVC_MAXPAGES)
> >  		return -EINVAL;
> >
> > -	/* Allocate temporary reply and chunk maps */
> > -	rpl_map = svc_rdma_get_req_map();
> > -	chl_map = svc_rdma_get_req_map();
> > +	/* The request is completed when the RDMA_READs complete. The
> > +	 * head context keeps all the pages that comprise the
> > +	 * request.
> > +	 */
> > +	head->arg.head[0] = rqstp->rq_arg.head[0];
> > +	head->arg.tail[0] = rqstp->rq_arg.tail[0];
> > +	head->arg.pages = &head->pages[head->count];
> > +	head->hdr_count = head->count;
> > +	head->arg.page_base = 0;
> > +	head->arg.page_len = 0;
> > +	head->arg.len = rqstp->rq_arg.len;
> > +	head->arg.buflen = rqstp->rq_arg.buflen;
> >
> > -	if (!xprt->sc_frmr_pg_list_len)
> > -		sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > -					    rpl_map, chl_map, ch_count,
> > -					    byte_count);
> > +	/* Use FRMR if supported */
> > +	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
> > +		reader = rdma_read_chunk_frmr;
> >  	else
> > -		sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
> > -						 rpl_map, chl_map, ch_count,
> > -						 byte_count);
> > -	if (sge_count < 0) {
> > -		err = -EIO;
> > -		goto out;
> > -	}
> > -
> > -	sgl_offset = 0;
> > -	ch_no = 0;
> > +		reader = rdma_read_chunk_lcl;
> >
> > +	page_no = 0; page_offset = 0;
> >  	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
> > -	     ch->rc_discrim != 0; ch++, ch_no++) {
> > -		u64 rs_offset;
> > -next_sge:
> > -		ctxt = svc_rdma_get_context(xprt);
> > -		ctxt->direction = DMA_FROM_DEVICE;
> > -		ctxt->frmr = hdr_ctxt->frmr;
> > -		ctxt->read_hdr = NULL;
> > -		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > -		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > +	     ch->rc_discrim != 0; ch++) {
> >
> > -		/* Prepare READ WR */
> > -		memset(&read_wr, 0, sizeof read_wr);
> > -		read_wr.wr_id = (unsigned long)ctxt;
> > -		read_wr.opcode = IB_WR_RDMA_READ;
> > -		ctxt->wr_op = read_wr.opcode;
> > -		read_wr.send_flags = IB_SEND_SIGNALED;
> > -		read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
> >  		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
> >  				 &rs_offset);
> > -		read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
> > -		read_wr.sg_list = ctxt->sge;
> > -		read_wr.num_sge =
> > -			rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
> > -		err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
> > -					&rpl_map->sge[chl_map->ch[ch_no].start],
> > -					&sgl_offset,
> > -					read_wr.num_sge);
> > -		if (err) {
> > -			svc_rdma_unmap_dma(ctxt);
> > -			svc_rdma_put_context(ctxt, 0);
> > -			goto out;
> > -		}
> > -		if (((ch+1)->rc_discrim == 0) &&
> > -		    (read_wr.num_sge == chl_map->ch[ch_no].count)) {
> > -			/*
> > -			 * Mark the last RDMA_READ with a bit to
> > -			 * indicate all RPC data has been fetched from
> > -			 * the client and the RPC needs to be enqueued.
> > -			 */
> > -			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
> > -			if (hdr_ctxt->frmr) {
> > -				set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > -				/*
> > -				 * Invalidate the local MR used to map the data
> > -				 * sink.
> > -				 */
> > -				if (xprt->sc_dev_caps &
> > -				    SVCRDMA_DEVCAP_READ_W_INV) {
> > -					read_wr.opcode =
> > -						IB_WR_RDMA_READ_WITH_INV;
> > -					ctxt->wr_op = read_wr.opcode;
> > -					read_wr.ex.invalidate_rkey =
> > -						ctxt->frmr->mr->lkey;
> > -				} else {
> > -					/* Prepare INVALIDATE WR */
> > -					memset(&inv_wr, 0, sizeof inv_wr);
> > -					inv_wr.opcode = IB_WR_LOCAL_INV;
> > -					inv_wr.send_flags = IB_SEND_SIGNALED;
> > -					inv_wr.ex.invalidate_rkey =
> > -						hdr_ctxt->frmr->mr->lkey;
> > -					read_wr.next = &inv_wr;
> > -				}
> > -			}
> > -			ctxt->read_hdr = hdr_ctxt;
> > -		}
> > -		/* Post the read */
> > -		err = svc_rdma_send(xprt, &read_wr);
> > -		if (err) {
> > -			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
> > -			       err);
> > -			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > -			svc_rdma_unmap_dma(ctxt);
> > -			svc_rdma_put_context(ctxt, 0);
> > -			goto out;
> > +		byte_count = ntohl(ch->rc_target.rs_length);
> > +
> > +		while (byte_count > 0) {
> > +			ret = reader(xprt, rqstp, head,
> > +				     &page_no, &page_offset,
> > +				     ntohl(ch->rc_target.rs_handle),
> > +				     byte_count, rs_offset,
> > +				     ((ch+1)->rc_discrim == 0) /* last */
> > +				     );
> > +			if (ret < 0)
> > +				goto err;
> > +			byte_count -= ret;
> > +			rs_offset += ret;
> > +			head->arg.buflen += ret;
> >  		}
> > -		atomic_inc(&rdma_stat_read);
> > -
> > -		if (read_wr.num_sge < chl_map->ch[ch_no].count) {
> > -			chl_map->ch[ch_no].count -= read_wr.num_sge;
> > -			chl_map->ch[ch_no].start += read_wr.num_sge;
> > -			goto next_sge;
> > -		}
> > -		sgl_offset = 0;
> > -		err = 1;
> >  	}
> > -
> > - out:
> > -	svc_rdma_put_req_map(rpl_map);
> > -	svc_rdma_put_req_map(chl_map);
> > -
> > +	ret = 1;
> > + err:
> >  	/* Detach arg pages. svc_recv will replenish them */
> > -	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
> > -		rqstp->rq_pages[ch_no] = NULL;
> > +	for (page_no = 0;
> > +	     &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
> > +		rqstp->rq_pages[page_no] = NULL;
> >
> > -	return err;
> > +	return ret;
> >  }
> >
> >  static int rdma_read_complete(struct svc_rqst *rqstp,
> > @@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> >  				  struct svc_rdma_op_ctxt,
> >  				  dto_q);
> >  		list_del_init(&ctxt->dto_q);
> > -	}
> > -	if (ctxt) {
> >  		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
> >  		return rdma_read_complete(rqstp, ctxt);
> > -	}
> > -
> > -	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> > +	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
> >  		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
> >  				  struct svc_rdma_op_ctxt,
> >  				  dto_q);
> > @@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> >  		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
> >  			goto close_out;
> >
> > -		BUG_ON(ret);
> >  		goto out;
> >  	}
> >  	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
> > @@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
> >  	}
> >
> >  	/* Read read-list data. */
> > -	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
> > +	ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
> >  	if (ret > 0) {
> >  		/* read-list posted, defer until data received from client. */
> >  		goto defer;
> > -	}
> > -	if (ret < 0) {
> > +	} else if (ret < 0) {
> >  		/* Post of read-list failed, free context. */
> >  		svc_rdma_put_context(ctxt, 1);
> >  		return 0;
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > index 7e024a5..49fd21a 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
> > @@ -1,4 +1,5 @@
> >  /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> >   * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
> >   *
> >   * This software is available to you under a choice of one of two
> > @@ -49,152 +50,6 @@
> >
> >  #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
> >
> > -/* Encode an XDR as an array of IB SGE
> > - *
> > - * Assumptions:
> > - * - head[0] is physically contiguous.
> > - * - tail[0] is physically contiguous.
> > - * - pages[] is not physically or virtually contiguous and consists of
> > - *   PAGE_SIZE elements.
> > - *
> > - * Output:
> > - * SGE[0]              reserved for RCPRDMA header
> > - * SGE[1]              data from xdr->head[]
> > - * SGE[2..sge_count-2] data from xdr->pages[]
> > - * SGE[sge_count-1]    data from xdr->tail.
> > - *
> > - * The max SGE we need is the length of the XDR / pagesize + one for
> > - * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
> > - * reserves a page for both the request and the reply header, and this
> > - * array is only concerned with the reply we are assured that we have
> > - * on extra page for the RPCRMDA header.
> > - */
> > -static int fast_reg_xdr(struct svcxprt_rdma *xprt,
> > -			struct xdr_buf *xdr,
> > -			struct svc_rdma_req_map *vec)
> > -{
> > -	int sge_no;
> > -	u32 sge_bytes;
> > -	u32 page_bytes;
> > -	u32 page_off;
> > -	int page_no = 0;
> > -	u8 *frva;
> > -	struct svc_rdma_fastreg_mr *frmr;
> > -
> > -	frmr = svc_rdma_get_frmr(xprt);
> > -	if (IS_ERR(frmr))
> > -		return -ENOMEM;
> > -	vec->frmr = frmr;
> > -
> > -	/* Skip the RPCRDMA header */
> > -	sge_no = 1;
> > -
> > -	/* Map the head. */
> > -	frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
> > -	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
> > -	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
> > -	vec->count = 2;
> > -	sge_no++;
> > -
> > -	/* Map the XDR head */
> > -	frmr->kva = frva;
> > -	frmr->direction = DMA_TO_DEVICE;
> > -	frmr->access_flags = 0;
> > -	frmr->map_len = PAGE_SIZE;
> > -	frmr->page_list_len = 1;
> > -	page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
> > -	frmr->page_list->page_list[page_no] =
> > -		ib_dma_map_page(xprt->sc_cm_id->device,
> > -				virt_to_page(xdr->head[0].iov_base),
> > -				page_off,
> > -				PAGE_SIZE - page_off,
> > -				DMA_TO_DEVICE);
> > -	if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -				 frmr->page_list->page_list[page_no]))
> > -		goto fatal_err;
> > -	atomic_inc(&xprt->sc_dma_used);
> > -
> > -	/* Map the XDR page list */
> > -	page_off = xdr->page_base;
> > -	page_bytes = xdr->page_len + page_off;
> > -	if (!page_bytes)
> > -		goto encode_tail;
> > -
> > -	/* Map the pages */
> > -	vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > -	vec->sge[sge_no].iov_len = page_bytes;
> > -	sge_no++;
> > -	while (page_bytes) {
> > -		struct page *page;
> > -
> > -		page = xdr->pages[page_no++];
> > -		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
> > -		page_bytes -= sge_bytes;
> > -
> > -		frmr->page_list->page_list[page_no] =
> > -			ib_dma_map_page(xprt->sc_cm_id->device,
> > -					page, page_off,
> > -					sge_bytes, DMA_TO_DEVICE);
> > -		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -					 frmr->page_list->page_list[page_no]))
> > -			goto fatal_err;
> > -
> > -		atomic_inc(&xprt->sc_dma_used);
> > -		page_off = 0; /* reset for next time through loop */
> > -		frmr->map_len += PAGE_SIZE;
> > -		frmr->page_list_len++;
> > -	}
> > -	vec->count++;
> > -
> > - encode_tail:
> > -	/* Map tail */
> > -	if (0 == xdr->tail[0].iov_len)
> > -		goto done;
> > -
> > -	vec->count++;
> > -	vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
> > -
> > -	if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
> > -	    ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
> > -		/*
> > -		 * If head and tail use the same page, we don't need
> > -		 * to map it again.
> > -		 */
> > -		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
> > -	} else {
> > -		void *va;
> > -
> > -		/* Map another page for the tail */
> > -		page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
> > -		va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
> > -		vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
> > -
> > -		frmr->page_list->page_list[page_no] =
> > -		    ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
> > -				    page_off,
> > -				    PAGE_SIZE,
> > -				    DMA_TO_DEVICE);
> > -		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -					 frmr->page_list-
> > >page_list[page_no]))
> > -			goto fatal_err;
> > -		atomic_inc(&xprt->sc_dma_used);
> > -		frmr->map_len += PAGE_SIZE;
> > -		frmr->page_list_len++;
> > -	}
> > -
> > - done:
> > -	if (svc_rdma_fastreg(xprt, frmr))
> > -		goto fatal_err;
> > -
> > -	return 0;
> > -
> > - fatal_err:
> > -	printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
> > -	vec->frmr = NULL;
> > -	svc_rdma_put_frmr(xprt, frmr);
> > -	return -EIO;
> > -}
> > -
> >  static int map_xdr(struct svcxprt_rdma *xprt,
> >  		   struct xdr_buf *xdr,
> >  		   struct svc_rdma_req_map *vec)
> > @@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,
> >  	BUG_ON(xdr->len !=
> >  	       (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
> >
> > -	if (xprt->sc_frmr_pg_list_len)
> > -		return fast_reg_xdr(xprt, xdr, vec);
> > -
> >  	/* Skip the first sge, this is for the RPCRDMA header */
> >  	sge_no = 1;
> >
> > @@ -282,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
> >  }
> >
> >  /* Assumptions:
> > - * - We are using FRMR
> > - *     - or -
> >   * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
> >   */
> >  static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> > @@ -327,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> >  		sge_bytes = min_t(size_t,
> >  			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
> >  		sge[sge_no].length = sge_bytes;
> > -		if (!vec->frmr) {
> > -			sge[sge_no].addr =
> > -				dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > -					    sge_bytes, DMA_TO_DEVICE);
> > -			xdr_off += sge_bytes;
> > -			if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > -						 sge[sge_no].addr))
> > -				goto err;
> > -			atomic_inc(&xprt->sc_dma_used);
> > -			sge[sge_no].lkey = xprt->sc_dma_lkey;
> > -		} else {
> > -			sge[sge_no].addr = (unsigned long)
> > -				vec->sge[xdr_sge_no].iov_base + sge_off;
> > -			sge[sge_no].lkey = vec->frmr->mr->lkey;
> > -		}
> > +		sge[sge_no].addr =
> > +			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
> > +				    sge_bytes, DMA_TO_DEVICE);
> > +		xdr_off += sge_bytes;
> > +		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
> > +					 sge[sge_no].addr))
> > +			goto err;
> > +		atomic_inc(&xprt->sc_dma_used);
> > +		sge[sge_no].lkey = xprt->sc_dma_lkey;
> >  		ctxt->count++;
> > -		ctxt->frmr = vec->frmr;
> >  		sge_off = 0;
> >  		sge_no++;
> >  		xdr_sge_no++;
> > @@ -369,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
> >  	return 0;
> >   err:
> >  	svc_rdma_unmap_dma(ctxt);
> > -	svc_rdma_put_frmr(xprt, vec->frmr);
> >  	svc_rdma_put_context(ctxt, 0);
> >  	/* Fatal error, close transport */
> >  	return -EIO;
> > @@ -397,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
> >  	res_ary = (struct rpcrdma_write_array *)
> >  		&rdma_resp->rm_body.rm_chunks[1];
> >
> > -	if (vec->frmr)
> > -		max_write = vec->frmr->map_len;
> > -	else
> > -		max_write = xprt->sc_max_sge * PAGE_SIZE;
> > +	max_write = xprt->sc_max_sge * PAGE_SIZE;
> >
> >  	/* Write chunks start at the pagelist */
> >  	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
> > @@ -472,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
> >  	res_ary = (struct rpcrdma_write_array *)
> >  		&rdma_resp->rm_body.rm_chunks[2];
> >
> > -	if (vec->frmr)
> > -		max_write = vec->frmr->map_len;
> > -	else
> > -		max_write = xprt->sc_max_sge * PAGE_SIZE;
> > +	max_write = xprt->sc_max_sge * PAGE_SIZE;
> >
> >  	/* xdr offset starts at RPC message */
> >  	nchunks = ntohl(arg_ary->wc_nchunks);
> > @@ -545,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >  		      int byte_count)
> >  {
> >  	struct ib_send_wr send_wr;
> > -	struct ib_send_wr inv_wr;
> >  	int sge_no;
> >  	int sge_bytes;
> >  	int page_no;
> > @@ -559,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >  		       "svcrdma: could not post a receive buffer, err=%d."
> >  		       "Closing transport %p.\n", ret, rdma);
> >  		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> > -		svc_rdma_put_frmr(rdma, vec->frmr);
> >  		svc_rdma_put_context(ctxt, 0);
> >  		return -ENOTCONN;
> >  	}
> > @@ -567,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >  	/* Prepare the context */
> >  	ctxt->pages[0] = page;
> >  	ctxt->count = 1;
> > -	ctxt->frmr = vec->frmr;
> > -	if (vec->frmr)
> > -		set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> > -	else
> > -		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
> >
> >  	/* Prepare the SGE for the RPCRDMA Header */
> >  	ctxt->sge[0].lkey = rdma->sc_dma_lkey;
> > @@ -590,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >  		int xdr_off = 0;
> >  		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
> >  		byte_count -= sge_bytes;
> > -		if (!vec->frmr) {
> > -			ctxt->sge[sge_no].addr =
> > -				dma_map_xdr(rdma, &rqstp->rq_res,
> > xdr_off,
> > -					    sge_bytes, DMA_TO_DEVICE);
> > -			xdr_off += sge_bytes;
> > -			if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > -						 ctxt->sge[sge_no].addr))
> > -				goto err;
> > -			atomic_inc(&rdma->sc_dma_used);
> > -			ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> > -		} else {
> > -			ctxt->sge[sge_no].addr = (unsigned long)
> > -				vec->sge[sge_no].iov_base;
> > -			ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
> > -		}
> > +		ctxt->sge[sge_no].addr =
> > +			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
> > +				    sge_bytes, DMA_TO_DEVICE);
> > +		xdr_off += sge_bytes;
> > +		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
> > +					 ctxt->sge[sge_no].addr))
> > +			goto err;
> > +		atomic_inc(&rdma->sc_dma_used);
> > +		ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
> >  		ctxt->sge[sge_no].length = sge_bytes;
> >  	}
> >  	BUG_ON(byte_count != 0);
> > @@ -627,6 +450,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >  			ctxt->sge[page_no+1].length = 0;
> >  	}
> >  	rqstp->rq_next_page = rqstp->rq_respages + 1;
> > +
> >  	BUG_ON(sge_no > rdma->sc_max_sge);
> >  	memset(&send_wr, 0, sizeof send_wr);
> >  	ctxt->wr_op = IB_WR_SEND;
> > @@ -635,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >  	send_wr.num_sge = sge_no;
> >  	send_wr.opcode = IB_WR_SEND;
> >  	send_wr.send_flags =  IB_SEND_SIGNALED;
> > -	if (vec->frmr) {
> > -		/* Prepare INVALIDATE WR */
> > -		memset(&inv_wr, 0, sizeof inv_wr);
> > -		inv_wr.opcode = IB_WR_LOCAL_INV;
> > -		inv_wr.send_flags = IB_SEND_SIGNALED;
> > -		inv_wr.ex.invalidate_rkey =
> > -			vec->frmr->mr->lkey;
> > -		send_wr.next = &inv_wr;
> > -	}
> >
> >  	ret = svc_rdma_send(rdma, &send_wr);
> >  	if (ret)
> > @@ -653,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
> >
> >   err:
> >  	svc_rdma_unmap_dma(ctxt);
> > -	svc_rdma_put_frmr(rdma, vec->frmr);
> >  	svc_rdma_put_context(ctxt, 1);
> >  	return -EIO;
> >  }
> > diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > index 25688fa..2c5b201 100644
> > --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
> > @@ -1,4 +1,5 @@
> >  /*
> > + * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
> >   * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
> >   *
> >   * This software is available to you under a choice of one of two
> > @@ -160,7 +161,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)
> >  		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
> >  	}
> >  	map->count = 0;
> > -	map->frmr = NULL;
> >  	return map;
> >  }
> >
> > @@ -336,22 +336,21 @@ static void process_context(struct svcxprt_rdma *xprt,
> >
> >  	switch (ctxt->wr_op) {
> >  	case IB_WR_SEND:
> > -		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > -			svc_rdma_put_frmr(xprt, ctxt->frmr);
> > +		BUG_ON(ctxt->frmr);
> >  		svc_rdma_put_context(ctxt, 1);
> >  		break;
> >
> >  	case IB_WR_RDMA_WRITE:
> > +		BUG_ON(ctxt->frmr);
> >  		svc_rdma_put_context(ctxt, 0);
> >  		break;
> >
> >  	case IB_WR_RDMA_READ:
> >  	case IB_WR_RDMA_READ_WITH_INV:
> > +		svc_rdma_put_frmr(xprt, ctxt->frmr);
> >  		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
> >  			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
> >  			BUG_ON(!read_hdr);
> > -			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
> > -				svc_rdma_put_frmr(xprt, ctxt->frmr);
> >  			spin_lock_bh(&xprt->sc_rq_dto_lock);
> >  			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
> >  			list_add_tail(&read_hdr->dto_q,
> > @@ -363,6 +362,7 @@ static void process_context(struct svcxprt_rdma *xprt,
> >  		break;
> >
> >  	default:
> > +		BUG_ON(1);
> >  		printk(KERN_ERR "svcrdma: unexpected completion type, "
> >  		       "opcode=%d\n",
> >  		       ctxt->wr_op);
> > @@ -378,29 +378,42 @@ static void process_context(struct svcxprt_rdma *xprt,
> >  static void sq_cq_reap(struct svcxprt_rdma *xprt)
> >  {
> >  	struct svc_rdma_op_ctxt *ctxt = NULL;
> > -	struct ib_wc wc;
> > +	struct ib_wc wc_a[6];
> > +	struct ib_wc *wc;
> >  	struct ib_cq *cq = xprt->sc_sq_cq;
> >  	int ret;
> >
> > +	memset(wc_a, 0, sizeof(wc_a));
> > +
> >  	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
> >  		return;
> >
> >  	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
> >  	atomic_inc(&rdma_stat_sq_poll);
> > -	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
> > -		if (wc.status != IB_WC_SUCCESS)
> > -			/* Close the transport */
> > -			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
> > +		int i;
> >
> > -		/* Decrement used SQ WR count */
> > -		atomic_dec(&xprt->sc_sq_count);
> > -		wake_up(&xprt->sc_send_wait);
> > +		for (i = 0; i < ret; i++) {
> > +			wc = &wc_a[i];
> > +			if (wc->status != IB_WC_SUCCESS) {
> > +				dprintk("svcrdma: sq wc err status %d\n",
> > +					wc->status);
> >
> > -		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
> > -		if (ctxt)
> > -			process_context(xprt, ctxt);
> > +				/* Close the transport */
> > +				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
> > +			}
> >
> > -		svc_xprt_put(&xprt->sc_xprt);
> > +			/* Decrement used SQ WR count */
> > +			atomic_dec(&xprt->sc_sq_count);
> > +			wake_up(&xprt->sc_send_wait);
> > +
> > +			ctxt = (struct svc_rdma_op_ctxt *)
> > +				(unsigned long)wc->wr_id;
> > +			if (ctxt)
> > +				process_context(xprt, ctxt);
> > +
> > +			svc_xprt_put(&xprt->sc_xprt);
> > +		}
> >  	}
> >
> >  	if (ctxt)
> > @@ -993,7 +1006,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
> >  			need_dma_mr = 0;
> >  		break;
> >  	case RDMA_TRANSPORT_IB:
> > -		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
> > +		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
> > +			need_dma_mr = 1;
> > +			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> > +		} else if (!(devattr.device_cap_flags &
> > +			     IB_DEVICE_LOCAL_DMA_LKEY)) {
> >  			need_dma_mr = 1;
> >  			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
> >  		} else
> > @@ -1190,14 +1207,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
> >  		container_of(xprt, struct svcxprt_rdma, sc_xprt);
> >
> >  	/*
> > -	 * If there are fewer SQ WR available than required to send a
> > -	 * simple response, return false.
> > -	 */
> > -	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
> > -		return 0;
> > -
> > -	/*
> > -	 * ...or there are already waiters on the SQ,
> > +	 * If there are already waiters on the SQ,
> >  	 * return false.
> >  	 */
> >  	if (waitqueue_active(&rdma->sc_send_wait))
> >
