Re: [PATCH] nfs: don't queue synchronous NFSv4 close rpc_release to nfsiod

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Wed, 2011-02-16 at 09:09 -0500, Trond Myklebust wrote: 
> On Tue, 2011-02-15 at 18:47 -0500, Trond Myklebust wrote: 
> > diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
> > index 243fc09..11b71b1 100644
> > --- a/net/sunrpc/sched.c
> > +++ b/net/sunrpc/sched.c
> > @@ -252,23 +252,39 @@ static void rpc_set_active(struct rpc_task *task)
> >  
> >  /*
> >   * Mark an RPC call as having completed by clearing the 'active' bit
> > + * and then waking up all tasks that were sleeping.
> >   */
> > -static void rpc_mark_complete_task(struct rpc_task *task)
> > +static int rpc_complete_task(struct rpc_task *task)
> >  {
> > -	smp_mb__before_clear_bit();
> > +	void *m = &task->tk_runstate;
> > +	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
> > +	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
> > +	unsigned long flags;
> > +	int ret;
> > +
> > +	spin_lock_irqsave(&wq->lock, flags);
> >  	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
> > -	smp_mb__after_clear_bit();
> > -	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
> > +	if (waitqueue_active(wq))
> > +		__wake_up_locked_key(wq, TASK_NORMAL, &k);
> > +	ret = atomic_dec_and_test(&task->tk_count);
> > +	spin_unlock_irqrestore(&wq->lock, flags);
> > +	return ret;
> >  }
> >  
> >  /*
> >   * Allow callers to wait for completion of an RPC call
> > + *
> > + * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
> > + * to enforce taking of the wq->lock and hence avoid races with
> > + * rpc_complete_task().
> >   */
> >  int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
> >  {
> > +	BUG_ON(!RPC_IS_ASYNC(task));
> > +
> >  	if (action == NULL)
> >  		action = rpc_wait_bit_killable;
> > -	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
> > +	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
> >  			action, TASK_KILLABLE);
> >  }
> >  EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
> 
> Never mind. The above ordering scheme is broken by the fact that
> 'wake_bit_function' calls autoremove_wake_function, which again means
> that finish_wait optimises away the spin lock.
> 
> Back to the drawing board...

...and after yet another cup of coffee the solution suggests itself: we
just need to reorder the calls to __wake_up_locked_key and the
atomic_dec_and_test in rpc_complete_task. The revised patch is appended.

Cheers
  Trond 

8<---------------------------------------------------------------------------- 
>From f7a251b19f1c92d47e7f2231be61fa31abad4660 Mon Sep 17 00:00:00 2001
From: Trond Myklebust <Trond.Myklebust@xxxxxxxxxx>
Date: Wed, 16 Feb 2011 09:19:21 -0500
Subject: [PATCH v2] SUNRPC: Close a race in __rpc_wait_for_completion_task()

Although they run as rpciod background tasks, under normal operation
(i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck()
and nfs4_do_close() want to be fully synchronous. This means that when we
exit, we want all references to the rpc_task to be gone, and we want
any dentry references etc. held by that task to be released.

For this reason these functions call __rpc_wait_for_completion_task(),
followed by rpc_put_task() in the expectation that the latter will be
releasing the last reference to the rpc_task, and thus ensuring that the
callback_ops->rpc_release() has been called synchronously.

This patch fixes a race which exists due to the fact that
rpciod calls rpc_complete_task() (in order to wake up the callers of
__rpc_wait_for_completion_task()) and then subsequently calls
rpc_put_task() without ensuring that these two steps are done atomically.

In order to avoid adding new spin locks, the patch uses the existing
waitqueue spin lock to order the rpc_task reference count releases between
the waiting process and rpciod.
The common case where nobody is waiting for completion is optimised for by
checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task
reference count is 1: in those cases we drop trying to grab the spin lock,
and immediately free up the rpc_task.

Those few processes that need to put the rpc_task from inside an
asynchronous context and that do not care about ordering are given a new
helper: rpc_put_task_async().

Signed-off-by: Trond Myklebust <Trond.Myklebust@xxxxxxxxxx>
---
 fs/nfs/nfs4proc.c            |    4 +-
 fs/nfs/unlink.c              |    2 +-
 include/linux/sunrpc/sched.h |    1 +
 net/sunrpc/sched.c           |   70 +++++++++++++++++++++++++++++++++---------
 4 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 78936a8..40381d2 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -4110,7 +4110,7 @@ static void nfs4_lock_release(void *calldata)
 		task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
 				data->arg.lock_seqid);
 		if (!IS_ERR(task))
-			rpc_put_task(task);
+			rpc_put_task_async(task);
 		dprintk("%s: cancelling lock!\n", __func__);
 	} else
 		nfs_free_seqid(data->arg.lock_seqid);
@@ -5187,7 +5187,7 @@ static int nfs41_proc_async_sequence(struct nfs_client *clp, struct rpc_cred *cr
 	if (IS_ERR(task))
 		ret = PTR_ERR(task);
 	else
-		rpc_put_task(task);
+		rpc_put_task_async(task);
 	dprintk("<-- %s status=%d\n", __func__, ret);
 	return ret;
 }
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index e313a51..6481d53 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -180,7 +180,7 @@ static int nfs_do_call_unlink(struct dentry *parent, struct inode *dir, struct n
 	task_setup_data.rpc_client = NFS_CLIENT(dir);
 	task = rpc_run_task(&task_setup_data);
 	if (!IS_ERR(task))
-		rpc_put_task(task);
+		rpc_put_task_async(task);
 	return 1;
 }
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 88513fd..d81db80 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -212,6 +212,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
 				const struct rpc_call_ops *ops);
 void		rpc_put_task(struct rpc_task *);
+void		rpc_put_task_async(struct rpc_task *);
 void		rpc_exit_task(struct rpc_task *);
 void		rpc_exit(struct rpc_task *, int);
 void		rpc_release_calldata(const struct rpc_call_ops *, void *);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09..0b86fb0 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,39 @@ static void rpc_set_active(struct rpc_task *task)
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-	smp_mb__before_clear_bit();
+	void *m = &task->tk_runstate;
+	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&wq->lock, flags);
 	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-	smp_mb__after_clear_bit();
-	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+	ret = atomic_dec_and_test(&task->tk_count);
+	if (waitqueue_active(wq))
+		__wake_up_locked_key(wq, TASK_NORMAL, &k);
+	spin_unlock_irqrestore(&wq->lock, flags);
+	return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
+	BUG_ON(!RPC_IS_ASYNC(task));
+
 	if (action == NULL)
 		action = rpc_wait_bit_killable;
-	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
 			action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -857,34 +873,58 @@ static void rpc_async_release(struct work_struct *work)
 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-	if (!atomic_dec_and_test(&task->tk_count))
-		return;
-	/* Release resources */
 	if (task->tk_rqstp)
 		xprt_release(task);
 	if (task->tk_msg.rpc_cred)
 		put_rpccred(task->tk_msg.rpc_cred);
 	rpc_task_release_client(task);
-	if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+		struct workqueue_struct *q)
+{
+	if (q != NULL) {
 		INIT_WORK(&task->u.tk_work, rpc_async_release);
-		queue_work(task->tk_workqueue, &task->u.tk_work);
+		queue_work(q, &task->u.tk_work);
 	} else
 		rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+	if (atomic_dec_and_test(&task->tk_count)) {
+		rpc_release_resources_task(task);
+		rpc_final_put_task(task, q);
+	}
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+	rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+	rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
 	dprintk("RPC: %5u release task\n", task->tk_pid);
 
 	BUG_ON (RPC_IS_QUEUED(task));
 
-	/* Wake up anyone who is waiting for task completion */
-	rpc_mark_complete_task(task);
-
-	rpc_put_task(task);
+	if (RPC_IS_ASYNC(task)) {
+		rpc_release_resources_task(task);
+		/* Wake up anyone who may be waiting for task completion */
+		if (atomic_read(&task->tk_count) == 1 || rpc_complete_task(task))
+			rpc_final_put_task(task, task->tk_workqueue);
+	} else
+		rpc_put_task(task);
 }
 
 int rpciod_up(void)
-- 
1.7.4



-- 
Trond Myklebust
Linux NFS client maintainer

NetApp
Trond.Myklebust@xxxxxxxxxx
www.netapp.com

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux