On Wed, 2011-02-16 at 09:09 -0500, Trond Myklebust wrote:
> On Tue, 2011-02-15 at 18:47 -0500, Trond Myklebust wrote:
> > diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
> > index 243fc09..11b71b1 100644
> > --- a/net/sunrpc/sched.c
> > +++ b/net/sunrpc/sched.c
> > @@ -252,23 +252,39 @@ static void rpc_set_active(struct rpc_task *task)
> > 
> >  /*
> >   * Mark an RPC call as having completed by clearing the 'active' bit
> > + * and then waking up all tasks that were sleeping.
> >   */
> > -static void rpc_mark_complete_task(struct rpc_task *task)
> > +static int rpc_complete_task(struct rpc_task *task)
> >  {
> > -	smp_mb__before_clear_bit();
> > +	void *m = &task->tk_runstate;
> > +	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
> > +	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
> > +	unsigned long flags;
> > +	int ret;
> > +
> > +	spin_lock_irqsave(&wq->lock, flags);
> >  	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
> > -	smp_mb__after_clear_bit();
> > -	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
> > +	if (waitqueue_active(wq))
> > +		__wake_up_locked_key(wq, TASK_NORMAL, &k);
> > +	ret = atomic_dec_and_test(&task->tk_count);
> > +	spin_unlock_irqrestore(&wq->lock, flags);
> > +	return ret;
> >  }
> > 
> >  /*
> >   * Allow callers to wait for completion of an RPC call
> > + *
> > + * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
> > + * to enforce taking of the wq->lock and hence avoid races with
> > + * rpc_complete_task().
> >   */
> >  int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
> >  {
> > +	BUG_ON(!RPC_IS_ASYNC(task));
> > +
> >  	if (action == NULL)
> >  		action = rpc_wait_bit_killable;
> > -	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
> > +	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
> >  			action, TASK_KILLABLE);
> >  }
> >  EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
> 
> Never mind. The above ordering scheme is broken by the fact that
> wake_bit_function() calls autoremove_wake_function(), which in turn
> means that finish_wait() optimises away the spin lock.
> 
> Back to the drawing board...

...and after yet another cup of coffee the solution suggests itself: we
just need to reorder the calls to __wake_up_locked_key() and
atomic_dec_and_test() in rpc_complete_task(). The revised patch is
appended.
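As an aside, for anyone who wants to convince themselves of the ordering
argument outside the kernel tree, the scheme can be modelled in plain
userspace C. The sketch below is an analogy only, with invented names
(task_t, complete_task() and so on): a pthread mutex and condition
variable stand in for the bit-waitqueue and its wq->lock, and a C11
atomic stands in for tk_count. It builds with "cc -pthread". The point
it demonstrates: because the completing side drops its reference before
the wakeup, with both steps under the same lock, the waiter's subsequent
put is guaranteed to be the final one, so release happens synchronously
in the waiter's context.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
	pthread_mutex_t lock;	/* plays the role of wq->lock */
	pthread_cond_t done;	/* plays the role of the bit-waitqueue */
	atomic_int refcount;	/* plays the role of tk_count */
	bool active;		/* plays the role of RPC_TASK_ACTIVE */
} task_t;

/* Drop one reference; return true if it was the last one. */
static bool put_ref(task_t *t)
{
	return atomic_fetch_sub(&t->refcount, 1) == 1;
}

/* The "rpciod" side. Returns true if the caller must free the task. */
static bool complete_task(task_t *t)
{
	bool last;

	pthread_mutex_lock(&t->lock);
	t->active = false;
	/*
	 * Drop our reference BEFORE waking, while still holding the
	 * lock: by the time a waiter can observe completion, this
	 * thread's reference is already gone, so the waiter's put is
	 * guaranteed to be the last one.
	 */
	last = put_ref(t);
	pthread_cond_broadcast(&t->done);
	pthread_mutex_unlock(&t->lock);
	return last;
}

/*
 * The waiting side. pthread_cond_wait() always re-acquires the mutex
 * before returning, so the v1 breakage (finish_wait() skipping the
 * waitqueue lock after autoremove_wake_function()) has no analogue
 * here; that re-acquisition is exactly the guarantee the patch
 * restores with out_of_line_wait_on_bit(). One further difference:
 * the kernel's bit-waitqueue lives in a shared hash table rather than
 * inside the task, so the waker never touches task memory after its
 * decrement.
 */
static void wait_for_completion(task_t *t)
{
	pthread_mutex_lock(&t->lock);
	while (t->active)
		pthread_cond_wait(&t->done, &t->lock);
	pthread_mutex_unlock(&t->lock);
}

static void *rpciod_side(void *arg)
{
	task_t *t = arg;

	/* ... the task's actual work would happen here ... */
	if (complete_task(t)) {
		/* nobody else held a reference: we free */
		printf("rpciod side freed the task\n");
		free(t);
	}
	return NULL;
}

int main(void)
{
	task_t *t = calloc(1, sizeof(*t));
	pthread_t bg;

	pthread_mutex_init(&t->lock, NULL);
	pthread_cond_init(&t->done, NULL);
	atomic_init(&t->refcount, 2);	/* one ref each for us and "rpciod" */
	t->active = true;

	pthread_create(&bg, NULL, rpciod_side, t);
	wait_for_completion(t);
	/* Our put is now guaranteed to be the final one. */
	if (put_ref(t)) {
		printf("waiter freed the task synchronously\n");
		free(t);
	}
	pthread_join(bg, NULL);
	return 0;
}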
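A related note on the common-case optimisation mentioned in the
changelog below, since it is easy to miss in the diff: a waiter always
holds its own reference to the task, so if rpciod observes a reference
count of 1 there can be no waiter, and the wq->lock round trip can be
skipped entirely. Extending the userspace sketch above (again invented
names, analogy only), the release step on the "rpciod" side mirrors the
patched rpc_release_task():

/*
 * The "rpciod" release step, reusing task_t, put_ref() and
 * complete_task() from the sketch above. A waiter must hold its own
 * reference, so refcount == 1 means nobody is waiting and the locked
 * wakeup path can be skipped; likewise a synchronous task can have no
 * completion waiters at all.
 */
static void release_task(task_t *t, bool is_async)
{
	if (is_async) {
		/* resources (rqst, cred, client) would be released
		 * here, as rpc_release_resources_task() does */
		if (atomic_load(&t->refcount) == 1 || complete_task(t))
			free(t);
	} else {
		if (put_ref(t))
			free(t);
	}
}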
Cheers
  Trond

8<----------------------------------------------------------------------------
From f7a251b19f1c92d47e7f2231be61fa31abad4660 Mon Sep 17 00:00:00 2001
From: Trond Myklebust <Trond.Myklebust@xxxxxxxxxx>
Date: Wed, 16 Feb 2011 09:19:21 -0500
Subject: [PATCH v2] SUNRPC: Close a race in __rpc_wait_for_completion_task()

Although they run as rpciod background tasks, under normal operation
(i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck()
and nfs4_do_close() want to be fully synchronous. This means that when
we exit, we want all references to the rpc_task to be gone, and we want
any dentry references etc. held by that task to be released.

For this reason these functions call __rpc_wait_for_completion_task(),
followed by rpc_put_task() in the expectation that the latter will be
releasing the last reference to the rpc_task, and thus ensuring that the
callback_ops->rpc_release() has been called synchronously.

This patch fixes a race which exists due to the fact that rpciod calls
rpc_complete_task() (in order to wake up the callers of
__rpc_wait_for_completion_task()) and then subsequently calls
rpc_put_task() without ensuring that these two steps are done atomically.

In order to avoid adding new spin locks, the patch uses the existing
waitqueue spin lock to order the rpc_task reference count releases
between the waiting process and rpciod.
The common case where nobody is waiting for completion is optimised for
by checking whether the RPC_TASK_ASYNC flag is cleared and/or the
rpc_task reference count is 1: in those cases we avoid taking the spin
lock, and immediately free up the rpc_task.

Those few processes that need to put the rpc_task from inside an
asynchronous context and that do not care about ordering are given a new
helper: rpc_put_task_async().

Signed-off-by: Trond Myklebust <Trond.Myklebust@xxxxxxxxxx>
---
 fs/nfs/nfs4proc.c            |    4 +-
 fs/nfs/unlink.c              |    2 +-
 include/linux/sunrpc/sched.h |    1 +
 net/sunrpc/sched.c           |   70 +++++++++++++++++++++++++++++++++---------
 4 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 78936a8..40381d2 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -4110,7 +4110,7 @@ static void nfs4_lock_release(void *calldata)
 		task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
 				data->arg.lock_seqid);
 		if (!IS_ERR(task))
-			rpc_put_task(task);
+			rpc_put_task_async(task);
 		dprintk("%s: cancelling lock!\n", __func__);
 	} else
 		nfs_free_seqid(data->arg.lock_seqid);
@@ -5187,7 +5187,7 @@ static int nfs41_proc_async_sequence(struct nfs_client *clp, struct rpc_cred *cr
 	if (IS_ERR(task))
 		ret = PTR_ERR(task);
 	else
-		rpc_put_task(task);
+		rpc_put_task_async(task);
 	dprintk("<-- %s status=%d\n", __func__, ret);
 	return ret;
 }
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index e313a51..6481d53 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -180,7 +180,7 @@ static int nfs_do_call_unlink(struct dentry *parent, struct inode *dir, struct n
 	task_setup_data.rpc_client = NFS_CLIENT(dir);
 	task = rpc_run_task(&task_setup_data);
 	if (!IS_ERR(task))
-		rpc_put_task(task);
+		rpc_put_task_async(task);
 	return 1;
 }
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 88513fd..d81db80 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -212,6 +212,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
 				const struct rpc_call_ops *ops);
 void		rpc_put_task(struct rpc_task *);
+void		rpc_put_task_async(struct rpc_task *);
 void		rpc_exit_task(struct rpc_task *);
 void		rpc_exit(struct rpc_task *, int);
 void		rpc_release_calldata(const struct rpc_call_ops *, void *);
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 243fc09..0b86fb0 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -252,23 +252,39 @@ static void rpc_set_active(struct rpc_task *task)
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-	smp_mb__before_clear_bit();
+	void *m = &task->tk_runstate;
+	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&wq->lock, flags);
 	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-	smp_mb__after_clear_bit();
-	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+	ret = atomic_dec_and_test(&task->tk_count);
+	if (waitqueue_active(wq))
+		__wake_up_locked_key(wq, TASK_NORMAL, &k);
+	spin_unlock_irqrestore(&wq->lock, flags);
+	return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
+	BUG_ON(!RPC_IS_ASYNC(task));
+
 	if (action == NULL)
 		action = rpc_wait_bit_killable;
-	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
 			action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -857,34 +873,58 @@ static void rpc_async_release(struct work_struct *work)
 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-	if (!atomic_dec_and_test(&task->tk_count))
-		return;
-
-	/* Release resources */
 	if (task->tk_rqstp)
 		xprt_release(task);
 	if (task->tk_msg.rpc_cred)
 		put_rpccred(task->tk_msg.rpc_cred);
 	rpc_task_release_client(task);
-	if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+		struct workqueue_struct *q)
+{
+	if (q != NULL) {
 		INIT_WORK(&task->u.tk_work, rpc_async_release);
-		queue_work(task->tk_workqueue, &task->u.tk_work);
+		queue_work(q, &task->u.tk_work);
 	} else
 		rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+	if (atomic_dec_and_test(&task->tk_count)) {
+		rpc_release_resources_task(task);
+		rpc_final_put_task(task, q);
+	}
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+	rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+	rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
 	dprintk("RPC: %5u release task\n", task->tk_pid);
 
 	BUG_ON (RPC_IS_QUEUED(task));
 
-	/* Wake up anyone who is waiting for task completion */
-	rpc_mark_complete_task(task);
-
-	rpc_put_task(task);
+	if (RPC_IS_ASYNC(task)) {
+		rpc_release_resources_task(task);
+		/* Wake up anyone who may be waiting for task completion */
+		if (atomic_read(&task->tk_count) == 1 || rpc_complete_task(task))
+			rpc_final_put_task(task, task->tk_workqueue);
+	} else
+		rpc_put_task(task);
 }
 
 int rpciod_up(void)
-- 
1.7.4


-- 
Trond Myklebust
Linux NFS client maintainer

NetApp
Trond.Myklebust@xxxxxxxxxx
www.netapp.com