This patch does a couple of things:

 * Allows cancellation of any kiocb, even if the driver doesn't implement
   a ki_cancel callback function. This will be used for block layer
   cancellation - there, implementing a callback is problematic, but we
   can implement useful cancellation by just checking whether the kiocb
   has been marked as cancelled when the block layer goes to dequeue the
   request.

 * Implements a new lookup mechanism for cancellation. Previously, to
   cancel a kiocb we had to look it up in a linked list, and kiocbs were
   added to the linked list lazily. But if any kiocb is cancellable, the
   lazy list adding no longer works, so we need a new mechanism.

   This is done by allocating kiocbs out of a (lazily allocated) array of
   pages, which means we can refer to the kiocbs (and iterate over them)
   with small integers - we use the percpu tag allocation code for
   allocating individual kiocbs.

Signed-off-by: Kent Overstreet <koverstreet@xxxxxxxxxx>
Cc: Zach Brown <zab@xxxxxxxxxx>
Cc: Felipe Balbi <balbi@xxxxxx>
Cc: Greg Kroah-Hartman <gregkh@xxxxxxxxxxxxxxxxxxx>
Cc: Mark Fasheh <mfasheh@xxxxxxxx>
Cc: Joel Becker <jlbec@xxxxxxxxxxxx>
Cc: Rusty Russell <rusty@xxxxxxxxxxxxxxx>
Cc: Jens Axboe <axboe@xxxxxxxxx>
Cc: Asai Thambi S P <asamymuthupa@xxxxxxxxxx>
Cc: Selvan Mani <smani@xxxxxxxxxx>
Cc: Sam Bradshaw <sbradshaw@xxxxxxxxxx>
Cc: Jeff Moyer <jmoyer@xxxxxxxxxx>
Cc: Al Viro <viro@xxxxxxxxxxxxxxxxxx>
Cc: Benjamin LaHaise <bcrl@xxxxxxxxx>
---
 fs/aio.c            | 207 +++++++++++++++++++++++++++++++++-------------------
 include/linux/aio.h |  92 ++++++++++++++++-------
 2 files changed, 197 insertions(+), 102 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index aa39194..f4ea8d5 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -39,6 +39,7 @@
 #include <linux/compat.h>
 #include <linux/percpu-refcount.h>
 #include <linux/radix-tree.h>
+#include <linux/tags.h>
 
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
@@ -74,6 +75,9 @@ struct kioctx {
 
 	struct __percpu kioctx_cpu *cpu;
 
+	struct tag_pool		kiocb_tags;
+	struct page		**kiocb_pages;
+
 	/*
 	 * For percpu reqs_available, number of slots we move to/from global
 	 * counter at a time:
@@ -113,11 +117,6 @@ struct kioctx {
 	} ____cacheline_aligned_in_smp;
 
 	struct {
-		spinlock_t	ctx_lock;
-		struct list_head active_reqs;	/* used for cancellation */
-	} ____cacheline_aligned_in_smp;
-
-	struct {
 		struct mutex	ring_lock;
 		wait_queue_head_t wait;
 	} ____cacheline_aligned_in_smp;
@@ -136,16 +135,25 @@ unsigned long aio_nr;		/* current system wide number of aio requests */
 unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
 /*----end sysctl variables---*/
 
-static struct kmem_cache	*kiocb_cachep;
 static struct kmem_cache	*kioctx_cachep;
 
+#define KIOCBS_PER_PAGE (PAGE_SIZE / sizeof(struct kiocb))
+
+static inline struct kiocb *kiocb_from_id(struct kioctx *ctx, unsigned id)
+{
+	struct page *p = ctx->kiocb_pages[id / KIOCBS_PER_PAGE];
+
+	return p
+		? ((struct kiocb *) page_address(p)) + (id % KIOCBS_PER_PAGE)
+		: NULL;
+}
+
 /* aio_setup
  *	Creates the slab caches used by the aio routines, panic on
  *	failure as this is done early during the boot sequence.
  */
 static int __init aio_setup(void)
 {
-	kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 	kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
 	pr_debug("sizeof(struct page) = %zu\n", sizeof(struct page));
@@ -245,45 +253,58 @@ static int aio_setup_ring(struct kioctx *ctx)
 
 void kiocb_set_cancel_fn(struct kiocb *req, kiocb_cancel_fn *cancel)
 {
-	struct kioctx *ctx = req->ki_ctx;
-	unsigned long flags;
-
-	spin_lock_irqsave(&ctx->ctx_lock, flags);
+	kiocb_cancel_fn *p, *old = req->ki_cancel;
 
-	if (!req->ki_list.next)
-		list_add(&req->ki_list, &ctx->active_reqs);
-
-	req->ki_cancel = cancel;
+	do {
+		if (old == KIOCB_CANCELLED) {
+			cancel(req);
+			return;
+		}
 
-	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+		p = old;
+		old = cmpxchg(&req->ki_cancel, old, cancel);
+	} while (old != p);
 }
 EXPORT_SYMBOL(kiocb_set_cancel_fn);
 
-static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb)
+static void kiocb_cancel(struct kioctx *ctx, struct kiocb *req)
 {
-	kiocb_cancel_fn *old, *cancel;
+	kiocb_cancel_fn *old, *new, *cancel = req->ki_cancel;
 
-	/*
-	 * Don't want to set kiocb->ki_cancel = KIOCB_CANCELLED unless it
-	 * actually has a cancel function, hence the cmpxchg()
-	 */
+	local_irq_disable();
 
-	cancel = ACCESS_ONCE(kiocb->ki_cancel);
 	do {
-		if (!cancel || cancel == KIOCB_CANCELLED)
-			return -EINVAL;
+		if (cancel == KIOCB_CANCELLING ||
+		    cancel == KIOCB_CANCELLED)
+			goto out;
 
 		old = cancel;
-		cancel = cmpxchg(&kiocb->ki_cancel, old, KIOCB_CANCELLED);
-	} while (cancel != old);
+		new = cancel ? KIOCB_CANCELLING : KIOCB_CANCELLED;
+
+		cancel = cmpxchg(&req->ki_cancel, old, KIOCB_CANCELLING);
+	} while (old != cancel);
 
-	return cancel(kiocb);
+	if (cancel) {
+		cancel(req);
+		smp_wmb();
+		req->ki_cancel = KIOCB_CANCELLED;
+	}
+out:
+	local_irq_enable();
 }
 
 static void free_ioctx_rcu(struct rcu_head *head)
 {
 	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+	unsigned i;
+
+	for (i = 0; i < DIV_ROUND_UP(ctx->nr_events, KIOCBS_PER_PAGE); i++)
+		if (ctx->kiocb_pages[i])
+			__free_page(ctx->kiocb_pages[i]);
+	kfree(ctx->kiocb_pages);
+
+	tag_pool_free(&ctx->kiocb_tags);
 
 	free_percpu(ctx->cpu);
 	kmem_cache_free(kioctx_cachep, ctx);
 }
@@ -296,21 +317,16 @@ static void free_ioctx_rcu(struct rcu_head *head)
 static void free_ioctx(struct kioctx *ctx)
 {
 	struct aio_ring *ring;
-	struct kiocb *req;
-	unsigned cpu, avail;
+	unsigned i, cpu, avail;
 	DEFINE_WAIT(wait);
 
-	spin_lock_irq(&ctx->ctx_lock);
+	for (i = 0; i < ctx->nr_events; i++) {
+		struct kiocb *req = kiocb_from_id(ctx, i);
 
-	while (!list_empty(&ctx->active_reqs)) {
-		req = list_first_entry(&ctx->active_reqs,
-				       struct kiocb, ki_list);
-
-		list_del_init(&req->ki_list);
-		kiocb_cancel(ctx, req);
+		if (req)
+			kiocb_cancel(ctx, req);
 	}
 
-	spin_unlock_irq(&ctx->ctx_lock);
 
 	for_each_possible_cpu(cpu) {
 		struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
@@ -409,13 +425,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	percpu_ref_get(&ctx->users);
 	rcu_read_unlock();
 
-	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->completion_lock);
 	mutex_init(&ctx->ring_lock);
 	init_waitqueue_head(&ctx->wait);
 
-	INIT_LIST_HEAD(&ctx->active_reqs);
-
 	ctx->cpu = alloc_percpu(struct kioctx_cpu);
 	if (!ctx->cpu)
 		goto out_freeref;
@@ -427,6 +440,15 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
 	BUG_ON(!ctx->req_batch);
 
+	if (tag_pool_init(&ctx->kiocb_tags, ctx->nr_events))
+		goto out_freering;
+
+	ctx->kiocb_pages =
+		kzalloc(DIV_ROUND_UP(ctx->nr_events, KIOCBS_PER_PAGE) *
+			sizeof(struct page *), GFP_KERNEL);
+	if (!ctx->kiocb_pages)
+		goto out_freetags;
+
 	/* limit the number of system wide aios */
 	spin_lock(&aio_nr_lock);
 	if (aio_nr + nr_events > aio_max_nr ||
@@ -456,6 +478,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 out_cleanup:
 	err = -EAGAIN;
+	kfree(ctx->kiocb_pages);
+out_freetags:
+	tag_pool_free(&ctx->kiocb_tags);
+out_freering:
 	aio_free_ring(ctx);
 out_freepcpu:
 	free_percpu(ctx->cpu);
@@ -619,17 +645,46 @@ out:
 static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req;
+	unsigned id;
 
 	if (!get_reqs_available(ctx))
 		return NULL;
 
-	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
-	if (unlikely(!req))
-		goto out_put;
+	id = tag_alloc(&ctx->kiocb_tags, false);
+	if (!id)
+		goto err;
+
+	req = kiocb_from_id(ctx, id);
+	if (!req) {
+		unsigned i, page_nr = id / KIOCBS_PER_PAGE;
+		struct page *p = alloc_page(GFP_KERNEL);
+		if (!p)
+			goto err;
+		req = page_address(p);
+
+		for (i = 0; i < KIOCBS_PER_PAGE; i++) {
+			req[i].ki_cancel = KIOCB_CANCELLED;
+			req[i].ki_id = page_nr * KIOCBS_PER_PAGE + i;
+		}
+
+		smp_wmb();
+
+		if (cmpxchg(&ctx->kiocb_pages[page_nr], NULL, p) != NULL)
+			__free_page(p);
+	}
+
+	req = kiocb_from_id(ctx, id);
+
+	/*
+	 * Can't set ki_cancel to NULL until we're ready for it to be
+	 * cancellable - leave it as KIOCB_CANCELLED until then
+	 */
+	memset(req, 0, offsetof(struct kiocb, ki_cancel));
 
 	req->ki_ctx = ctx;
+
 	return req;
-out_put:
+err:
 	put_reqs_available(ctx, 1);
 	return NULL;
 }
@@ -640,7 +695,7 @@ static void kiocb_free(struct kiocb *req)
 		fput(req->ki_filp);
 	if (req->ki_eventfd != NULL)
 		eventfd_ctx_put(req->ki_eventfd);
-	kmem_cache_free(kiocb_cachep, req);
+	tag_free(&req->ki_ctx->kiocb_tags, req->ki_id);
 }
 
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
@@ -770,17 +825,21 @@ EXPORT_SYMBOL(batch_complete_aio);
 void aio_complete_batch(struct kiocb *req, long res, long res2,
 			struct batch_complete *batch)
 {
-	req->ki_res = res;
-	req->ki_res2 = res2;
+	kiocb_cancel_fn *old = NULL, *cancel = req->ki_cancel;
+
+	do {
+		if (cancel == KIOCB_CANCELLING) {
+			cpu_relax();
+			cancel = req->ki_cancel;
+			continue;
+		}
 
-	if (req->ki_list.next) {
-		struct kioctx *ctx = req->ki_ctx;
-		unsigned long flags;
+		old = cancel;
+		cancel = cmpxchg(&req->ki_cancel, old, KIOCB_CANCELLED);
+	} while (old != cancel);
 
-		spin_lock_irqsave(&ctx->ctx_lock, flags);
-		list_del(&req->ki_list);
-		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
-	}
+	req->ki_res = res;
+	req->ki_res2 = res2;
 
 	/*
 	 * Special case handling for sync iocbs:
@@ -1204,7 +1263,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 		}
 	}
 
-	ret = put_user(KIOCB_KEY, &user_iocb->aio_key);
+	ret = put_user(req->ki_id, &user_iocb->aio_key);
 	if (unlikely(ret)) {
 		pr_debug("EFAULT: aio_key\n");
 		goto out_put_req;
@@ -1215,6 +1274,13 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	req->ki_pos = iocb->aio_offset;
 	req->ki_nbytes = iocb->aio_nbytes;
 
+	/*
+	 * ki_obj.user must point to the right iocb before making the kiocb
+	 * cancellable by setting ki_cancel = NULL:
+	 */
+	smp_wmb();
+	req->ki_cancel = NULL;
+
 	ret = aio_run_iocb(req, iocb->aio_lio_opcode,
 			   (char __user *)(unsigned long)iocb->aio_buf,
 			   compat);
@@ -1305,19 +1371,16 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
 				  u32 key)
 {
-	struct list_head *pos;
-
-	assert_spin_locked(&ctx->ctx_lock);
+	struct kiocb *req;
 
-	if (key != KIOCB_KEY)
+	if (key > ctx->nr_events)
 		return NULL;
 
-	/* TODO: use a hash or array, this sucks. */
-	list_for_each(pos, &ctx->active_reqs) {
-		struct kiocb *kiocb = list_kiocb(pos);
-		if (kiocb->ki_obj.user == iocb)
-			return kiocb;
-	}
+	req = kiocb_from_id(ctx, key);
+
+	if (req && req->ki_obj.user == iocb)
+		return req;
+
 	return NULL;
 }
 
@@ -1347,17 +1410,9 @@ SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
 	if (unlikely(!ctx))
 		return -EINVAL;
 
-	spin_lock_irq(&ctx->ctx_lock);
-
 	kiocb = lookup_kiocb(ctx, iocb, key);
-	if (kiocb)
-		ret = kiocb_cancel(ctx, kiocb);
-	else
-		ret = -EINVAL;
-
-	spin_unlock_irq(&ctx->ctx_lock);
-
-	if (!ret) {
+	if (kiocb) {
+		kiocb_cancel(ctx, kiocb);
 		/*
 		 * The result argument is no longer used - the io_event is
 		 * always delivered via the ring buffer. -EINPROGRESS indicates
diff --git a/include/linux/aio.h b/include/linux/aio.h
index a6fe048..985e664 100644
--- a/include/linux/aio.h
+++ b/include/linux/aio.h
@@ -13,31 +13,80 @@ struct kioctx;
 struct kiocb;
 struct batch_complete;
 
-#define KIOCB_KEY		0
-
 /*
- * We use ki_cancel == KIOCB_CANCELLED to indicate that a kiocb has been either
- * cancelled or completed (this makes a certain amount of sense because
- * successful cancellation - io_cancel() - does deliver the completion to
- * userspace).
+ * CANCELLATION
+ *
+ * SEMANTICS:
+ *
+ * Userspace may indicate (via io_cancel()) that they wish an iocb to be
+ * cancelled. io_cancel() does nothing more than indicate that the iocb should
+ * be cancelled if possible; it does not indicate whether it succeeded (nor will
+ * it block).
+ *
+ * If cancellation does succeed, userspace should be informed by passing
+ * -ECANCELED to aio_complete(); userspace retrieves the io_event in the usual
+ * manner.
+ *
+ * DRIVERS:
+ *
+ * A driver that wishes to support cancellation may (but does not have to)
+ * implement a ki_cancel callback. If it doesn't implement a callback, it can
+ * check if the kiocb has been marked as cancelled (with kiocb_cancelled()).
+ * This is what the block layer does - when dequeuing requests it checks to see
+ * if it's for a bio that's been marked as cancelled, and if so doesn't send it
+ * to the device.
+ *
+ * Some drivers are going to need to kick something to notice that the kiocb
+ * has been cancelled - those will want to implement a ki_cancel function. The
+ * callback could, say, issue a wakeup so that the thread processing the kiocb
+ * can notice the cancellation - or it might do something else entirely.
+ * kiocb->private is owned by the driver, so that ki_cancel can find the
+ * driver's state.
+ *
+ * A driver must guarantee that a kiocb completes in bounded time if it's been
+ * cancelled - this means that ki_cancel may have to guarantee forward progress.
+ *
+ * ki_cancel() may not call aio_complete().
  *
- * And since most things don't implement kiocb cancellation and we'd really like
- * kiocb completion to be lockless when possible, we use ki_cancel to
- * synchronize cancellation and completion - we only set it to KIOCB_CANCELLED
- * with xchg() or cmpxchg(), see batch_complete_aio() and kiocb_cancel().
+ * SYNCHRONIZATION:
+ *
+ * The aio code ensures that after aio_complete() returns, no ki_cancel function
+ * can be called or still be executing. Thus, the driver should free whatever
+ * kiocb->private points to after calling aio_complete().
+ *
+ * Drivers must not set kiocb->ki_cancel directly; they should use
+ * kiocb_set_cancel_fn(), which guards against races with kiocb_cancel(). It
+ * might be the case that userspace cancelled the iocb before the driver called
+ * kiocb_set_cancel_fn() - in that case, kiocb_set_cancel_fn() will immediately
+ * call the cancel function you passed it, and leave ki_cancel set to
+ * KIOCB_CANCELLED.
+ */
+
+/*
+ * Special values for kiocb->ki_cancel - these indicate that a kiocb has either
+ * been cancelled, or has a ki_cancel function currently running.
  */
-#define KIOCB_CANCELLED		((void *) (~0ULL))
+#define KIOCB_CANCELLED		((void *) (-1LL))
+#define KIOCB_CANCELLING	((void *) (-2LL))
 
 typedef int (kiocb_cancel_fn)(struct kiocb *);
 
 struct kiocb {
 	struct kiocb		*ki_next;	/* batch completion */
 
+	/*
+	 * If the aio_resfd field of the userspace iocb is not zero,
+	 * this is the underlying eventfd context to deliver events to.
+	 */
+	struct eventfd_ctx	*ki_eventfd;
 	struct file		*ki_filp;
 	struct kioctx		*ki_ctx;	/* NULL for sync ops */
-	kiocb_cancel_fn		*ki_cancel;
 	void			*private;
 
+	/* Only zero up to here in aio_get_req() */
+	kiocb_cancel_fn		*ki_cancel;
+	unsigned		ki_id;
+
 	union {
 		void __user		*user;
 		struct task_struct	*tsk;
@@ -49,17 +98,13 @@ struct kiocb {
 
 	loff_t			ki_pos;
 	size_t			ki_nbytes;	/* copy of iocb->aio_nbytes */
-
-	struct list_head	ki_list;	/* the aio core uses this
-						 * for cancellation */
-
-	/*
-	 * If the aio_resfd field of the userspace iocb is not zero,
-	 * this is the underlying eventfd context to deliver events to.
-	 */
-	struct eventfd_ctx	*ki_eventfd;
 };
 
+static inline bool kiocb_cancelled(struct kiocb *kiocb)
+{
+	return kiocb->ki_cancel == KIOCB_CANCELLED;
+}
+
 static inline bool is_sync_kiocb(struct kiocb *kiocb)
 {
 	return kiocb->ki_ctx == NULL;
@@ -107,11 +152,6 @@ static inline void aio_complete(struct kiocb *iocb, long res, long res2)
 	aio_complete_batch(iocb, res, res2, NULL);
 }
 
-static inline struct kiocb *list_kiocb(struct list_head *h)
-{
-	return list_entry(h, struct kiocb, ki_list);
-}
-
 /* for sysctl: */
 extern unsigned long aio_nr;
 extern unsigned long aio_max_nr;
-- 
1.8.2.1
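
(Not part of the patch - a usage sketch for reviewers.)

For anyone reading the new cancellation comment above, here is a minimal
sketch of how a driver might use the interface. All of the my_* names are
hypothetical and the wait queue is just one way a driver could "kick"
itself; only kiocb_set_cancel_fn(), kiocb_cancelled(), aio_complete() and
the kiocb fields come from the aio code. A driver that never needs to be
kicked could skip the callback entirely and just poll kiocb_cancelled()
from its processing path, which is what the changelog describes the block
layer doing at dequeue time.

#include <linux/aio.h>
#include <linux/wait.h>

/* hypothetical per-request driver state, reachable via kiocb->private */
struct my_request {
	struct kiocb		*iocb;
	wait_queue_head_t	wait;	/* worker sleeps here */
};

/* ki_cancel callback: just kick the worker; it must not call aio_complete() */
static int my_cancel(struct kiocb *iocb)
{
	struct my_request *req = iocb->private;

	wake_up(&req->wait);
	return 0;
}

static void my_submit(struct my_request *req, struct kiocb *iocb)
{
	req->iocb = iocb;
	init_waitqueue_head(&req->wait);
	iocb->private = req;

	/*
	 * Don't assign ki_cancel directly - kiocb_set_cancel_fn() handles
	 * the case where userspace already cancelled the iocb, in which
	 * case my_cancel() is called immediately.
	 */
	kiocb_set_cancel_fn(iocb, my_cancel);
}

/* worker/completion path */
static void my_process(struct my_request *req)
{
	if (kiocb_cancelled(req->iocb)) {
		/* tell userspace the request was cancelled */
		aio_complete(req->iocb, -ECANCELED, 0);
		return;
	}

	/* ... do the real work, then aio_complete() with the result ... */
}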