[RFC V2 1/1] SUNRPC: dynamic rpc_slot allocator

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Andy Adamson <andros@xxxxxxxxxx>

Add a dynamic rpc slot allocator at the beginning of the RPC call_transmit
state.  The allocator is triggered by a new rpc_xprt bit XPRT_ADD_SLOT.

The dynamic slot allocator uses GFP_NOWAIT and will return without
allocating a slot if GFP_NOWAIT allocation fails. This is OK because the
XPRT_ADD_SLOT bit will be set the next time UDP calls xprt_alloc, or the
next time the TCP write_space callback is called.

XXX Is this really true? XXX
Change the rpc_xprt allocation, including the initial slot table entries, to
GFP_ATOMIC as it can be called in the file system writeout path.

In the UDP and BC_TCP cases, this means we no longer allocate slots we don't
need. In the TCP case, this means we follow the TCP window size up to
RPC_MAX_SLOT_TABLE.

Current transport slot allocation:

RDMA: No change, allocate xprt_rdma_slot_table_entries slots in
      xprt_setup_rdma.

UDP:  Changed: Start with RPC_MIN_SLOT_TABLE slots and dynamically add
      up to xprt_udp_slot_table_entries slots.
      Allocate RPC_MIN_SLOT_TABLE in xs_setup_udp. Add a set_add_slot
      operation so that if xprt_alloc can not find a slot on the free list,
      and max_reqs < xprt_udp_slot_table_entries, XPRT_ADD_SLOT is set and
      a slot is dynamically allocated.

TCP:  Changed: Start with xprt_tcp_slot_table_entries slots and dynamically
      add up to the TCP window size or (upper bound) RPC_MAX_SLOT_TABLE slots.
      Allocate xprt_tcp_slot_table_entries in xs_setup_tcp. Set XPRT_ADD_SLOT
      in xs_tcp_write_space so that TCP congestion feedback is hooked into
      the dynamic rpc_slot allocation, allowing the RPC layer to fully
      utilize the negotiated TCP window.

BC_TCP: Changed: The back channel will not use more than two slots.
      Allocate RPC_MIN_SLOT_TABLE slots instead of xprt_tcp_slot_table_entries
      slots in xs_setup_bc_tcp.

Signed-off-by: Andy Adamson <andros@xxxxxxxxx>
---
 include/linux/sunrpc/xprt.h |    8 ++++
 net/sunrpc/clnt.c           |    4 ++
 net/sunrpc/xprt.c           |   90 +++++++++++++++++++++++++++++++++++++------
 net/sunrpc/xprtsock.c       |   34 +++++++++++++++-
 4 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index a0f998c..6c3c78e 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -115,6 +115,7 @@ struct rpc_xprt_ops {
 	void		(*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
 	void		(*rpcbind)(struct rpc_task *task);
 	void		(*set_port)(struct rpc_xprt *xprt, unsigned short port);
+	void		(*set_add_slot)(struct rpc_xprt *xprt);
 	void		(*connect)(struct rpc_task *task);
 	void *		(*buf_alloc)(struct rpc_task *task, size_t size);
 	void		(*buf_free)(void *buffer);
@@ -307,6 +308,7 @@ void			xprt_release_rqst_cong(struct rpc_task *task);
 void			xprt_disconnect_done(struct rpc_xprt *xprt);
 void			xprt_force_disconnect(struct rpc_xprt *xprt);
 void			xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
+void			xprt_nowait_add_slot(struct rpc_xprt *xprt);
 
 /*
  * Reserved bit positions in xprt->state
@@ -321,6 +323,7 @@ void			xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
 #define XPRT_CONNECTION_ABORT	(7)
 #define XPRT_CONNECTION_CLOSE	(8)
 #define XPRT_INITIALIZED	(9)
+#define XPRT_ADD_SLOT		(10)
 
 static inline void xprt_set_connected(struct rpc_xprt *xprt)
 {
@@ -391,6 +394,11 @@ static inline int xprt_test_and_set_binding(struct rpc_xprt *xprt)
 	return test_and_set_bit(XPRT_BINDING, &xprt->state);
 }
 
+static inline void xprt_set_add_slot(struct rpc_xprt *xprt)
+{
+	set_bit(XPRT_ADD_SLOT, &xprt->state);
+}
+
 #endif /* __KERNEL__*/
 
 #endif /* _LINUX_SUNRPC_XPRT_H */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 8d83f9d..82e69fe 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1276,6 +1276,10 @@ call_transmit(struct rpc_task *task)
 	task->tk_action = call_status;
 	if (task->tk_status < 0)
 		return;
+
+	/* Add a slot if XPRT_ADD_SLOT is set */
+	xprt_nowait_add_slot(task->tk_xprt);
+
 	task->tk_status = xprt_prepare_transmit(task);
 	if (task->tk_status != 0)
 		return;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ce5eb68..6d11d95 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -942,6 +942,8 @@ static void xprt_alloc_slot(struct rpc_task *task)
 		xprt_request_init(task, xprt);
 		return;
 	}
+	if (xprt->ops->set_add_slot)
+		xprt->ops->set_add_slot(xprt);
 	dprintk("RPC:       waiting for request slot\n");
 	task->tk_status = -EAGAIN;
 	task->tk_timeout = 0;
@@ -958,24 +960,94 @@ static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 	spin_unlock(&xprt->reserve_lock);
 }
 
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req)
+
+/*
+ * Initial slot table allocation called only at rpc_xprt allocation.
+ * No need to take the xprt->reserve_lock.
+ * GFP_ATOMIC used because an RPC can be triggered in the writeout path.
+ *
+ */
+static int xprt_alloc_slot_table_entries(struct rpc_xprt *xprt, int num_req)
+{
+	struct rpc_rqst *req;
+	int i;
+
+	for (i = 0; i < num_req; i++) {
+		req = kzalloc(sizeof(*req), GFP_ATOMIC);
+		if (!req)
+			return -ENOMEM;
+		list_add(&req->rq_list, &xprt->free);
+	}
+	dprintk("<-- %s mempool_alloc %d reqs\n", __func__,
+		xprt->max_reqs);
+	return 0;
+}
+
+static void
+xprt_free_slot_table_entries(struct rpc_xprt *xprt)
+{
+	struct rpc_rqst *req;
+	int cnt = 0;
+
+	while (!list_empty(&xprt->free)) {
+		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+		list_del(&req->rq_list);
+		kfree(req);
+		cnt++;
+	}
+	dprintk("<-- %s freed %d reqs\n", __func__, cnt);
+}
+
+/*
+ * Dynamic rpc_slot allocator. GFP_NOWAIT will not cause rpciod to sleep.
+ * Return NULL if allocation can't be serviced immediately.
+ * Triggered by write_space callback.
+ */
+void
+xprt_nowait_add_slot(struct rpc_xprt *xprt)
+{
+	struct rpc_rqst *req;
+
+	if (!test_and_clear_bit(XPRT_ADD_SLOT, &xprt->state))
+		return;
+
+	if (xprt->max_reqs == RPC_MAX_SLOT_TABLE) {
+		dprintk("RPC    %s: %p has RPC_MAX_SLOT_TABLE %d slots\n",
+			__func__, xprt, RPC_MAX_SLOT_TABLE);
+		return;
+	}
+	req = kzalloc(sizeof(*req), GFP_NOWAIT);
+	if (!req)
+		return;
+	spin_lock(&xprt->reserve_lock);
+	list_add(&req->rq_list, &xprt->free);
+	xprt->max_reqs += 1;
+	spin_unlock(&xprt->reserve_lock);
+
+	dprintk("RPC    %s added rpc_slot to transport %p max_req %d\n",
+		__func__, xprt, xprt->max_reqs);
+}
+
+struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_req)
 {
 	struct rpc_xprt *xprt;
 
-	xprt = kzalloc(size, GFP_KERNEL);
+	xprt = kzalloc(size, GFP_ATOMIC);
 	if (xprt == NULL)
 		goto out;
 	atomic_set(&xprt->count, 1);
 
-	xprt->max_reqs = max_req;
-	xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL);
-	if (xprt->slot == NULL)
+	xprt->max_reqs = num_req;
+	/* allocate slots and place them on the free list */
+	INIT_LIST_HEAD(&xprt->free);
+	if (xprt_alloc_slot_table_entries(xprt, num_req) != 0)
 		goto out_free;
 
 	xprt->xprt_net = get_net(net);
 	return xprt;
 
 out_free:
+	xprt_free_slot_table_entries(xprt);
 	kfree(xprt);
 out:
 	return NULL;
@@ -985,7 +1057,7 @@ EXPORT_SYMBOL_GPL(xprt_alloc);
 void xprt_free(struct rpc_xprt *xprt)
 {
 	put_net(xprt->xprt_net);
-	kfree(xprt->slot);
+	xprt_free_slot_table_entries(xprt);
 	kfree(xprt);
 }
 EXPORT_SYMBOL_GPL(xprt_free);
@@ -1081,7 +1153,6 @@ void xprt_release(struct rpc_task *task)
 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
 {
 	struct rpc_xprt	*xprt;
-	struct rpc_rqst	*req;
 	struct xprt_class *t;
 
 	spin_lock(&xprt_list_lock);
@@ -1109,7 +1180,6 @@ found:
 	spin_lock_init(&xprt->transport_lock);
 	spin_lock_init(&xprt->reserve_lock);
 
-	INIT_LIST_HEAD(&xprt->free);
 	INIT_LIST_HEAD(&xprt->recv);
 #if defined(CONFIG_NFS_V4_1)
 	spin_lock_init(&xprt->bc_pa_lock);
@@ -1132,10 +1202,6 @@ found:
 	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
 	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
 
-	/* initialize free list */
-	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
-		list_add(&req->rq_list, &xprt->free);
-
 	xprt_init_xid(xprt);
 
 	dprintk("RPC:       created transport %p with %u slots\n", xprt,
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index bf005d3..310c73a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1405,6 +1405,29 @@ static void xs_write_space(struct sock *sk)
 	xprt_write_space(xprt);
 }
 
+/*
+ * called from xs_tcp_write_space to dynamically add a slot
+ * in response to the TCP window size.
+ */
+static void xs_tcp_add_slot(struct sock *sk)
+{
+	struct rpc_xprt *xprt;
+
+	if (unlikely(!(xprt = xprt_from_sock(sk))))
+                return;
+
+	dprintk("%s xprt %p\n", __func__, xprt);
+	xprt_set_add_slot(xprt);
+}
+
+/* set_add_slot xprt operation for UDP */
+static void xs_udp_set_add_slot(struct rpc_xprt *xprt)
+{
+	dprintk("--> %s xprt %p\n", __func__, xprt);
+	if (xprt->max_reqs < xprt_udp_slot_table_entries)
+		xprt_set_add_slot(xprt);
+}
+
 /**
  * xs_udp_write_space - callback invoked when socket buffer space
  *                             becomes available
@@ -1417,6 +1440,7 @@ static void xs_write_space(struct sock *sk)
  */
 static void xs_udp_write_space(struct sock *sk)
 {
+	dprintk("--> %s\n", __func__);
 	read_lock_bh(&sk->sk_callback_lock);
 
 	/* from net/core/sock.c:sock_def_write_space */
@@ -1438,11 +1462,14 @@ static void xs_udp_write_space(struct sock *sk)
  */
 static void xs_tcp_write_space(struct sock *sk)
 {
+	dprintk("--> %s\n", __func__);
 	read_lock_bh(&sk->sk_callback_lock);
 
 	/* from net/core/stream.c:sk_stream_write_space */
-	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
+	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
+		xs_tcp_add_slot(sk);
 		xs_write_space(sk);
+	}
 
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -2095,6 +2122,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
 	.release_xprt		= xprt_release_xprt_cong,
 	.rpcbind		= rpcb_getport_async,
 	.set_port		= xs_set_port,
+	.set_add_slot		= xs_udp_set_add_slot,
 	.connect		= xs_connect,
 	.buf_alloc		= rpc_malloc,
 	.buf_free		= rpc_free,
@@ -2216,7 +2244,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
 	struct sock_xprt *transport;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
+	xprt = xs_setup_xprt(args, RPC_MIN_SLOT_TABLE);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2371,7 +2399,7 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
 		 */
 		 return args->bc_xprt->xpt_bc_xprt;
 	}
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, RPC_MIN_SLOT_TABLE);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
-- 
1.7.3.1

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux