[PATCH v1 1/2] svcrdma: Reduce Receive doorbell rate

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



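This is similar to commit e340c2d6ef2a ("xprtrdma: Reduce the
doorbell rate (Receive)"), which added Receive batching to the
client.

Instead of posting one Receive WR from each Receive completion,
the server now tracks the number of posted Receives in
sc_pending_recvs. When that count drops below sc_max_requests,
the completion handler posts a chain of up to
RPCRDMA_MAX_RECV_BATCH Receive WRs with a single ib_post_recv()
call.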

Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
---
 include/linux/sunrpc/svc_rdma.h         |    1 
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c |   82 ++++++++++++++++---------------
 2 files changed, 44 insertions(+), 39 deletions(-)

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 1e76ed688044..7c693b31965e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -104,6 +104,7 @@ struct svcxprt_rdma {
 
 	wait_queue_head_t    sc_send_wait;	/* SQ exhaustion waitlist */
 	unsigned long	     sc_flags;
+	u32		     sc_pending_recvs;
 	struct list_head     sc_read_complete_q;
 	struct work_struct   sc_work;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 7d14a74df716..ab0b7e9777bc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -266,33 +266,46 @@ void svc_rdma_release_rqst(struct svc_rqst *rqstp)
 		svc_rdma_recv_ctxt_put(rdma, ctxt);
 }
 
-static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma,
-				struct svc_rdma_recv_ctxt *ctxt)
+static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
+				   unsigned int wanted, bool temp)
 {
+	const struct ib_recv_wr *bad_wr = NULL;
+	struct svc_rdma_recv_ctxt *ctxt;
+	struct ib_recv_wr *recv_chain;
 	int ret;
 
-	trace_svcrdma_post_recv(ctxt);
-	ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL);
+	recv_chain = NULL;
+	while (wanted--) {
+		ctxt = svc_rdma_recv_ctxt_get(rdma);
+		if (!ctxt)
+			break;
+
+		trace_svcrdma_post_recv(ctxt);
+		ctxt->rc_temp = temp;
+		ctxt->rc_recv_wr.next = recv_chain;
+		recv_chain = &ctxt->rc_recv_wr;
+		rdma->sc_pending_recvs++;
+	}
+	if (!recv_chain)
+		return false;
+
+	ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
 	if (ret)
 		goto err_post;
-	return 0;
+	return true;
 
 err_post:
-	trace_svcrdma_rq_post_err(rdma, ret);
-	svc_rdma_recv_ctxt_put(rdma, ctxt);
-	return ret;
-}
-
-static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
-{
-	struct svc_rdma_recv_ctxt *ctxt;
+	while (bad_wr) {
+		ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
+				    rc_recv_wr);
+		bad_wr = bad_wr->next;
+		svc_rdma_recv_ctxt_put(rdma, ctxt);
+	}
 
-	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
-		return 0;
-	ctxt = svc_rdma_recv_ctxt_get(rdma);
-	if (!ctxt)
-		return -ENOMEM;
-	return __svc_rdma_post_recv(rdma, ctxt);
+	trace_svcrdma_rq_post_err(rdma, ret);
+	/* Since we're destroying the xprt, no need to reset
+	 * sc_pending_recvs. */
+	return false;
 }
 
 /**
@@ -303,20 +316,7 @@ static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
  */
 bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
 {
-	struct svc_rdma_recv_ctxt *ctxt;
-	unsigned int i;
-	int ret;
-
-	for (i = 0; i < rdma->sc_max_requests; i++) {
-		ctxt = svc_rdma_recv_ctxt_get(rdma);
-		if (!ctxt)
-			return false;
-		ctxt->rc_temp = true;
-		ret = __svc_rdma_post_recv(rdma, ctxt);
-		if (ret)
-			return false;
-	}
-	return true;
+	return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests, true);
 }
 
 /**
@@ -324,8 +324,6 @@ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
  * @cq: Completion Queue context
  * @wc: Work Completion object
  *
- * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
- * the Receive completion handler could be running.
  */
 static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
@@ -333,6 +331,8 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 	struct ib_cqe *cqe = wc->wr_cqe;
 	struct svc_rdma_recv_ctxt *ctxt;
 
+	rdma->sc_pending_recvs--;
+
 	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
 	ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
 
@@ -340,9 +340,6 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 	if (wc->status != IB_WC_SUCCESS)
 		goto flushed;
 
-	if (svc_rdma_post_recv(rdma))
-		goto post_err;
-
 	/* All wc fields are now known to be valid */
 	ctxt->rc_byte_len = wc->byte_len;
 	ib_dma_sync_single_for_cpu(rdma->sc_pd->device,
@@ -356,11 +353,18 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 	spin_unlock(&rdma->sc_rq_dto_lock);
 	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
 		svc_xprt_enqueue(&rdma->sc_xprt);
+
+	if (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) &&
+	    rdma->sc_pending_recvs < rdma->sc_max_requests)
+		if (!svc_rdma_refresh_recvs(rdma, RPCRDMA_MAX_RECV_BATCH,
+					    false))
+			goto post_err;
+
 	return;
 
 flushed:
-post_err:
 	svc_rdma_recv_ctxt_put(rdma, ctxt);
+post_err:
 	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 	svc_xprt_enqueue(&rdma->sc_xprt);
 }





[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux