Re: [PATCH 05/13] aio: mostly crap

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Mon 25-05-09 09:30:52, Jens Axboe wrote:
> First attempts at getting rid of some locking in aio
  I suppose this shouldn't be in the series ;).

								Honza

> Signed-off-by: Jens Axboe <jens.axboe@xxxxxxxxxx>
> ---
>  fs/aio.c            |  151 +++++++++++++++++++++++++++++++++------------------
>  include/linux/aio.h |   11 ++--
>  2 files changed, 103 insertions(+), 59 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index 76da125..98c82f2 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -79,9 +79,8 @@ static int __init aio_setup(void)
>  	return 0;
>  }
>  
> -static void aio_free_ring(struct kioctx *ctx)
> +static void __aio_free_ring(struct kioctx *ctx, struct aio_ring_info *info)
>  {
> -	struct aio_ring_info *info = &ctx->ring_info;
>  	long i;
>  
>  	for (i=0; i<info->nr_pages; i++)
> @@ -99,16 +98,28 @@ static void aio_free_ring(struct kioctx *ctx)
>  	info->nr = 0;
>  }
>  
> -static int aio_setup_ring(struct kioctx *ctx)
> +static void aio_free_ring(struct kioctx *ctx)
> +{
> +	unsigned int i;
> +
> +	for_each_possible_cpu(i) {
> +		struct aio_ring_info *info = per_cpu_ptr(ctx->ring_info, i);
> +
> +		 __aio_free_ring(ctx, info);
> +	}
> +	free_percpu(ctx->ring_info);
> +	ctx->ring_info = NULL;
> +}
> +
> +static int __aio_setup_ring(struct kioctx *ctx, struct aio_ring_info *info)
>  {
>  	struct aio_ring *ring;
> -	struct aio_ring_info *info = &ctx->ring_info;
>  	unsigned nr_events = ctx->max_reqs;
>  	unsigned long size;
>  	int nr_pages;
>  
> -	/* Compensate for the ring buffer's head/tail overlap entry */
> -	nr_events += 2;	/* 1 is required, 2 for good luck */
> +	/* round nr_event to next power of 2 */
> +	nr_events = roundup_pow_of_two(nr_events);
>  
>  	size = sizeof(struct aio_ring);
>  	size += sizeof(struct io_event) * nr_events;
> @@ -117,8 +128,6 @@ static int aio_setup_ring(struct kioctx *ctx)
>  	if (nr_pages < 0)
>  		return -EINVAL;
>  
> -	nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
> -
>  	info->nr = 0;
>  	info->ring_pages = info->internal_pages;
>  	if (nr_pages > AIO_RING_PAGES) {
> @@ -158,7 +167,8 @@ static int aio_setup_ring(struct kioctx *ctx)
>  	ring = kmap_atomic(info->ring_pages[0], KM_USER0);
>  	ring->nr = nr_events;	/* user copy */
>  	ring->id = ctx->user_id;
> -	ring->head = ring->tail = 0;
> +	atomic_set(&ring->head, 0);
> +	ring->tail = 0;
>  	ring->magic = AIO_RING_MAGIC;
>  	ring->compat_features = AIO_RING_COMPAT_FEATURES;
>  	ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
> @@ -168,6 +178,27 @@ static int aio_setup_ring(struct kioctx *ctx)
>  	return 0;
>  }
>  
> +static int aio_setup_ring(struct kioctx *ctx)
> +{
> +	unsigned int i;
> +	int ret;
> +
> +	ctx->ring_info = alloc_percpu(struct aio_ring_info);
> +	if (!ctx->ring_info)
> +		return -ENOMEM;
> +
> +	ret = 0;
> +	for_each_possible_cpu(i) {
> +		struct aio_ring_info *info = per_cpu_ptr(ctx->ring_info, i);
> +		int err;
> +
> +		err = __aio_setup_ring(ctx, info);
> +		if (err && !ret)
> +			ret = err;
> +	}
> +
> +	return ret;
> +}
>  
>  /* aio_ring_event: returns a pointer to the event at the given index from
>   * kmap_atomic(, km).  Release the pointer with put_aio_ring_event();
> @@ -176,8 +207,8 @@ static int aio_setup_ring(struct kioctx *ctx)
>  #define AIO_EVENTS_FIRST_PAGE	((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
>  #define AIO_EVENTS_OFFSET	(AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
>  
> -#define aio_ring_event(info, nr, km) ({					\
> -	unsigned pos = (nr) + AIO_EVENTS_OFFSET;			\
> +#define aio_ring_event(info, __nr, km) ({				\
> +	unsigned pos = ((__nr) & ((info)->nr - 1)) + AIO_EVENTS_OFFSET;	\
>  	struct io_event *__event;					\
>  	__event = kmap_atomic(						\
>  			(info)->ring_pages[pos / AIO_EVENTS_PER_PAGE], km); \
> @@ -262,7 +293,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
>  
>  	atomic_set(&ctx->users, 1);
>  	spin_lock_init(&ctx->ctx_lock);
> -	spin_lock_init(&ctx->ring_info.ring_lock);
>  	init_waitqueue_head(&ctx->wait);
>  
>  	INIT_LIST_HEAD(&ctx->active_reqs);
> @@ -426,6 +456,7 @@ void exit_aio(struct mm_struct *mm)
>  static struct kiocb *__aio_get_req(struct kioctx *ctx)
>  {
>  	struct kiocb *req = NULL;
> +	struct aio_ring_info *info;
>  	struct aio_ring *ring;
>  	int okay = 0;
>  
> @@ -448,15 +479,18 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
>  	/* Check if the completion queue has enough free space to
>  	 * accept an event from this io.
>  	 */
> -	spin_lock_irq(&ctx->ctx_lock);
> -	ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
> -	if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
> +	local_irq_disable();
> +	info = per_cpu_ptr(ctx->ring_info, smp_processor_id());
> +	ring = kmap_atomic(info->ring_pages[0], KM_IRQ0);
> +	if (ctx->reqs_active < aio_ring_avail(info, ring)) {
> +		spin_lock(&ctx->ctx_lock);
>  		list_add(&req->ki_list, &ctx->active_reqs);
>  		ctx->reqs_active++;
> +		spin_unlock(&ctx->ctx_lock);
>  		okay = 1;
>  	}
> -	kunmap_atomic(ring, KM_USER0);
> -	spin_unlock_irq(&ctx->ctx_lock);
> +	kunmap_atomic(ring, KM_IRQ0);
> +	local_irq_enable();
>  
>  	if (!okay) {
>  		kmem_cache_free(kiocb_cachep, req);
> @@ -578,9 +612,11 @@ int aio_put_req(struct kiocb *req)
>  {
>  	struct kioctx *ctx = req->ki_ctx;
>  	int ret;
> +
>  	spin_lock_irq(&ctx->ctx_lock);
>  	ret = __aio_put_req(ctx, req);
>  	spin_unlock_irq(&ctx->ctx_lock);
> +
>  	return ret;
>  }
>  
> @@ -954,7 +990,7 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
>  	struct aio_ring	*ring;
>  	struct io_event	*event;
>  	unsigned long	flags;
> -	unsigned long	tail;
> +	unsigned	tail;
>  	int		ret;
>  
>  	/*
> @@ -972,15 +1008,14 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
>  		return 1;
>  	}
>  
> -	info = &ctx->ring_info;
> -
>  	/* add a completion event to the ring buffer.
>  	 * must be done holding ctx->ctx_lock to prevent
>  	 * other code from messing with the tail
>  	 * pointer since we might be called from irq
>  	 * context.
>  	 */
> -	spin_lock_irqsave(&ctx->ctx_lock, flags);
> +	local_irq_save(flags);
> +	info = per_cpu_ptr(ctx->ring_info, smp_processor_id());
>  
>  	if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
>  		list_del_init(&iocb->ki_run_list);
> @@ -996,8 +1031,6 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
>  
>  	tail = info->tail;
>  	event = aio_ring_event(info, tail, KM_IRQ0);
> -	if (++tail >= info->nr)
> -		tail = 0;
>  
>  	event->obj = (u64)(unsigned long)iocb->ki_obj.user;
>  	event->data = iocb->ki_user_data;
> @@ -1013,13 +1046,14 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
>  	 */
>  	smp_wmb();	/* make event visible before updating tail */
>  
> +	tail++;
>  	info->tail = tail;
>  	ring->tail = tail;
>  
>  	put_aio_ring_event(event, KM_IRQ0);
>  	kunmap_atomic(ring, KM_IRQ1);
>  
> -	pr_debug("added to ring %p at [%lu]\n", iocb, tail);
> +	pr_debug("added to ring %p at [%u]\n", iocb, tail);
>  
>  	/*
>  	 * Check if the user asked us to deliver the result through an
> @@ -1031,7 +1065,9 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
>  
>  put_rq:
>  	/* everything turned out well, dispose of the aiocb. */
> +	spin_lock(&ctx->ctx_lock);
>  	ret = __aio_put_req(ctx, iocb);
> +	spin_unlock(&ctx->ctx_lock);
>  
>  	/*
>  	 * We have to order our ring_info tail store above and test
> @@ -1044,49 +1080,58 @@ put_rq:
>  	if (waitqueue_active(&ctx->wait))
>  		wake_up(&ctx->wait);
>  
> -	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
> +	local_irq_restore(flags);
> +	return ret;
> +}
> +
> +static int __aio_read_evt(struct aio_ring_info *info, struct aio_ring *ring,
> +			  struct io_event *ent)
> +{
> +	struct io_event *evp;
> +	unsigned head;
> +	int ret = 0;
> +
> +	do {
> +		head = atomic_read(&ring->head);
> +		if (head == ring->tail)
> +			break;
> +		evp = aio_ring_event(info, head, KM_USER1);
> +		*ent = *evp;
> +		smp_mb(); /* finish reading the event before updatng the head */
> +		++ret;
> +		put_aio_ring_event(evp, KM_USER1);
> +	} while (head != atomic_cmpxchg(&ring->head, head, head + 1));
> +
>  	return ret;
>  }
>  
>  /* aio_read_evt
>   *	Pull an event off of the ioctx's event ring.  Returns the number of 
>   *	events fetched (0 or 1 ;-)
> - *	FIXME: make this use cmpxchg.
> - *	TODO: make the ringbuffer user mmap()able (requires FIXME).
> + *	TODO: make the ringbuffer user mmap()able
>   */
>  static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
>  {
> -	struct aio_ring_info *info = &ioctx->ring_info;
> -	struct aio_ring *ring;
> -	unsigned long head;
> -	int ret = 0;
> +	int i, ret = 0;
>  
> -	ring = kmap_atomic(info->ring_pages[0], KM_USER0);
> -	dprintk("in aio_read_evt h%lu t%lu m%lu\n",
> -		 (unsigned long)ring->head, (unsigned long)ring->tail,
> -		 (unsigned long)ring->nr);
> +	for_each_possible_cpu(i) {
> +		struct aio_ring_info *info;
> +		struct aio_ring *ring;
>  
> -	if (ring->head == ring->tail)
> -		goto out;
> +		info = per_cpu_ptr(ioctx->ring_info, i);
> +		ring = kmap_atomic(info->ring_pages[0], KM_USER0);
> +		dprintk("in aio_read_evt h%u t%u m%u\n",
> +			 atomic_read(&ring->head), ring->tail, ring->nr);
>  
> -	spin_lock(&info->ring_lock);
> -
> -	head = ring->head % info->nr;
> -	if (head != ring->tail) {
> -		struct io_event *evp = aio_ring_event(info, head, KM_USER1);
> -		*ent = *evp;
> -		head = (head + 1) % info->nr;
> -		smp_mb(); /* finish reading the event before updatng the head */
> -		ring->head = head;
> -		ret = 1;
> -		put_aio_ring_event(evp, KM_USER1);
> +		ret = __aio_read_evt(info, ring, ent);
> +		kunmap_atomic(ring, KM_USER0);
> +		if (ret)
> +			break;
>  	}
> -	spin_unlock(&info->ring_lock);
>  
> -out:
> -	kunmap_atomic(ring, KM_USER0);
> -	dprintk("leaving aio_read_evt: %d  h%lu t%lu\n", ret,
> -		 (unsigned long)ring->head, (unsigned long)ring->tail);
> +	dprintk("leaving aio_read_evt: %d  h%u t%u\n", ret,
> +		 atomic_read(&ring->head), ring->tail);
> +
>  	return ret;
>  }
>  
> diff --git a/include/linux/aio.h b/include/linux/aio.h
> index b16a957..9a7acb4 100644
> --- a/include/linux/aio.h
> +++ b/include/linux/aio.h
> @@ -149,7 +149,7 @@ struct kiocb {
>  struct aio_ring {
>  	unsigned	id;	/* kernel internal index number */
>  	unsigned	nr;	/* number of io_events */
> -	unsigned	head;
> +	atomic_t	head;
>  	unsigned	tail;
>  
>  	unsigned	magic;
> @@ -157,11 +157,11 @@ struct aio_ring {
>  	unsigned	incompat_features;
>  	unsigned	header_length;	/* size of aio_ring */
>  
> -
> -	struct io_event		io_events[0];
> +	struct io_event	io_events[0];
>  }; /* 128 bytes + ring size */
>  
> -#define aio_ring_avail(info, ring)	(((ring)->head + (info)->nr - 1 - (ring)->tail) % (info)->nr)
> +#define aio_ring_avail(info, ring)					\
> +	((info)->nr + (unsigned) atomic_read(&(ring)->head) - (ring)->tail)
>  
>  #define AIO_RING_PAGES	8
>  struct aio_ring_info {
> @@ -169,7 +169,6 @@ struct aio_ring_info {
>  	unsigned long		mmap_size;
>  
>  	struct page		**ring_pages;
> -	spinlock_t		ring_lock;
>  	long			nr_pages;
>  
>  	unsigned		nr, tail;
> @@ -197,7 +196,7 @@ struct kioctx {
>  	/* sys_io_setup currently limits this to an unsigned int */
>  	unsigned		max_reqs;
>  
> -	struct aio_ring_info	ring_info;
> +	struct aio_ring_info	*ring_info;
>  
>  	struct delayed_work	wq;
>  
> -- 
> 1.6.3.rc0.1.gf800
> 
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux