[PATCH 1/2] SUNRPC: only put task on cl_tasks list after the RPC call slot is reserved.

Under heavy write load, we've seen the cl_tasks list grow to
millions of entries. Even though the list is extremely long,
the system still runs fine until the user requests information
about all active RPC tasks by doing:

When this happens, tasks_start acquires the cl_lock to walk the
cl_tasks list, returning one entry at a time to the caller. The
cl_lock is held until all tasks on this list have been processed.

While the cl_lock is held, completed RPC tasks have to spin-wait
in rpc_task_release_client for the cl_lock. If there are millions
of entries in the cl_tasks list, it takes a long time before
tasks_stop is called and the cl_lock is released.

The spin-waiting tasks can use up all the available CPUs in the
system, preventing other jobs from running and causing the system
to lock up temporarily.

This patch fixes this problem by delaying inserting the RPC
task on the cl_tasks list until the RPC call slot is reserved.
This limits the length of the cl_tasks list to the number of call
slots available in the system.

Signed-off-by: Dai Ngo <dai.ngo@xxxxxxxxxx>
---
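Note for reviewers: the delayed insertion described above can be
modelled in userspace. The sketch below is not kernel code. All
names in it (struct client, task_reserve, MAX_SLOTS, NTASKS, and so
on) are hypothetical stand-ins, the model is single-threaded so it
takes no lock where the kernel takes cl_lock, and the slot limit of
4 is an arbitrary value chosen for illustration. It only shows that
a task is counted when it is attached to the client, is listed once
it owns a slot, and that the list therefore never grows beyond the
slot limit.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_SLOTS 4	/* hypothetical slot limit, not a kernel value */
#define NTASKS    1000	/* hypothetical number of queued tasks */

struct task {
	struct task *prev, *next;	/* self-linked while not on the list */
	bool has_slot;
};

struct client {
	struct task tasks;	/* list head, stands in for cl_tasks */
	atomic_int task_count;	/* stands in for cl_task_count */
	int slots_in_use;
	int listed;		/* current length of the list */
};

static void client_init(struct client *c)
{
	c->tasks.prev = c->tasks.next = &c->tasks;
	c->tasks.has_slot = false;
	atomic_init(&c->task_count, 0);
	c->slots_in_use = 0;
	c->listed = 0;
}

/* Attach a task to the client: count it, but do not list it yet. */
static void task_set_client(struct client *c, struct task *t)
{
	t->prev = t->next = t;
	t->has_slot = false;
	atomic_fetch_add(&c->task_count, 1);
}

/* Try to reserve a slot; only a task that owns a slot goes on the list. */
static bool task_reserve(struct client *c, struct task *t)
{
	if (t->has_slot)
		return true;
	if (c->slots_in_use == MAX_SLOTS)
		return false;		/* keeps waiting, still unlisted */
	c->slots_in_use++;
	t->has_slot = true;
	if (t->next == t) {		/* same guard idea as list_empty() */
		t->prev = c->tasks.prev;
		t->next = &c->tasks;
		c->tasks.prev->next = t;
		c->tasks.prev = t;
		c->listed++;
	}
	return true;
}

/* Complete a task: unlist it if listed, free its slot, drop the count. */
static void task_release(struct client *c, struct task *t)
{
	if (t->next != t) {
		t->prev->next = t->next;
		t->next->prev = t->prev;
		t->prev = t->next = t;
		c->listed--;
	}
	if (t->has_slot) {
		t->has_slot = false;
		c->slots_in_use--;
	}
	atomic_fetch_sub(&c->task_count, 1);
}

/* Queue NTASKS tasks, run them all, return the longest list length seen. */
int demo_peak_listed(void)
{
	static struct task tasks[NTASKS];
	struct client c;
	int peak = 0, done = 0;

	client_init(&c);
	for (int i = 0; i < NTASKS; i++)
		task_set_client(&c, &tasks[i]);
	/* Every task is counted, none is listed before it owns a slot. */
	assert(atomic_load(&c.task_count) == NTASKS && c.listed == 0);

	while (done < NTASKS) {
		/* Hand out slots in order until none are left. */
		for (int i = done; i < NTASKS && task_reserve(&c, &tasks[i]); i++)
			;
		if (c.listed > peak)
			peak = c.listed;
		/* Complete the oldest task, which frees its slot. */
		task_release(&c, &tasks[done++]);
	}
	assert(atomic_load(&c.task_count) == 0 && c.listed == 0);
	return peak;
}
```

In this model demo_peak_listed() returns MAX_SLOTS however large
NTASKS is made, while task_count still tracks every attached task.
That separate count is why the shutdown path in the patch waits on
cl_task_count in addition to the list becoming empty.
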
 include/linux/sunrpc/clnt.h |  1 +
 net/sunrpc/clnt.c           | 18 +++++++++++++-----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 5321585c778f..fec976e58174 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -93,6 +93,7 @@ struct rpc_clnt {
 	const struct cred	*cl_cred;
 	unsigned int		cl_max_connect; /* max number of transports not to the same IP */
 	struct super_block *pipefs_sb;
+	atomic_t		cl_task_count;
 };
 
 /*
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 0090162ee8c3..cc5014b58e3b 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -958,12 +958,17 @@ void rpc_shutdown_client(struct rpc_clnt *clnt)
 
 	trace_rpc_clnt_shutdown(clnt);
 
+	clnt->cl_shutdown = 1;
 	while (!list_empty(&clnt->cl_tasks)) {
 		rpc_killall_tasks(clnt);
 		wait_event_timeout(destroy_wait,
 			list_empty(&clnt->cl_tasks), 1*HZ);
 	}
 
+	/* wait for tasks still in workqueue or waitqueue */
+	wait_event_timeout(destroy_wait,
+			   atomic_read(&clnt->cl_task_count) == 0, 1 * HZ);
+
 	rpc_release_client(clnt);
 }
 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
@@ -1139,6 +1144,7 @@ void rpc_task_release_client(struct rpc_task *task)
 		list_del(&task->tk_task);
 		spin_unlock(&clnt->cl_lock);
 		task->tk_client = NULL;
+		atomic_dec(&clnt->cl_task_count);
 
 		rpc_release_client(clnt);
 	}
@@ -1189,10 +1195,7 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 		task->tk_flags |= RPC_TASK_TIMEOUT;
 	if (clnt->cl_noretranstimeo)
 		task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
-	/* Add to the client's list of all tasks */
-	spin_lock(&clnt->cl_lock);
-	list_add_tail(&task->tk_task, &clnt->cl_tasks);
-	spin_unlock(&clnt->cl_lock);
+	atomic_inc(&clnt->cl_task_count);
 }
 
 static void
@@ -1787,9 +1790,14 @@ call_reserveresult(struct rpc_task *task)
 	if (status >= 0) {
 		if (task->tk_rqstp) {
 			task->tk_action = call_refresh;
+
+			/* Add to the client's list of all tasks */
+			spin_lock(&task->tk_client->cl_lock);
+			if (list_empty(&task->tk_task))
+				list_add_tail(&task->tk_task, &task->tk_client->cl_tasks);
+			spin_unlock(&task->tk_client->cl_lock);
 			return;
 		}
-
 		rpc_call_rpcerror(task, -EIO);
 		return;
 	}
-- 
2.43.5




