Re: [PATCH v1 05/18] xprtrdma: Replace send and receive arrays

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



> On Sep 20, 2015, at 3:52 AM, Sagi Grimberg <sagig@xxxxxxxxxxxxxxxxxx> wrote:
> 
> On 9/17/2015 11:44 PM, Chuck Lever wrote:
>> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
>> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
>> structs. Replace those arrays with free lists.
>> 
>> To allow more than 512 RPCs in-flight at once, each of these arrays
>> would be larger than a page (assuming 8-byte addresses and 4KB
>> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
>> buffer array would have to be 128 pages. That's an order-6
>> allocation. (Not that we're going there.)
>> 
>> A list is easier to expand dynamically. Instead of allocating a
>> larger array of pointers and copying the existing pointers to the
>> new array, simply append more buffers to each list.
>> 
>> This also makes it simpler to manage receive buffers that might
>> catch backwards-direction calls, or to post receive buffers in
>> bulk to amortize the overhead of ib_post_recv.
>> 
>> Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
> 
> Hi Chuck,
> 
> I get the idea of this patch, but it is a bit confusing (to a
> non-educated reader).

OK, let’s see if there’s room for additional improvement.


> Can you explain why sometimes you call put/get_locked routines
> and sometimes you open-code them?

Are you talking about the later patch that adds support for
receiving backwards calls? That probably should use the
existing helpers, shouldn’t it.


> And is it mandatory to have
> the callers lock before calling get/put? Perhaps the code would
> be simpler if the get/put routines would take care of locking
> since rb_lock looks dedicated to them.

Not sure I understand this comment, I thought that the helpers
were already doing the locking.


> 
>> ---
>>  net/sunrpc/xprtrdma/verbs.c     |  141 +++++++++++++++++----------------------
>>  net/sunrpc/xprtrdma/xprt_rdma.h |    9 +-
>>  2 files changed, 66 insertions(+), 84 deletions(-)
>> 
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index ac1345b..8d99214 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>  {
>>  	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
>>  	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
>> -	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
>> -	char *p;
>> -	size_t len;
>>  	int i, rc;
>> 
>> -	buf->rb_max_requests = cdata->max_requests;
>> +	buf->rb_max_requests = r_xprt->rx_data.max_requests;
>>  	spin_lock_init(&buf->rb_lock);
>> 
>> -	/* Need to allocate:
>> -	 *   1.  arrays for send and recv pointers
>> -	 *   2.  arrays of struct rpcrdma_req to fill in pointers
>> -	 *   3.  array of struct rpcrdma_rep for replies
>> -	 * Send/recv buffers in req/rep need to be registered
>> -	 */
>> -	len = buf->rb_max_requests *
>> -		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
>> -
>> -	p = kzalloc(len, GFP_KERNEL);
>> -	if (p == NULL) {
>> -		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
>> -			__func__, len);
>> -		rc = -ENOMEM;
>> -		goto out;
>> -	}
>> -	buf->rb_pool = p;	/* for freeing it later */
>> -
>> -	buf->rb_send_bufs = (struct rpcrdma_req **) p;
>> -	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
>> -	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
>> -	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
>> -
>>  	rc = ia->ri_ops->ro_init(r_xprt);
>>  	if (rc)
>>  		goto out;
>> 
>> +	INIT_LIST_HEAD(&buf->rb_send_bufs);
>>  	for (i = 0; i < buf->rb_max_requests; i++) {
>>  		struct rpcrdma_req *req;
>> -		struct rpcrdma_rep *rep;
>> 
>>  		req = rpcrdma_create_req(r_xprt);
>>  		if (IS_ERR(req)) {
>> @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>  			rc = PTR_ERR(req);
>>  			goto out;
>>  		}
>> -		buf->rb_send_bufs[i] = req;
>> +		list_add(&req->rl_free, &buf->rb_send_bufs);
>> +	}
>> +
>> +	INIT_LIST_HEAD(&buf->rb_recv_bufs);
>> +	for (i = 0; i < buf->rb_max_requests + 2; i++) {
>> +		struct rpcrdma_rep *rep;
>> 
>>  		rep = rpcrdma_create_rep(r_xprt);
>>  		if (IS_ERR(rep)) {
>> @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>  			rc = PTR_ERR(rep);
>>  			goto out;
>>  		}
>> -		buf->rb_recv_bufs[i] = rep;
>> +		list_add(&rep->rr_list, &buf->rb_recv_bufs);
>>  	}
>> 
>>  	return 0;
>> @@ -1051,25 +1030,26 @@ void
>>  rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
>>  {
>>  	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
>> -	int i;
>> 
>> -	/* clean up in reverse order from create
>> -	 *   1.  recv mr memory (mr free, then kfree)
>> -	 *   2.  send mr memory (mr free, then kfree)
>> -	 *   3.  MWs
>> -	 */
>> -	dprintk("RPC:       %s: entering\n", __func__);
>> +	while (!list_empty(&buf->rb_recv_bufs)) {
>> +		struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
>> +						     struct rpcrdma_rep,
>> +						     rr_list);
>> 
>> -	for (i = 0; i < buf->rb_max_requests; i++) {
>> -		if (buf->rb_recv_bufs)
>> -			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
>> -		if (buf->rb_send_bufs)
>> -			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
>> +		list_del(&rep->rr_list);
>> +		rpcrdma_destroy_rep(ia, rep);
>>  	}
>> 
>> -	ia->ri_ops->ro_destroy(buf);
>> +	while (!list_empty(&buf->rb_send_bufs)) {
>> +		struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
>> +						     struct rpcrdma_req,
>> +						     rl_free);
>> 
>> -	kfree(buf->rb_pool);
>> +		list_del(&req->rl_free);
>> +		rpcrdma_destroy_req(ia, req);
>> +	}
>> +
>> +	ia->ri_ops->ro_destroy(buf);
>>  }
>> 
>>  struct rpcrdma_mw *
>> @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
>>  }
>> 
>>  static void
>> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
>> +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
>>  {
>> -	buf->rb_send_bufs[--buf->rb_send_index] = req;
>> -	req->rl_niovs = 0;
>> -	if (req->rl_reply) {
>> -		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
>> -		req->rl_reply = NULL;
>> -	}
>> +	list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
>> +}
>> +
>> +static struct rpcrdma_rep *
>> +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
>> +{
>> +	struct rpcrdma_rep *rep;
>> +
>> +	rep = list_first_entry(&buf->rb_recv_bufs,
>> +			       struct rpcrdma_rep, rr_list);
>> +	list_del(&rep->rr_list);
>> +
>> +	return rep;
>>  }
> 
> There seems to be a distinction between send/recv buffers. Would it
> make sense to have a symmetric handling for both send/recv buffers?

Or maybe the same helpers could handle both. I’ll have a look
when I get back from SNIA SDC.


>>  /*
>>   * Get a set of request/reply buffers.
>>   *
>> - * Reply buffer (if needed) is attached to send buffer upon return.
>> - * Rule:
>> - *    rb_send_index and rb_recv_index MUST always be pointing to the
>> - *    *next* available buffer (non-NULL). They are incremented after
>> - *    removing buffers, and decremented *before* returning them.
>> + * Reply buffer (if available) is attached to send buffer upon return.
>>   */
>>  struct rpcrdma_req *
>>  rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>> @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>> 
>>  	spin_lock_irqsave(&buffers->rb_lock, flags);
>> 
>> -	if (buffers->rb_send_index == buffers->rb_max_requests) {
>> +	if (list_empty(&buffers->rb_send_bufs)) {
>>  		spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> -		dprintk("RPC:       %s: out of request buffers\n", __func__);
>> -		return ((struct rpcrdma_req *)NULL);
>> -	}
>> -
>> -	req = buffers->rb_send_bufs[buffers->rb_send_index];
>> -	if (buffers->rb_send_index < buffers->rb_recv_index) {
>> -		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
>> -			__func__,
>> -			buffers->rb_recv_index - buffers->rb_send_index);
>> -		req->rl_reply = NULL;
>> -	} else {
>> -		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
>> -		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
>> +		pr_warn("RPC:       %s: out of request buffers\n", __func__);
>> +		return NULL;
>>  	}
>> -	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
>> +	req = list_first_entry(&buffers->rb_send_bufs,
>> +			       struct rpcrdma_req, rl_free);
>> +	list_del(&req->rl_free);
>> 
>> +	req->rl_reply = NULL;
>> +	if (!list_empty(&buffers->rb_recv_bufs))
>> +		req->rl_reply = rpcrdma_buffer_get_locked(buffers);
> 
> Would it make sense to check !list_empty() inside _get_locked and handle
> a possible NULL return?
> 
>>  	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> +
>> +	if (!req->rl_reply)
>> +		pr_warn("RPC:       %s: out of reply buffers\n", __func__);
>>  	return req;
>>  }
>> 
>> @@ -1159,17 +1139,22 @@ void
>>  rpcrdma_buffer_put(struct rpcrdma_req *req)
>>  {
>>  	struct rpcrdma_buffer *buffers = req->rl_buffer;
>> +	struct rpcrdma_rep *rep = req->rl_reply;
>>  	unsigned long flags;
>> 
>> +	req->rl_niovs = 0;
>> +	req->rl_reply = NULL;
>> +
>>  	spin_lock_irqsave(&buffers->rb_lock, flags);
>> -	rpcrdma_buffer_put_sendbuf(req, buffers);
>> +	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
>> +	if (rep)
>> +		rpcrdma_buffer_put_locked(rep, buffers);
>>  	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>>  }
>> 
>>  /*
>>   * Recover reply buffers from pool.
>> - * This happens when recovering from error conditions.
>> - * Post-increment counter/array index.
>> + * This happens when recovering from disconnect.
>>   */
>>  void
>>  rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>> @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>>  	unsigned long flags;
>> 
>>  	spin_lock_irqsave(&buffers->rb_lock, flags);
>> -	if (buffers->rb_recv_index < buffers->rb_max_requests) {
>> -		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
>> -		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
>> -	}
>> +	if (!list_empty(&buffers->rb_recv_bufs))
>> +		req->rl_reply = rpcrdma_buffer_get_locked(buffers);
>>  	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>>  }
>> 
>> @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
>>  	unsigned long flags;
>> 
>>  	spin_lock_irqsave(&buffers->rb_lock, flags);
>> -	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
>> +	rpcrdma_buffer_put_locked(rep, buffers);
>>  	spin_unlock_irqrestore(&buffers->rb_lock, flags);
>>  }
>> 
>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>> index a13508b..e6a358f 100644
>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
>>  #define RPCRDMA_MAX_IOVS	(2)
>> 
>>  struct rpcrdma_req {
>> +	struct list_head	rl_free;
>>  	unsigned int		rl_niovs;
>>  	unsigned int		rl_nchunks;
>>  	unsigned int		rl_connect_cookie;
>> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
>>  	struct list_head	rb_all;
>>  	char			*rb_pool;
>> 
>> -	spinlock_t		rb_lock;	/* protect buf arrays */
>> +	spinlock_t		rb_lock;	/* protect buf lists */
>> +	struct list_head	rb_send_bufs;
>> +	struct list_head	rb_recv_bufs;
>>  	u32			rb_max_requests;
>> -	int			rb_send_index;
>> -	int			rb_recv_index;
>> -	struct rpcrdma_req	**rb_send_bufs;
>> -	struct rpcrdma_rep	**rb_recv_bufs;
>>  };
>>  #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>> 
>> 
>> --
>> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
>> the body of a message to majordomo@xxxxxxxxxxxxxxx
>> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>> 
> 
> --
> To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
> the body of a message to majordomo@xxxxxxxxxxxxxxx
> More majordomo info at  http://vger.kernel.org/majordomo-info.html

—
Chuck Lever



--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux