On Tue, 2011-02-15 at 11:30 -0500, Jeff Layton wrote:
> On Tue, 15 Feb 2011 10:31:38 -0500
> Trond Myklebust <Trond.Myklebust@xxxxxxxxxx> wrote:
> 
> > On Tue, 2011-02-15 at 09:58 -0500, Jeff Layton wrote:
> > > I recently had some of our QA people report some connectathon test
> > > failures in RHEL5 (2.6.18-based kernel). For some odd reason (maybe
> > > scheduling differences that make the race more likely?) the problem
> > > occurs more frequently on s390.
> > >
> > > The problem generally manifests itself on NFSv4 as a race where an rmdir
> > > fails because a silly-renamed file in the directory wasn't deleted in
> > > time. Looking at traces, what you usually see is the rmdir attempt that
> > > fails, with the sillydelete of the file that prevented it following very
> > > soon afterward.
> > >
> > > Silly deletes are handled via dentry_iput, and in the case of a close on
> > > NFSv4, the last dentry reference is often held by the CLOSE RPC task.
> > > nfs4_do_close does the close as an async RPC task that it conditionally
> > > waits on, depending on whether the close is synchronous or not.
> > >
> > > It also sets the workqueue for the task to nfsiod_workqueue. When
> > > tk_workqueue is set, the rpc_release operation is queued to that
> > > workqueue. rpc_release is where the dentry reference held by the task is
> > > put. The caller has no way to wait for that to complete, so the close(2)
> > > syscall can easily return before the rpc_release call is ever done. In
> > > some cases, that rpc_release is delayed long enough to prevent a
> > > subsequent rmdir of the containing directory.
> > >
> > > I believe this is a bug, or at least not ideal behavior. We should try
> > > not to have the close(2) call return in this situation until the
> > > sillydelete is done.
> > >
> > > I've been able to reproduce this more reliably by adding a 100ms sleep
> > > at the top of nfs4_free_closedata. I've not seen it "in the wild" on
> > > mainline kernels, but it seems quite possible when a machine is heavily
> > > loaded.

<snip>

> > There is no guarantee that rpc_wait_for_completion_task() will wait
> > until rpciod has called rpc_put_task(), in which case the above change
> > ends up with a dput() on the rpciod workqueue, and potential deadlocks.
> >
> > Let's rather look at making rpc_put_task a little safer, by ensuring
> > that rpciod always calls queue_work(), and by allowing other callers
> > to switch it off.
> >
> 
> Ok, I see the potential deadlock. To make sure I understand correctly,
> is this what you'd like to see?
> 
> 1) If tk_workqueue is set, then queue rpc_release to that.
> 
> 2) If it's not, queue it to rpciod.
> 
> 3) Add a flag to rpc_put_task (or a variant of that function) that
> calls rpc_release synchronously, and have nfs4_do_close use that
> variant when it puts the task.
> 
> ...does that sound correct? If so, then I'm not sure that it'll 100%
> close the race. It's still possible that the rpc_put_task call in
> rpc_release_task would be the last one. If so, and we queue it to the
> workqueue, then we're back in the same situation.

Agreed. The problem is the lack of atomicity between rpc_complete_task()
and rpc_put_task(). How about something like the following?
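To recap the pattern the patch below is trying to make safe, here is a
rough sketch along the lines of nfs4_do_close() and nfs_sillyrename()
(the function name is made up and error handling is omitted; this is
not the literal kernel code):

        /*
         * "Run, wait, put": the caller wants its rpc_put_task() below to
         * drop the last reference, so that ->rpc_release() (and with it
         * the dput() of the silly-renamed dentry) runs synchronously in
         * this process context, before close(2) returns.
         */
        static int example_sync_rpc(struct rpc_task_setup *task_setup_data)
        {
                struct rpc_task *task;
                int status;

                task = rpc_run_task(task_setup_data);   /* executes under rpciod */
                if (IS_ERR(task))
                        return PTR_ERR(task);
                status = rpc_wait_for_completion_task(task);
                rpc_put_task(task);     /* assumed to be the final put; the race is here */
                return status;
        }

Today nothing stops rpciod from still holding its own reference at the
point where the waiter returns, in which case the final put (and hence
the dput()) happens later on the workqueue.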
Cheers
  Trond

8<-----------------------------------------------------------------------------------------
From 72d08b911033dae1b0bf6b14c5d5d26a0ed3abdd Mon Sep 17 00:00:00 2001
From: Trond Myklebust <Trond.Myklebust@xxxxxxxxxx>
Date: Tue, 15 Feb 2011 18:41:15 -0500
Subject: [PATCH] SUNRPC: Close a race in __rpc_wait_for_completion_task()

Although they run as rpciod background tasks, under normal operation
(i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck()
and nfs4_do_close() want to be fully synchronous. This means that when
we exit, we want all references to the rpc_task to be gone, and we want
any dentry references etc. held by that task to be released.

For this reason these functions call __rpc_wait_for_completion_task(),
followed by rpc_put_task() in the expectation that the latter will be
releasing the last reference to the rpc_task, and thus ensuring that the
callback_ops->rpc_release() has been called synchronously.

This patch fixes a race which exists due to the fact that rpciod calls
rpc_complete_task() (in order to wake up the callers of
__rpc_wait_for_completion_task()) and then subsequently calls
rpc_put_task() without ensuring that these two steps are done atomically.

In order to avoid adding new spin locks, the patch uses the existing
waitqueue spin lock to order the rpc_task reference count releases
between the waiting process and rpciod.

The common case where nobody is waiting for completion is optimised for
by checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task
reference count is 1: in those cases we skip taking the spin lock, and
immediately free up the rpc_task.

Those few processes that need to put the rpc_task from inside an
asynchronous context and that do not care about ordering are given a
new helper: rpc_put_task_async().
Signed-off-by: Trond Myklebust <Trond.Myklebust@xxxxxxxxxx>
---
 fs/nfs/nfs4proc.c            |    4 +-
 fs/nfs/unlink.c              |    2 +-
 include/linux/sunrpc/sched.h |    1 +
 net/sunrpc/sched.c           |   70 +++++++++++++++++++++++++++++++++---------
 4 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 78936a8..40381d2 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -4110,7 +4110,7 @@ static void nfs4_lock_release(void *calldata)
                 task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
                                 data->arg.lock_seqid);
                 if (!IS_ERR(task))
-                        rpc_put_task(task);
+                        rpc_put_task_async(task);
                 dprintk("%s: cancelling lock!\n", __func__);
         } else
                 nfs_free_seqid(data->arg.lock_seqid);
@@ -5187,7 +5187,7 @@ static int nfs41_proc_async_sequence(struct nfs_client *clp, struct rpc_cred *cr
         if (IS_ERR(task))
                 ret = PTR_ERR(task);
         else
-                rpc_put_task(task);
+                rpc_put_task_async(task);
         dprintk("<-- %s status=%d\n", __func__, ret);
         return ret;
 }
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index e313a51..6481d53 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -180,7 +180,7 @@ static int nfs_do_call_unlink(struct dentry *parent, struct inode *dir, struct n
         task_setup_data.rpc_client = NFS_CLIENT(dir);
         task = rpc_run_task(&task_setup_data);
         if (!IS_ERR(task))
-                rpc_put_task(task);
+                rpc_put_task_async(task);
         return 1;
 }
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 88513fd..d81db80 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -212,6 +212,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
                                 const struct rpc_call_ops *ops);
 void rpc_put_task(struct rpc_task *);
+void rpc_put_task_async(struct rpc_task *);
 void rpc_exit_task(struct rpc_task *);
 void rpc_exit(struct rpc_task *, int);
 void rpc_release_calldata(const struct rpc_call_ops *, void *);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09..11b71b1 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,39 @@ static void rpc_set_active(struct rpc_task *task)
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-        smp_mb__before_clear_bit();
+        void *m = &task->tk_runstate;
+        wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+        struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+        unsigned long flags;
+        int ret;
+
+        spin_lock_irqsave(&wq->lock, flags);
         clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-        smp_mb__after_clear_bit();
-        wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+        if (waitqueue_active(wq))
+                __wake_up_locked_key(wq, TASK_NORMAL, &k);
+        ret = atomic_dec_and_test(&task->tk_count);
+        spin_unlock_irqrestore(&wq->lock, flags);
+        return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
+        BUG_ON(!RPC_IS_ASYNC(task));
+
         if (action == NULL)
                 action = rpc_wait_bit_killable;
-        return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+        return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                         action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -857,34 +873,58 @@ static void rpc_async_release(struct work_struct *work)
         rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-        if (!atomic_dec_and_test(&task->tk_count))
-                return;
-        /* Release resources */
         if (task->tk_rqstp)
                 xprt_release(task);
         if (task->tk_msg.rpc_cred)
                 put_rpccred(task->tk_msg.rpc_cred);
         rpc_task_release_client(task);
-        if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+                struct workqueue_struct *q)
+{
+        if (q != NULL) {
                 INIT_WORK(&task->u.tk_work, rpc_async_release);
-                queue_work(task->tk_workqueue, &task->u.tk_work);
+                queue_work(q, &task->u.tk_work);
         } else
                 rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+        if (atomic_dec_and_test(&task->tk_count)) {
+                rpc_release_resources_task(task);
+                rpc_final_put_task(task, q);
+        }
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+        rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+        rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
         dprintk("RPC: %5u release task\n", task->tk_pid);
 
         BUG_ON (RPC_IS_QUEUED(task));
 
-        /* Wake up anyone who is waiting for task completion */
-        rpc_mark_complete_task(task);
-
-        rpc_put_task(task);
+        if (RPC_IS_ASYNC(task)) {
+                rpc_release_resources_task(task);
+                /* Wake up anyone who may be waiting for task completion */
+                if (atomic_read(&task->tk_count) == 1 || rpc_complete_task(task))
+                        rpc_final_put_task(task, task->tk_workqueue);
+        } else
+                rpc_put_task(task);
 }
 
 int rpciod_up(void)
-- 
1.7.4


-- 
Trond Myklebust
Linux NFS client maintainer

NetApp
Trond.Myklebust@xxxxxxxxxx
www.netapp.com