The refcounting before wasn't very clear; there are two refcounts in struct kioctx, with an unclear relationship between them (or between them and ctx->dead). Now, reqs_active holds a refcount on users (when reqs_active is nonzero), and the initial refcount is taken on reqs_active - when ctx->dead goes to 1, we drop that initial refcount. Some other semi related cleanup too. Signed-off-by: Kent Overstreet <koverstreet@xxxxxxxxxx> --- fs/aio.c | 187 +++++++++++++++++---------------------------------- include/linux/aio.h | 5 +- 2 files changed, 63 insertions(+), 129 deletions(-) diff --git a/fs/aio.c b/fs/aio.c index 95419c4..3ab12f6 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -86,8 +86,9 @@ static void aio_free_ring(struct kioctx *ctx) put_page(info->ring_pages[i]); if (info->mmap_size) { - BUG_ON(ctx->mm != current->mm); - vm_munmap(info->mmap_base, info->mmap_size); + down_write(&ctx->mm->mmap_sem); + do_munmap(ctx->mm, info->mmap_base, info->mmap_size); + up_write(&ctx->mm->mmap_sem); } if (info->ring_pages && info->ring_pages != info->internal_pages) @@ -188,45 +189,37 @@ static int aio_setup_ring(struct kioctx *ctx) kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \ } while(0) -static void ctx_rcu_free(struct rcu_head *head) -{ - struct kioctx *ctx = container_of(head, struct kioctx, rcu_head); - kmem_cache_free(kioctx_cachep, ctx); -} - -/* __put_ioctx - * Called when the last user of an aio context has gone away, - * and the struct needs to be freed. - */ -static void __put_ioctx(struct kioctx *ctx) +static void free_ioctx(struct work_struct *work) { - unsigned nr_events = ctx->max_reqs; - BUG_ON(ctx->reqs_active); + struct kioctx *ctx = container_of(work, struct kioctx, free_work); cancel_delayed_work_sync(&ctx->wq); aio_free_ring(ctx); mmdrop(ctx->mm); - ctx->mm = NULL; - if (nr_events) { - spin_lock(&aio_nr_lock); - BUG_ON(aio_nr - nr_events > aio_nr); - aio_nr -= nr_events; - spin_unlock(&aio_nr_lock); - } - pr_debug("__put_ioctx: freeing %p\n", ctx); - call_rcu(&ctx->rcu_head, ctx_rcu_free); -} -static inline int try_get_ioctx(struct kioctx *kioctx) -{ - return atomic_inc_not_zero(&kioctx->users); + spin_lock(&aio_nr_lock); + BUG_ON(aio_nr - ctx->max_reqs > aio_nr); + aio_nr -= ctx->max_reqs; + spin_unlock(&aio_nr_lock); + + synchronize_rcu(); + + pr_debug("__put_ioctx: freeing %p\n", ctx); + kmem_cache_free(kioctx_cachep, ctx); } static inline void put_ioctx(struct kioctx *kioctx) { BUG_ON(atomic_read(&kioctx->users) <= 0); if (unlikely(atomic_dec_and_test(&kioctx->users))) - __put_ioctx(kioctx); + schedule_work(&kioctx->free_work); +} + +static inline void req_put_ioctx(struct kioctx *kioctx) +{ + BUG_ON(atomic_read(&kioctx->reqs_active) <= 0); + if (unlikely(atomic_dec_and_test(&kioctx->reqs_active))) + put_ioctx(kioctx); } static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb, @@ -280,12 +273,15 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) atomic_inc(&mm->mm_count); atomic_set(&ctx->users, 2); + atomic_set(&ctx->reqs_active, 1); + spin_lock_init(&ctx->ctx_lock); spin_lock_init(&ctx->ring_info.ring_lock); init_waitqueue_head(&ctx->wait); INIT_LIST_HEAD(&ctx->active_reqs); INIT_LIST_HEAD(&ctx->run_list); + INIT_WORK(&ctx->free_work, free_ioctx); INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler); if (aio_setup_ring(ctx) < 0) @@ -327,36 +323,25 @@ out_freectx: */ static void kill_ctx(struct kioctx *ctx) { - struct task_struct *tsk = current; - DECLARE_WAITQUEUE(wait, tsk); + struct kiocb *iocb, *t; struct io_event res; + int put = 0; spin_lock_irq(&ctx->ctx_lock); - ctx->dead = 1; - while (!list_empty(&ctx->active_reqs)) { - struct list_head *pos = ctx->active_reqs.next; - struct kiocb *iocb = list_kiocb(pos); - list_del_init(&iocb->ki_list); - kiocb_cancel(ctx, iocb, &res); + if (!ctx->dead) { + put = 1; + ctx->dead = 1; + hlist_del_rcu(&ctx->list); } - if (!ctx->reqs_active) - goto out; - - add_wait_queue(&ctx->wait, &wait); - set_task_state(tsk, TASK_UNINTERRUPTIBLE); - while (ctx->reqs_active) { - spin_unlock_irq(&ctx->ctx_lock); - io_schedule(); - set_task_state(tsk, TASK_UNINTERRUPTIBLE); - spin_lock_irq(&ctx->ctx_lock); - } - __set_task_state(tsk, TASK_RUNNING); - remove_wait_queue(&ctx->wait, &wait); + list_for_each_entry_safe(iocb, t, &ctx->active_reqs, ki_list) + kiocb_cancel(ctx, iocb, &res); -out: spin_unlock_irq(&ctx->ctx_lock); + + if (put) + req_put_ioctx(ctx); } /* wait_on_sync_kiocb: @@ -385,18 +370,16 @@ EXPORT_SYMBOL(wait_on_sync_kiocb); void exit_aio(struct mm_struct *mm) { struct kioctx *ctx; + struct hlist_node *p; - while (!hlist_empty(&mm->ioctx_list)) { - ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list); - hlist_del_rcu(&ctx->list); - - kill_ctx(ctx); + spin_lock(&mm->ioctx_lock); + hlist_for_each_entry(ctx, p, &mm->ioctx_list, list) { if (1 != atomic_read(&ctx->users)) printk(KERN_DEBUG "exit_aio:ioctx still alive: %d %d %d\n", atomic_read(&ctx->users), ctx->dead, - ctx->reqs_active); + atomic_read(&ctx->reqs_active)); /* * We don't need to bother with munmap() here - * exit_mmap(mm) is coming and it'll unmap everything. @@ -408,39 +391,34 @@ void exit_aio(struct mm_struct *mm) * all other callers have ctx->mm == current->mm. */ ctx->ring_info.mmap_size = 0; - put_ioctx(ctx); + kill_ctx(ctx); } + + spin_unlock(&mm->ioctx_lock); } /* aio_get_req - * Allocate a slot for an aio request. Increments the users count + * Allocate a slot for an aio request. Increments the ki_users count * of the kioctx so that the kioctx stays around until all requests are * complete. Returns NULL if no requests are free. * - * Returns with kiocb->users set to 2. The io submit code path holds + * Returns with kiocb->ki_users set to 2. The io submit code path holds * an extra reference while submitting the i/o. * This prevents races between the aio code path referencing the * req (after submitting it) and aio_complete() freeing the req. */ static struct kiocb *__aio_get_req(struct kioctx *ctx) { - struct kiocb *req = NULL; + struct kiocb *req; req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL); if (unlikely(!req)) return NULL; - req->ki_flags = 0; + memset(req, 0, sizeof(*req)); req->ki_users = 2; - req->ki_key = 0; req->ki_ctx = ctx; - req->ki_cancel = NULL; - req->ki_retry = NULL; - req->ki_dtor = NULL; - req->private = NULL; - req->ki_iovec = NULL; INIT_LIST_HEAD(&req->ki_run_list); - req->ki_eventfd = NULL; return req; } @@ -473,10 +451,8 @@ static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch) list_del(&req->ki_batch); list_del(&req->ki_list); kmem_cache_free(kiocb_cachep, req); - ctx->reqs_active--; + req_put_ioctx(ctx); } - if (unlikely(!ctx->reqs_active && ctx->dead)) - wake_up_all(&ctx->wait); spin_unlock_irq(&ctx->ctx_lock); } @@ -506,7 +482,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch) spin_lock_irq(&ctx->ctx_lock); ring = kmap_atomic(ctx->ring_info.ring_pages[0]); - avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active; + avail = aio_ring_avail(&ctx->ring_info, ring) - atomic_read(&ctx->reqs_active); BUG_ON(avail < 0); if (avail < allocated) { /* Trim back the number of requests. */ @@ -521,7 +497,7 @@ static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch) batch->count -= allocated; list_for_each_entry(req, &batch->head, ki_batch) { list_add(&req->ki_list, &ctx->active_reqs); - ctx->reqs_active++; + atomic_inc(&ctx->reqs_active); } kunmap_atomic(ring); @@ -546,8 +522,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx, static inline void really_put_req(struct kioctx *ctx, struct kiocb *req) { - assert_spin_locked(&ctx->ctx_lock); - + fput(req->ki_filp); if (req->ki_eventfd != NULL) eventfd_ctx_put(req->ki_eventfd); if (req->ki_dtor) @@ -555,10 +530,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req) if (req->ki_iovec != &req->ki_inline_vec) kfree(req->ki_iovec); kmem_cache_free(kiocb_cachep, req); - ctx->reqs_active--; - if (unlikely(!ctx->reqs_active && ctx->dead)) - wake_up_all(&ctx->wait); + req_put_ioctx(ctx); } /* __aio_put_req @@ -566,8 +539,8 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req) */ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req) { - dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n", - req, atomic_long_read(&req->ki_filp->f_count)); + pr_debug("req=%p f_count=%ld\n", + req, atomic_long_read(&req->ki_filp->f_count)); assert_spin_locked(&ctx->ctx_lock); @@ -576,11 +549,7 @@ static void __aio_put_req(struct kioctx *ctx, struct kiocb *req) if (likely(req->ki_users)) return; list_del(&req->ki_list); /* remove from active_reqs */ - req->ki_cancel = NULL; - req->ki_retry = NULL; - fput(req->ki_filp); - req->ki_filp = NULL; really_put_req(ctx, req); } @@ -605,18 +574,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id) rcu_read_lock(); - hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) { - /* - * RCU protects us against accessing freed memory but - * we have to be careful not to get a reference when the - * reference count already dropped to 0 (ctx->dead test - * is unreliable because of races). - */ - if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){ + hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) + if (ctx->user_id == ctx_id){ + BUG_ON(ctx->dead); + atomic_inc(&ctx->users); ret = ctx; break; } - } rcu_read_unlock(); return ret; @@ -1162,7 +1126,7 @@ retry: break; /* Try to only show up in io wait if there are ops * in flight */ - if (ctx->reqs_active) + if (atomic_read(&ctx->reqs_active) > 1) io_schedule(); else schedule(); @@ -1197,35 +1161,6 @@ out: return i ? i : ret; } -/* Take an ioctx and remove it from the list of ioctx's. Protects - * against races with itself via ->dead. - */ -static void io_destroy(struct kioctx *ioctx) -{ - struct mm_struct *mm = current->mm; - int was_dead; - - /* delete the entry from the list is someone else hasn't already */ - spin_lock(&mm->ioctx_lock); - was_dead = ioctx->dead; - ioctx->dead = 1; - hlist_del_rcu(&ioctx->list); - spin_unlock(&mm->ioctx_lock); - - dprintk("aio_release(%p)\n", ioctx); - if (likely(!was_dead)) - put_ioctx(ioctx); /* twice for the list */ - - kill_ctx(ioctx); - - /* - * Wake up any waiters. The setting of ctx->dead must be seen - * by other CPUs at this point. Right now, we rely on the - * locking done by the above calls to ensure this consistency. - */ - wake_up_all(&ioctx->wait); -} - /* sys_io_setup: * Create an aio_context capable of receiving at least nr_events. * ctxp must not point to an aio_context that already exists, and @@ -1261,7 +1196,7 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp) if (!IS_ERR(ioctx)) { ret = put_user(ioctx->user_id, ctxp); if (ret) - io_destroy(ioctx); + kill_ctx(ioctx); put_ioctx(ioctx); } @@ -1279,7 +1214,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx) { struct kioctx *ioctx = lookup_ioctx(ctx); if (likely(NULL != ioctx)) { - io_destroy(ioctx); + kill_ctx(ioctx); put_ioctx(ioctx); return 0; } diff --git a/include/linux/aio.h b/include/linux/aio.h index 4cde86d..eb6e5e4 100644 --- a/include/linux/aio.h +++ b/include/linux/aio.h @@ -192,7 +192,7 @@ struct kioctx { spinlock_t ctx_lock; - int reqs_active; + atomic_t reqs_active; struct list_head active_reqs; /* used for cancellation */ struct list_head run_list; /* used for kicked reqs */ @@ -202,8 +202,7 @@ struct kioctx { struct aio_ring_info ring_info; struct delayed_work wq; - - struct rcu_head rcu_head; + struct work_struct free_work; }; /* prototypes */ -- 1.7.10.4 -- dm-devel mailing list dm-devel@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/dm-devel