See the previous patch for why we want to do this - this basically implements a per cpu allocator for reqs_available that doesn't actually allocate anything. Note that we need to increase the size of the ringbuffer we allocate, since a single thread won't necessarily be able to use all the reqs_available slots - some (up to about half) might be on other per cpu lists, unavailable for the current thread. We size the ringbuffer based on the nr_events userspace passed to io_setup(), so this is a slight behaviour change - but nr_events wasn't being used as a hard limit before, it was being rounded up to the next page before so this doesn't change the actual semantics. Signed-off-by: Kent Overstreet <koverstreet@xxxxxxxxxx> --- fs/aio.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 7 deletions(-) diff --git a/fs/aio.c b/fs/aio.c index d384eb2..e415b33 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -26,6 +26,7 @@ #include <linux/mm.h> #include <linux/mman.h> #include <linux/mmu_context.h> +#include <linux/percpu.h> #include <linux/slab.h> #include <linux/timer.h> #include <linux/aio.h> @@ -59,6 +60,10 @@ struct aio_ring { #define AIO_RING_PAGES 8 +struct kioctx_cpu { + unsigned reqs_available; +}; + struct kioctx { atomic_t users; atomic_t dead; @@ -67,6 +72,10 @@ struct kioctx { unsigned long user_id; struct hlist_node list; + struct __percpu kioctx_cpu *cpu; + + unsigned req_batch; + unsigned nr; /* sys_io_setup currently limits this to an unsigned int */ @@ -149,6 +158,9 @@ static int aio_setup_ring(struct kioctx *ctx) unsigned long size; int nr_pages; + nr_events = max(nr_events, num_possible_cpus() * 4); + nr_events *= 2; + /* Compensate for the ring buffer's head/tail overlap entry */ nr_events += 2; /* 1 is required, 2 for good luck */ @@ -255,6 +267,8 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb, static void free_ioctx_rcu(struct rcu_head *head) { struct kioctx *ctx = container_of(head, struct kioctx, rcu_head); + + free_percpu(ctx->cpu); kmem_cache_free(kioctx_cachep, ctx); } @@ -268,7 +282,7 @@ static void free_ioctx(struct kioctx *ctx) struct aio_ring *ring; struct io_event res; struct kiocb *req; - unsigned head, avail; + unsigned cpu, head, avail; spin_lock_irq(&ctx->ctx_lock); @@ -282,6 +296,13 @@ static void free_ioctx(struct kioctx *ctx) spin_unlock_irq(&ctx->ctx_lock); + for_each_possible_cpu(cpu) { + struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu); + + atomic_add(kcpu->reqs_available, &ctx->reqs_available); + kcpu->reqs_available = 0; + } + ring = kmap_atomic(ctx->ring_pages[0]); head = ring->head; kunmap_atomic(ring); @@ -357,10 +378,16 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) INIT_LIST_HEAD(&ctx->active_reqs); - if (aio_setup_ring(ctx) < 0) + ctx->cpu = alloc_percpu(struct kioctx_cpu); + if (!ctx->cpu) goto out_freectx; + if (aio_setup_ring(ctx) < 0) + goto out_freepcpu; + atomic_set(&ctx->reqs_available, ctx->nr); + ctx->req_batch = ctx->nr / (num_possible_cpus() * 4); + BUG_ON(!ctx->req_batch); /* limit the number of system wide aios */ spin_lock(&aio_nr_lock); @@ -384,6 +411,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) out_cleanup: err = -EAGAIN; aio_free_ring(ctx); +out_freepcpu: + free_percpu(ctx->cpu); out_freectx: kmem_cache_free(kioctx_cachep, ctx); pr_debug("error allocating ioctx %d\n", err); @@ -482,6 +511,52 @@ void exit_aio(struct mm_struct *mm) } } +static void put_reqs_available(struct kioctx *ctx, unsigned nr) +{ + struct kioctx_cpu *kcpu; + + preempt_disable(); + kcpu = this_cpu_ptr(ctx->cpu); + + kcpu->reqs_available += nr; + while (kcpu->reqs_available >= ctx->req_batch * 2) { + kcpu->reqs_available -= ctx->req_batch; + atomic_add(ctx->req_batch, &ctx->reqs_available); + } + + preempt_enable(); +} + +static bool get_reqs_available(struct kioctx *ctx) +{ + struct kioctx_cpu *kcpu; + bool ret = false; + + preempt_disable(); + kcpu = this_cpu_ptr(ctx->cpu); + + if (!kcpu->reqs_available) { + int old, avail = atomic_read(&ctx->reqs_available); + + do { + if (avail < ctx->req_batch) + goto out; + + old = avail; + avail = atomic_cmpxchg(&ctx->reqs_available, + avail, avail - ctx->req_batch); + } while (avail != old); + + kcpu->reqs_available += ctx->req_batch; + } + + ret = true; + kcpu->reqs_available--; +out: + preempt_enable(); + return ret; +} + /* aio_get_req * Allocate a slot for an aio request. Increments the ki_users count * of the kioctx so that the kioctx stays around until all requests are @@ -496,7 +571,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) { struct kiocb *req; - if (atomic_dec_if_positive(&ctx->reqs_available) <= 0) + if (!get_reqs_available(ctx)) return NULL; req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO); @@ -505,10 +580,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) atomic_set(&req->ki_users, 2); req->ki_ctx = ctx; - return req; out_put: - atomic_inc(&ctx->reqs_available); + put_reqs_available(ctx, 1); return NULL; } @@ -597,6 +671,10 @@ void aio_complete(struct kiocb *iocb, long res, long res2) */ if (unlikely(xchg(&iocb->ki_cancel, KIOCB_CANCELLED) == KIOCB_CANCELLED)) { + /* + * Can't use the percpu reqs_available here - could race with + * free_ioctx() + */ atomic_inc(&ctx->reqs_available); /* Still need the wake_up in case free_ioctx is waiting */ goto put_rq; @@ -737,7 +815,7 @@ static int aio_read_events_ring(struct kioctx *ctx, pr_debug("%d h%u t%u\n", ret, head, ctx->tail); - atomic_add(ret, &ctx->reqs_available); + put_reqs_available(ctx, ret); out: mutex_unlock(&ctx->ring_lock); @@ -1156,7 +1234,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, return 0; out_put_req: - atomic_inc(&ctx->reqs_available); + put_reqs_available(ctx, 1); aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop i/o ref to req */ return ret; -- 1.7.12 -- To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html