Re: [PATCH 27/27] libxfs: convert sync IO buftarg engine to AIO

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Thu, Oct 15, 2020 at 06:21:55PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@xxxxxxxxxx>
> 
> Simple per-ag thread based completion engine. Will have issues with
> large AG counts.

Hm, I missed how any of this is per-AG... all I saw was an aio control
context that's attached to the buftarg.

> XXX: should this be combined with the struct btcache?
> 
> Signed-off-by: Dave Chinner <dchinner@xxxxxxxxxx>
> ---
>  include/atomic.h           |   7 +-
>  include/builddefs.in       |   2 +-
>  include/platform_defs.h.in |   1 +
>  libxfs/buftarg.c           | 202 +++++++++++++++++++++++++++++++------
>  libxfs/xfs_buf.h           |   6 ++
>  libxfs/xfs_buftarg.h       |   7 ++
>  6 files changed, 191 insertions(+), 34 deletions(-)
> 
> diff --git a/include/atomic.h b/include/atomic.h
> index 5860d7897ae5..8727fc4ddae9 100644
> --- a/include/atomic.h
> +++ b/include/atomic.h
> @@ -27,8 +27,11 @@ typedef	int64_t	atomic64_t;
>  #define atomic_inc_return(a)	uatomic_add_return(a, 1)
>  #define atomic_dec_return(a)	uatomic_sub_return(a, 1)
>  
> -#define atomic_inc(a)		atomic_inc_return(a)
> -#define atomic_dec(a)		atomic_inc_return(a)
> +#define atomic_add(a, v)	uatomic_add(a, v)
> +#define atomic_sub(a, v)	uatomic_sub(a, v)
> +
> +#define atomic_inc(a)		uatomic_inc(a)
> +#define atomic_dec(a)		uatomic_dec(a)

Does this belong in the liburcu patch?

>  
>  #define atomic_dec_and_test(a)	(atomic_dec_return(a) == 0)
>  
> diff --git a/include/builddefs.in b/include/builddefs.in
> index 78eddf4a9852..c20a48f6258c 100644
> --- a/include/builddefs.in
> +++ b/include/builddefs.in
> @@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
>  LIBBLKID = @libblkid@
>  LIBDEVMAPPER = @libdevmapper@
>  LIBINIH = @libinih@
> -LIBXFS = $(TOPDIR)/libxfs/libxfs.la
> +LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio

This needs all the autoconf magic to complain if libaio isn't installed,
right?

>  LIBFROG = $(TOPDIR)/libfrog/libfrog.la
>  LIBXCMD = $(TOPDIR)/libxcmd/libxcmd.la
>  LIBXLOG = $(TOPDIR)/libxlog/libxlog.la
> diff --git a/include/platform_defs.h.in b/include/platform_defs.h.in
> index 8af43f3b8d8a..7c30a43eb951 100644
> --- a/include/platform_defs.h.in
> +++ b/include/platform_defs.h.in
> @@ -24,6 +24,7 @@
>  #include <stdbool.h>
>  #include <libgen.h>
>  #include <urcu.h>
> +#include <libaio.h>
>  
>  typedef struct filldir		filldir_t;
>  
> diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
> index df968c66c205..e1e5f41b423c 100644
> --- a/libxfs/buftarg.c
> +++ b/libxfs/buftarg.c
> @@ -259,11 +259,16 @@ xfs_buftarg_alloc(
>  	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
>  		goto error_lru;
>  
> -	if (xfs_buftarg_mempressue_init(btp))
> +	if (xfs_buftarg_aio_init(&btp->bt_aio))
>  		goto error_pcp;
>  
> +	if (xfs_buftarg_mempressue_init(btp))
> +		goto error_aio;
> +
>  	return btp;
>  
> +error_aio:
> +	xfs_buftarg_aio_destroy(btp->bt_aio);
>  error_pcp:
>  	percpu_counter_destroy(&btp->bt_io_count);
>  error_lru:
> @@ -286,6 +291,7 @@ xfs_buftarg_free(
>  	if (btp->bt_psi_fd >= 0)
>  		close(btp->bt_psi_fd);
>  
> +	xfs_buftarg_aio_destroy(btp->bt_aio);
>  	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
>  	percpu_counter_destroy(&btp->bt_io_count);
>  	platform_flush_device(btp->bt_fd, btp->bt_bdev);
> @@ -329,39 +335,132 @@ xfs_buf_allocate_memory(
>   */
>  
>  /*
> - * XXX: this will be replaced by an AIO submission engine in future. In the mean
> - * time, just complete the IO synchronously so all the machinery still works.
> + * AIO context for dispatch and completion.
> + *
> + * Run completion polling in a separate thread, the poll timeout will stop it
> + * from spinning in tight loops when there is nothing to do.
>   */
> -static int
> -submit_io(
> -	struct xfs_buf *bp,
> -	int		fd,
> -	void		*buf,
> -	xfs_daddr_t	blkno,
> -	int		size,
> -	bool		write)
> +#define MAX_AIO_EVENTS 1024
> +struct xfs_btaio {
> +	io_context_t	ctxp;
> +	int		aio_fd;
> +	int		aio_in_flight;
> +	pthread_t	completion_tid;
> +	bool		done;
> +};
> +
> +static void
> +xfs_buf_aio_ioend(
> +	struct io_event		*ev)
>  {
> -	int		ret;
> +	struct xfs_buf		*bp = (struct xfs_buf *)ev->data;
>  
> -	if (!write)
> -		ret = pread(fd, buf, size, BBTOB(blkno));
> -	else
> -		ret = pwrite(fd, buf, size, BBTOB(blkno));
> -	if (ret < 0)
> -		ret = -errno;
> -	else if (ret != size)
> -		ret = -EIO;
> -	else
> -		ret = 0;
>  	/*
> -	 * This is a bit of a hack until we get AIO that runs completions.
> -	 * Success is treated as a completion here, but IO errors are handled as
> -	 * a submission error and are handled by the caller. AIO will clean this
> -	 * up.
> +	 * don't overwrite existing errors - otherwise we can lose errors on
> +	 * buffers that require multiple bios to complete.
> +	 *
> +	 * We check that the returned length was the same as specified for this
> +	 * IO. Note that this onyl works for read and write - if we start
> +	 * using readv/writev for discontiguous buffers then this needs more
> +	 * work.

Interesting RFC, though I gather discontig buffers don't work yet?
Or hmm maybe there's something funny going on with
xfs_buftarg_submit_io_map?  You didn't post a git branch link, so it's
hard(er) to just rummage around on my own. :/

This seems like a reasonable restructuring to allow asynchronous io.
It's sort of a pity that this isn't modular enough to allow multiple IO
engines (synchronous system calls and io_uring come to mine) but that
can come later.

Urp, I'll go reply to the cover letter now.

--D

>  	 */
> -	if (!ret)
> +	if (ev->res < 0 || ev->res != ev->obj->u.c.nbytes) {
> +		int error = ev->res < 0 ? (int)ev->res : -EIO;
> +
> +		cmpxchg(&bp->b_io_error, 0, error);
> +	}
> +
> +	if (atomic_dec_and_test(&bp->b_io_remaining))
>  		xfs_buf_ioend(bp);
> -	return ret;
> +}
> +
> +static void
> +get_io_completions(
> +	struct xfs_btaio	*aio,
> +	int			threshold)
> +{
> +	struct io_event		ioevents[MAX_AIO_EVENTS];
> +	struct timespec		tout = {
> +		.tv_nsec = 100*1000*1000,	/* 100ms */
> +	};
> +	int			i, r;
> +
> +	/* gather up some completions */
> +	r = io_getevents(aio->ctxp, 1, MAX_AIO_EVENTS, ioevents, &tout);
> +	if (r < 0)  {
> +		fprintf(stderr, "FAIL! io_getevents returned %d\n", r);
> +		if (r == -EINTR)
> +			return;
> +		exit(1);
> +	}
> +	if (r == 0) {
> +		/* timeout, return to caller to check for what to do next */
> +		return;
> +	}
> +
> +	atomic_sub(&aio->aio_in_flight, r);
> +	for (i = 0; i < r; ++i)
> +		xfs_buf_aio_ioend(&ioevents[i]);
> +}
> +
> +static void *
> +aio_completion_thread(
> +	void			*arg)
> +{
> +	struct xfs_btaio	*aio = arg;
> +
> +	while (!aio->done) {
> +		get_io_completions(aio, 1);
> +	}
> +	return NULL;
> +}
> +
> +static int
> +submit_aio(
> +	struct xfs_buf		*bp,
> +	void			*buf,
> +	xfs_daddr_t		blkno,
> +	int			size)
> +{
> +	struct xfs_btaio	*aio = bp->b_target->bt_aio;
> +	int			r;
> +
> +	if (!aio->aio_fd)
> +		aio->aio_fd = bp->b_target->bt_fd;
> +
> +	/*
> +	 * Reserve and bound the number of in flight IOs to keep the number of
> +	 * pending IOs overrunning the tail of the event loop. This also serves
> +	 * to throttle incoming IOs without burning CPU by spinning.
> +	 */
> +	while (!atomic_add_unless(&aio->aio_in_flight, 1, MAX_AIO_EVENTS - 1))
> +		get_io_completions(aio, 1);
> +
> +	if (bp->b_flags & XBF_WRITE)
> +		io_prep_pwrite(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
> +	else
> +		io_prep_pread(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
> +
> +	bp->b_iocb.data = bp;
> +	do {
> +		struct iocb		*iocb;
> +
> +		iocb = &bp->b_iocb;
> +		r = io_submit(aio->ctxp, 1, &iocb);
> +		if (r == 1)
> +			return 0;	/* successful submission */
> +		fprintf(stderr, "io_submit returned %d\n", r);
> +		if (r != -EAGAIN)
> +			break;
> +		/* On EAGAIN, reap some completions and try again. */
> +		get_io_completions(aio, 1);
> +	} while (1);
> +
> +	if (bp->b_flags & LIBXFS_B_EXIT)
> +		exit(1);
> +
> +	atomic_dec(&aio->aio_in_flight);
> +	return r;
>  }
>  
>  static void
> @@ -373,7 +472,6 @@ xfs_buftarg_submit_io_map(
>  {
>  	int		size;
>  	int		offset;
> -	bool		rw = (bp->b_flags & XBF_WRITE);
>  	int		error;
>  
>  	offset = *buf_offset;
> @@ -388,8 +486,7 @@ xfs_buftarg_submit_io_map(
>  
>  	atomic_inc(&bp->b_io_remaining);
>  
> -	error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
> -			  bp->b_maps[map].bm_bn, size, rw);
> +	error = submit_aio(bp, bp->b_addr + offset, bp->b_maps[map].bm_bn, size);
>  	if (error) {
>  		/*
>  		 * This is guaranteed not to be the last io reference count
> @@ -474,6 +571,49 @@ xfs_buftarg_submit_io(
>  	}
>  }
>  
> +int
> +xfs_buftarg_aio_init(
> +	struct xfs_btaio	**aiop)
> +{
> +	struct xfs_btaio	*aio;
> +	int			r;
> +
> +	aio = calloc(1, sizeof(*aio));
> +	if (!aio)
> +		return -ENOMEM;
> +
> +	r = io_setup(MAX_AIO_EVENTS, &aio->ctxp);
> +	if (r) {
> +		printf("FAIL! io_setup returned %d\n", r);
> +		goto free_aio;
> +	}
> +
> +	r = pthread_create(&aio->completion_tid, NULL, aio_completion_thread,
> +			aio);
> +	if (r) {
> +		printf("FAIL! aio thread create returned %d\n", r);
> +		goto free_aio;
> +	}
> +
> +	*aiop = aio;
> +	return 0;
> +
> +free_aio:
> +	free(aio);
> +	return r;
> +}
> +
> +void
> +xfs_buftarg_aio_destroy(
> +	struct xfs_btaio	*aio)
> +{
> +	if (!aio)
> +		return;
> +
> +	aio->done = true;
> +	pthread_join(aio->completion_tid, NULL);
> +	free(aio);
> +}
>  /*
>   * Return a buffer associated to external memory via xfs_buf_associate_memory()
>   * back to it's empty state.
> diff --git a/libxfs/xfs_buf.h b/libxfs/xfs_buf.h
> index 4b6dff885165..29fbaaab4abb 100644
> --- a/libxfs/xfs_buf.h
> +++ b/libxfs/xfs_buf.h
> @@ -81,6 +81,12 @@ struct xfs_buf {
>  	struct completion	b_iowait;
>  	struct semaphore	b_sema;
>  	xfs_buf_iodone_t	b_iodone;
> +
> +	/*
> +	 * XXX: AIO needs as many iocbs as we have maps for discontig
> +	 * buffers to work correctly.
> +	 */
> +	struct iocb		b_iocb;
>  };
>  
>  struct xfs_buf *xfs_buf_incore(struct xfs_buftarg *target,
> diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
> index 61c4a3164d23..62aa6f236537 100644
> --- a/libxfs/xfs_buftarg.h
> +++ b/libxfs/xfs_buftarg.h
> @@ -16,6 +16,7 @@ struct xfs_buf_ops;
>  struct xfs_buf;
>  struct xfs_buf_map;
>  struct xfs_mount;
> +struct xfs_btaio;
>  
>  /* this needs to die */
>  #define LIBXFS_BBTOOFF64(bbs)	(((xfs_off_t)(bbs)) << BBSHIFT)
> @@ -55,6 +56,9 @@ struct xfs_buftarg {
>  	bool			bt_exiting;
>  	bool			bt_low_mem;
>  	struct completion	bt_low_mem_wait;
> +
> +	/* AIO submission/completion structures */
> +	struct xfs_btaio	*bt_aio;
>  };
>  
>  /* We purged a dirty buffer and lost a write. */
> @@ -90,6 +94,9 @@ int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
>  void xfs_buftarg_submit_io(struct xfs_buf *bp);
>  void xfs_buf_mark_dirty(struct xfs_buf *bp);
>  
> +int xfs_buftarg_aio_init(struct xfs_btaio **aiop);
> +void xfs_buftarg_aio_destroy(struct xfs_btaio *aio);
> +
>  /*
>   * Cached buffer memory manangement
>   */
> -- 
> 2.28.0
> 



[Index of Archives]     [XFS Filesystem Development (older mail)]     [Linux Filesystem Development]     [Linux Audio Users]     [Yosemite Trails]     [Linux Kernel]     [Linux RAID]     [Linux SCSI]


  Powered by Linux