diff mbox series

[27/27] libxfs: convert sync IO buftarg engine to AIO

Message ID 20201015072155.1631135-28-david@fromorbit.com (mailing list archive)
State New, archived
Headers show
Series xfsprogs: xfs_buf unification and AIO | expand

Commit Message

Dave Chinner Oct. 15, 2020, 7:21 a.m. UTC
From: Dave Chinner <dchinner@redhat.com>

Simple per-ag thread based completion engine. Will have issues with
large AG counts.

XXX: should this be combined with the struct btcache?

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 include/atomic.h           |   7 +-
 include/builddefs.in       |   2 +-
 include/platform_defs.h.in |   1 +
 libxfs/buftarg.c           | 202 +++++++++++++++++++++++++++++++------
 libxfs/xfs_buf.h           |   6 ++
 libxfs/xfs_buftarg.h       |   7 ++
 6 files changed, 191 insertions(+), 34 deletions(-)

Comments

Darrick J. Wong Oct. 15, 2020, 6:26 p.m. UTC | #1
On Thu, Oct 15, 2020 at 06:21:55PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@redhat.com>
> 
> Simple per-ag thread based completion engine. Will have issues with
> large AG counts.

Hm, I missed how any of this is per-AG... all I saw was an aio control
context that's attached to the buftarg.

> XXX: should this be combined with the struct btcache?
> 
> Signed-off-by: Dave Chinner <dchinner@redhat.com>
> ---
>  include/atomic.h           |   7 +-
>  include/builddefs.in       |   2 +-
>  include/platform_defs.h.in |   1 +
>  libxfs/buftarg.c           | 202 +++++++++++++++++++++++++++++++------
>  libxfs/xfs_buf.h           |   6 ++
>  libxfs/xfs_buftarg.h       |   7 ++
>  6 files changed, 191 insertions(+), 34 deletions(-)
> 
> diff --git a/include/atomic.h b/include/atomic.h
> index 5860d7897ae5..8727fc4ddae9 100644
> --- a/include/atomic.h
> +++ b/include/atomic.h
> @@ -27,8 +27,11 @@ typedef	int64_t	atomic64_t;
>  #define atomic_inc_return(a)	uatomic_add_return(a, 1)
>  #define atomic_dec_return(a)	uatomic_sub_return(a, 1)
>  
> -#define atomic_inc(a)		atomic_inc_return(a)
> -#define atomic_dec(a)		atomic_inc_return(a)
> +#define atomic_add(a, v)	uatomic_add(a, v)
> +#define atomic_sub(a, v)	uatomic_sub(a, v)
> +
> +#define atomic_inc(a)		uatomic_inc(a)
> +#define atomic_dec(a)		uatomic_dec(a)

Does this belong in the liburcu patch?

>  
>  #define atomic_dec_and_test(a)	(atomic_dec_return(a) == 0)
>  
> diff --git a/include/builddefs.in b/include/builddefs.in
> index 78eddf4a9852..c20a48f6258c 100644
> --- a/include/builddefs.in
> +++ b/include/builddefs.in
> @@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
>  LIBBLKID = @libblkid@
>  LIBDEVMAPPER = @libdevmapper@
>  LIBINIH = @libinih@
> -LIBXFS = $(TOPDIR)/libxfs/libxfs.la
> +LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio

This needs all the autoconf magic to complain if libaio isn't installed,
right?

>  LIBFROG = $(TOPDIR)/libfrog/libfrog.la
>  LIBXCMD = $(TOPDIR)/libxcmd/libxcmd.la
>  LIBXLOG = $(TOPDIR)/libxlog/libxlog.la
> diff --git a/include/platform_defs.h.in b/include/platform_defs.h.in
> index 8af43f3b8d8a..7c30a43eb951 100644
> --- a/include/platform_defs.h.in
> +++ b/include/platform_defs.h.in
> @@ -24,6 +24,7 @@
>  #include <stdbool.h>
>  #include <libgen.h>
>  #include <urcu.h>
> +#include <libaio.h>
>  
>  typedef struct filldir		filldir_t;
>  
> diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
> index df968c66c205..e1e5f41b423c 100644
> --- a/libxfs/buftarg.c
> +++ b/libxfs/buftarg.c
> @@ -259,11 +259,16 @@ xfs_buftarg_alloc(
>  	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
>  		goto error_lru;
>  
> -	if (xfs_buftarg_mempressue_init(btp))
> +	if (xfs_buftarg_aio_init(&btp->bt_aio))
>  		goto error_pcp;
>  
> +	if (xfs_buftarg_mempressue_init(btp))
> +		goto error_aio;
> +
>  	return btp;
>  
> +error_aio:
> +	xfs_buftarg_aio_destroy(btp->bt_aio);
>  error_pcp:
>  	percpu_counter_destroy(&btp->bt_io_count);
>  error_lru:
> @@ -286,6 +291,7 @@ xfs_buftarg_free(
>  	if (btp->bt_psi_fd >= 0)
>  		close(btp->bt_psi_fd);
>  
> +	xfs_buftarg_aio_destroy(btp->bt_aio);
>  	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
>  	percpu_counter_destroy(&btp->bt_io_count);
>  	platform_flush_device(btp->bt_fd, btp->bt_bdev);
> @@ -329,39 +335,132 @@ xfs_buf_allocate_memory(
>   */
>  
>  /*
> - * XXX: this will be replaced by an AIO submission engine in future. In the mean
> - * time, just complete the IO synchronously so all the machinery still works.
> + * AIO context for dispatch and completion.
> + *
> + * Run completion polling in a separate thread, the poll timeout will stop it
> + * from spinning in tight loops when there is nothing to do.
>   */
> -static int
> -submit_io(
> -	struct xfs_buf *bp,
> -	int		fd,
> -	void		*buf,
> -	xfs_daddr_t	blkno,
> -	int		size,
> -	bool		write)
> +#define MAX_AIO_EVENTS 1024
> +struct xfs_btaio {
> +	io_context_t	ctxp;
> +	int		aio_fd;
> +	int		aio_in_flight;
> +	pthread_t	completion_tid;
> +	bool		done;
> +};
> +
> +static void
> +xfs_buf_aio_ioend(
> +	struct io_event		*ev)
>  {
> -	int		ret;
> +	struct xfs_buf		*bp = (struct xfs_buf *)ev->data;
>  
> -	if (!write)
> -		ret = pread(fd, buf, size, BBTOB(blkno));
> -	else
> -		ret = pwrite(fd, buf, size, BBTOB(blkno));
> -	if (ret < 0)
> -		ret = -errno;
> -	else if (ret != size)
> -		ret = -EIO;
> -	else
> -		ret = 0;
>  	/*
> -	 * This is a bit of a hack until we get AIO that runs completions.
> -	 * Success is treated as a completion here, but IO errors are handled as
> -	 * a submission error and are handled by the caller. AIO will clean this
> -	 * up.
> +	 * don't overwrite existing errors - otherwise we can lose errors on
> +	 * buffers that require multiple bios to complete.
> +	 *
> +	 * We check that the returned length was the same as specified for this
> +	 * IO. Note that this only works for read and write - if we start
> +	 * using readv/writev for discontiguous buffers then this needs more
> +	 * work.

Interesting RFC, though I gather discontig buffers don't work yet?
Or hmm maybe there's something funny going on with
xfs_buftarg_submit_io_map?  You didn't post a git branch link, so it's
hard(er) to just rummage around on my own. :/

This seems like a reasonable restructuring to allow asynchronous io.
It's sort of a pity that this isn't modular enough to allow multiple IO
engines (synchronous system calls and io_uring come to mind) but that
can come later.

Urp, I'll go reply to the cover letter now.

--D

>  	 */
> -	if (!ret)
> +	if (ev->res < 0 || ev->res != ev->obj->u.c.nbytes) {
> +		int error = ev->res < 0 ? (int)ev->res : -EIO;
> +
> +		cmpxchg(&bp->b_io_error, 0, error);
> +	}
> +
> +	if (atomic_dec_and_test(&bp->b_io_remaining))
>  		xfs_buf_ioend(bp);
> -	return ret;
> +}
> +
> +static void
> +get_io_completions(
> +	struct xfs_btaio	*aio,
> +	int			threshold)
> +{
> +	struct io_event		ioevents[MAX_AIO_EVENTS];
> +	struct timespec		tout = {
> +		.tv_nsec = 100*1000*1000,	/* 100ms */
> +	};
> +	int			i, r;
> +
> +	/* gather up some completions */
> +	r = io_getevents(aio->ctxp, 1, MAX_AIO_EVENTS, ioevents, &tout);
> +	if (r < 0)  {
> +		fprintf(stderr, "FAIL! io_getevents returned %d\n", r);
> +		if (r == -EINTR)
> +			return;
> +		exit(1);
> +	}
> +	if (r == 0) {
> +		/* timeout, return to caller to check for what to do next */
> +		return;
> +	}
> +
> +	atomic_sub(&aio->aio_in_flight, r);
> +	for (i = 0; i < r; ++i)
> +		xfs_buf_aio_ioend(&ioevents[i]);
> +}
> +
> +static void *
> +aio_completion_thread(
> +	void			*arg)
> +{
> +	struct xfs_btaio	*aio = arg;
> +
> +	while (!aio->done) {
> +		get_io_completions(aio, 1);
> +	}
> +	return NULL;
> +}
> +
> +static int
> +submit_aio(
> +	struct xfs_buf		*bp,
> +	void			*buf,
> +	xfs_daddr_t		blkno,
> +	int			size)
> +{
> +	struct xfs_btaio	*aio = bp->b_target->bt_aio;
> +	int			r;
> +
> +	if (!aio->aio_fd)
> +		aio->aio_fd = bp->b_target->bt_fd;
> +
> +	/*
> +	 * Reserve and bound the number of in flight IOs to keep the number of
> +	 * pending IOs overrunning the tail of the event loop. This also serves
> +	 * to throttle incoming IOs without burning CPU by spinning.
> +	 */
> +	while (!atomic_add_unless(&aio->aio_in_flight, 1, MAX_AIO_EVENTS - 1))
> +		get_io_completions(aio, 1);
> +
> +	if (bp->b_flags & XBF_WRITE)
> +		io_prep_pwrite(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
> +	else
> +		io_prep_pread(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
> +
> +	bp->b_iocb.data = bp;
> +	do {
> +		struct iocb		*iocb;
> +
> +		iocb = &bp->b_iocb;
> +		r = io_submit(aio->ctxp, 1, &iocb);
> +		if (r == 1)
> +			return 0;	/* successful submission */
> +		fprintf(stderr, "io_submit returned %d\n", r);
> +		if (r != -EAGAIN)
> +			break;
> +		/* On EAGAIN, reap some completions and try again. */
> +		get_io_completions(aio, 1);
> +	} while (1);
> +
> +	if (bp->b_flags & LIBXFS_B_EXIT)
> +		exit(1);
> +
> +	atomic_dec(&aio->aio_in_flight);
> +	return r;
>  }
>  
>  static void
> @@ -373,7 +472,6 @@ xfs_buftarg_submit_io_map(
>  {
>  	int		size;
>  	int		offset;
> -	bool		rw = (bp->b_flags & XBF_WRITE);
>  	int		error;
>  
>  	offset = *buf_offset;
> @@ -388,8 +486,7 @@ xfs_buftarg_submit_io_map(
>  
>  	atomic_inc(&bp->b_io_remaining);
>  
> -	error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
> -			  bp->b_maps[map].bm_bn, size, rw);
> +	error = submit_aio(bp, bp->b_addr + offset, bp->b_maps[map].bm_bn, size);
>  	if (error) {
>  		/*
>  		 * This is guaranteed not to be the last io reference count
> @@ -474,6 +571,49 @@ xfs_buftarg_submit_io(
>  	}
>  }
>  
> +int
> +xfs_buftarg_aio_init(
> +	struct xfs_btaio	**aiop)
> +{
> +	struct xfs_btaio	*aio;
> +	int			r;
> +
> +	aio = calloc(1, sizeof(*aio));
> +	if (!aio)
> +		return -ENOMEM;
> +
> +	r = io_setup(MAX_AIO_EVENTS, &aio->ctxp);
> +	if (r) {
> +		printf("FAIL! io_setup returned %d\n", r);
> +		goto free_aio;
> +	}
> +
> +	r = pthread_create(&aio->completion_tid, NULL, aio_completion_thread,
> +			aio);
> +	if (r) {
> +		printf("FAIL! aio thread create returned %d\n", r);
> +		goto free_aio;
> +	}
> +
> +	*aiop = aio;
> +	return 0;
> +
> +free_aio:
> +	free(aio);
> +	return r;
> +}
> +
> +void
> +xfs_buftarg_aio_destroy(
> +	struct xfs_btaio	*aio)
> +{
> +	if (!aio)
> +		return;
> +
> +	aio->done = true;
> +	pthread_join(aio->completion_tid, NULL);
> +	free(aio);
> +}
>  /*
>   * Return a buffer associated to external memory via xfs_buf_associate_memory()
>   * back to it's empty state.
> diff --git a/libxfs/xfs_buf.h b/libxfs/xfs_buf.h
> index 4b6dff885165..29fbaaab4abb 100644
> --- a/libxfs/xfs_buf.h
> +++ b/libxfs/xfs_buf.h
> @@ -81,6 +81,12 @@ struct xfs_buf {
>  	struct completion	b_iowait;
>  	struct semaphore	b_sema;
>  	xfs_buf_iodone_t	b_iodone;
> +
> +	/*
> +	 * XXX: AIO needs as many iocbs as we have maps for discontig
> +	 * buffers to work correctly.
> +	 */
> +	struct iocb		b_iocb;
>  };
>  
>  struct xfs_buf *xfs_buf_incore(struct xfs_buftarg *target,
> diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
> index 61c4a3164d23..62aa6f236537 100644
> --- a/libxfs/xfs_buftarg.h
> +++ b/libxfs/xfs_buftarg.h
> @@ -16,6 +16,7 @@ struct xfs_buf_ops;
>  struct xfs_buf;
>  struct xfs_buf_map;
>  struct xfs_mount;
> +struct xfs_btaio;
>  
>  /* this needs to die */
>  #define LIBXFS_BBTOOFF64(bbs)	(((xfs_off_t)(bbs)) << BBSHIFT)
> @@ -55,6 +56,9 @@ struct xfs_buftarg {
>  	bool			bt_exiting;
>  	bool			bt_low_mem;
>  	struct completion	bt_low_mem_wait;
> +
> +	/* AIO submission/completion structures */
> +	struct xfs_btaio	*bt_aio;
>  };
>  
>  /* We purged a dirty buffer and lost a write. */
> @@ -90,6 +94,9 @@ int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
>  void xfs_buftarg_submit_io(struct xfs_buf *bp);
>  void xfs_buf_mark_dirty(struct xfs_buf *bp);
>  
> +int xfs_buftarg_aio_init(struct xfs_btaio **aiop);
> +void xfs_buftarg_aio_destroy(struct xfs_btaio *aio);
> +
>  /*
>   * Cached buffer memory manangement
>   */
> -- 
> 2.28.0
>
Dave Chinner Oct. 15, 2020, 9:42 p.m. UTC | #2
On Thu, Oct 15, 2020 at 11:26:07AM -0700, Darrick J. Wong wrote:
> On Thu, Oct 15, 2020 at 06:21:55PM +1100, Dave Chinner wrote:
> > From: Dave Chinner <dchinner@redhat.com>
> > 
> > Simple per-ag thread based completion engine. Will have issues with
> > large AG counts.
> 
> Hm, I missed how any of this is per-AG... all I saw was an aio control
> context that's attached to the buftarg.

Stale - I got rid of the per-ag AIO structures because having 500
completion threads in gdb is a PITA and it provided no better
performance. i.e. it had problems with large AG counts.

> > XXX: should this be combined with the struct btcache?
> > 
> > Signed-off-by: Dave Chinner <dchinner@redhat.com>
> > ---
> >  include/atomic.h           |   7 +-
> >  include/builddefs.in       |   2 +-
> >  include/platform_defs.h.in |   1 +
> >  libxfs/buftarg.c           | 202 +++++++++++++++++++++++++++++++------
> >  libxfs/xfs_buf.h           |   6 ++
> >  libxfs/xfs_buftarg.h       |   7 ++
> >  6 files changed, 191 insertions(+), 34 deletions(-)
> > 
> > diff --git a/include/atomic.h b/include/atomic.h
> > index 5860d7897ae5..8727fc4ddae9 100644
> > --- a/include/atomic.h
> > +++ b/include/atomic.h
> > @@ -27,8 +27,11 @@ typedef	int64_t	atomic64_t;
> >  #define atomic_inc_return(a)	uatomic_add_return(a, 1)
> >  #define atomic_dec_return(a)	uatomic_sub_return(a, 1)
> >  
> > -#define atomic_inc(a)		atomic_inc_return(a)
> > -#define atomic_dec(a)		atomic_inc_return(a)
> > +#define atomic_add(a, v)	uatomic_add(a, v)
> > +#define atomic_sub(a, v)	uatomic_sub(a, v)
> > +
> > +#define atomic_inc(a)		uatomic_inc(a)
> > +#define atomic_dec(a)		uatomic_dec(a)
> 
> Does this belong in the liburcu patch?

Probably.

> 
> >  
> >  #define atomic_dec_and_test(a)	(atomic_dec_return(a) == 0)
> >  
> > diff --git a/include/builddefs.in b/include/builddefs.in
> > index 78eddf4a9852..c20a48f6258c 100644
> > --- a/include/builddefs.in
> > +++ b/include/builddefs.in
> > @@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
> >  LIBBLKID = @libblkid@
> >  LIBDEVMAPPER = @libdevmapper@
> >  LIBINIH = @libinih@
> > -LIBXFS = $(TOPDIR)/libxfs/libxfs.la
> > +LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio
> 
> This needs all the autoconf magic to complain if libaio isn't installed,
> right?

Yes.

> >  	/*
> > -	 * This is a bit of a hack until we get AIO that runs completions.
> > -	 * Success is treated as a completion here, but IO errors are handled as
> > -	 * a submission error and are handled by the caller. AIO will clean this
> > -	 * up.
> > +	 * don't overwrite existing errors - otherwise we can lose errors on
> > +	 * buffers that require multiple bios to complete.
> > +	 *
> > +	 * We check that the returned length was the same as specified for this
> > +	 * IO. Note that this only works for read and write - if we start
> > +	 * using readv/writev for discontiguous buffers then this needs more
> > +	 * work.
> 
> Interesting RFC, though I gather discontig buffers don't work yet?

Right, they don't work yet. I mention that in a couple of places.

> Or hmm maybe there's something funny going on with
> xfs_buftarg_submit_io_map?  You didn't post a git branch link, so it's
> hard(er) to just rummage around on my own. :/
> 
> This seems like a reasonable restructuring to allow asynchronous io.
> It's sort of a pity that this isn't modular enough to allow multiple IO
> engines (synchronous system calls and io_uring come to mind) but that
> can come later.

Making it modular is easy enough, but that can be done independently
of this buffer cache enabling patchset.

Cheers,

Dave.
diff mbox series

Patch

diff --git a/include/atomic.h b/include/atomic.h
index 5860d7897ae5..8727fc4ddae9 100644
--- a/include/atomic.h
+++ b/include/atomic.h
@@ -27,8 +27,11 @@  typedef	int64_t	atomic64_t;
 #define atomic_inc_return(a)	uatomic_add_return(a, 1)
 #define atomic_dec_return(a)	uatomic_sub_return(a, 1)
 
-#define atomic_inc(a)		atomic_inc_return(a)
-#define atomic_dec(a)		atomic_inc_return(a)
+#define atomic_add(a, v)	uatomic_add(a, v)
+#define atomic_sub(a, v)	uatomic_sub(a, v)
+
+#define atomic_inc(a)		uatomic_inc(a)
+#define atomic_dec(a)		uatomic_dec(a)
 
 #define atomic_dec_and_test(a)	(atomic_dec_return(a) == 0)
 
diff --git a/include/builddefs.in b/include/builddefs.in
index 78eddf4a9852..c20a48f6258c 100644
--- a/include/builddefs.in
+++ b/include/builddefs.in
@@ -29,7 +29,7 @@  LIBEDITLINE = @libeditline@
 LIBBLKID = @libblkid@
 LIBDEVMAPPER = @libdevmapper@
 LIBINIH = @libinih@
-LIBXFS = $(TOPDIR)/libxfs/libxfs.la
+LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio
 LIBFROG = $(TOPDIR)/libfrog/libfrog.la
 LIBXCMD = $(TOPDIR)/libxcmd/libxcmd.la
 LIBXLOG = $(TOPDIR)/libxlog/libxlog.la
diff --git a/include/platform_defs.h.in b/include/platform_defs.h.in
index 8af43f3b8d8a..7c30a43eb951 100644
--- a/include/platform_defs.h.in
+++ b/include/platform_defs.h.in
@@ -24,6 +24,7 @@ 
 #include <stdbool.h>
 #include <libgen.h>
 #include <urcu.h>
+#include <libaio.h>
 
 typedef struct filldir		filldir_t;
 
diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
index df968c66c205..e1e5f41b423c 100644
--- a/libxfs/buftarg.c
+++ b/libxfs/buftarg.c
@@ -259,11 +259,16 @@  xfs_buftarg_alloc(
 	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
 		goto error_lru;
 
-	if (xfs_buftarg_mempressue_init(btp))
+	if (xfs_buftarg_aio_init(&btp->bt_aio))
 		goto error_pcp;
 
+	if (xfs_buftarg_mempressue_init(btp))
+		goto error_aio;
+
 	return btp;
 
+error_aio:
+	xfs_buftarg_aio_destroy(btp->bt_aio);
 error_pcp:
 	percpu_counter_destroy(&btp->bt_io_count);
 error_lru:
@@ -286,6 +291,7 @@  xfs_buftarg_free(
 	if (btp->bt_psi_fd >= 0)
 		close(btp->bt_psi_fd);
 
+	xfs_buftarg_aio_destroy(btp->bt_aio);
 	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
 	percpu_counter_destroy(&btp->bt_io_count);
 	platform_flush_device(btp->bt_fd, btp->bt_bdev);
@@ -329,39 +335,132 @@  xfs_buf_allocate_memory(
  */
 
 /*
- * XXX: this will be replaced by an AIO submission engine in future. In the mean
- * time, just complete the IO synchronously so all the machinery still works.
+ * AIO context for dispatch and completion.
+ *
+ * Run completion polling in a separate thread, the poll timeout will stop it
+ * from spinning in tight loops when there is nothing to do.
  */
-static int
-submit_io(
-	struct xfs_buf *bp,
-	int		fd,
-	void		*buf,
-	xfs_daddr_t	blkno,
-	int		size,
-	bool		write)
+#define MAX_AIO_EVENTS 1024
+struct xfs_btaio {
+	io_context_t	ctxp;
+	int		aio_fd;
+	int		aio_in_flight;
+	pthread_t	completion_tid;
+	bool		done;
+};
+
+static void
+xfs_buf_aio_ioend(
+	struct io_event		*ev)
 {
-	int		ret;
+	struct xfs_buf		*bp = (struct xfs_buf *)ev->data;
 
-	if (!write)
-		ret = pread(fd, buf, size, BBTOB(blkno));
-	else
-		ret = pwrite(fd, buf, size, BBTOB(blkno));
-	if (ret < 0)
-		ret = -errno;
-	else if (ret != size)
-		ret = -EIO;
-	else
-		ret = 0;
 	/*
-	 * This is a bit of a hack until we get AIO that runs completions.
-	 * Success is treated as a completion here, but IO errors are handled as
-	 * a submission error and are handled by the caller. AIO will clean this
-	 * up.
+	 * don't overwrite existing errors - otherwise we can lose errors on
+	 * buffers that require multiple bios to complete.
+	 *
+	 * We check that the returned length was the same as specified for this
+	 * IO. Note that this only works for read and write - if we start
+	 * using readv/writev for discontiguous buffers then this needs more
+	 * work.
 	 */
-	if (!ret)
+	if (ev->res < 0 || ev->res != ev->obj->u.c.nbytes) {
+		int error = ev->res < 0 ? (int)ev->res : -EIO;
+
+		cmpxchg(&bp->b_io_error, 0, error);
+	}
+
+	if (atomic_dec_and_test(&bp->b_io_remaining))
 		xfs_buf_ioend(bp);
-	return ret;
+}
+
+static void
+get_io_completions(
+	struct xfs_btaio	*aio,
+	int			threshold)
+{
+	struct io_event		ioevents[MAX_AIO_EVENTS];
+	struct timespec		tout = {
+		.tv_nsec = 100*1000*1000,	/* 100ms */
+	};
+	int			i, r;
+
+	/* gather up some completions */
+	r = io_getevents(aio->ctxp, 1, MAX_AIO_EVENTS, ioevents, &tout);
+	if (r < 0)  {
+		fprintf(stderr, "FAIL! io_getevents returned %d\n", r);
+		if (r == -EINTR)
+			return;
+		exit(1);
+	}
+	if (r == 0) {
+		/* timeout, return to caller to check for what to do next */
+		return;
+	}
+
+	atomic_sub(&aio->aio_in_flight, r);
+	for (i = 0; i < r; ++i)
+		xfs_buf_aio_ioend(&ioevents[i]);
+}
+
+static void *
+aio_completion_thread(
+	void			*arg)
+{
+	struct xfs_btaio	*aio = arg;
+
+	while (!aio->done) {
+		get_io_completions(aio, 1);
+	}
+	return NULL;
+}
+
+static int
+submit_aio(
+	struct xfs_buf		*bp,
+	void			*buf,
+	xfs_daddr_t		blkno,
+	int			size)
+{
+	struct xfs_btaio	*aio = bp->b_target->bt_aio;
+	int			r;
+
+	if (!aio->aio_fd)
+		aio->aio_fd = bp->b_target->bt_fd;
+
+	/*
+	 * Reserve and bound the number of in flight IOs to keep the number of
+	 * pending IOs from overrunning the tail of the event loop. This also
+	 * serves to throttle incoming IOs without burning CPU by spinning.
+	 */
+	while (!atomic_add_unless(&aio->aio_in_flight, 1, MAX_AIO_EVENTS - 1))
+		get_io_completions(aio, 1);
+
+	if (bp->b_flags & XBF_WRITE)
+		io_prep_pwrite(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
+	else
+		io_prep_pread(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
+
+	bp->b_iocb.data = bp;
+	do {
+		struct iocb		*iocb;
+
+		iocb = &bp->b_iocb;
+		r = io_submit(aio->ctxp, 1, &iocb);
+		if (r == 1)
+			return 0;	/* successful submission */
+		fprintf(stderr, "io_submit returned %d\n", r);
+		if (r != -EAGAIN)
+			break;
+		/* On EAGAIN, reap some completions and try again. */
+		get_io_completions(aio, 1);
+	} while (1);
+
+	if (bp->b_flags & LIBXFS_B_EXIT)
+		exit(1);
+
+	atomic_dec(&aio->aio_in_flight);
+	return r;
 }
 
 static void
@@ -373,7 +472,6 @@  xfs_buftarg_submit_io_map(
 {
 	int		size;
 	int		offset;
-	bool		rw = (bp->b_flags & XBF_WRITE);
 	int		error;
 
 	offset = *buf_offset;
@@ -388,8 +486,7 @@  xfs_buftarg_submit_io_map(
 
 	atomic_inc(&bp->b_io_remaining);
 
-	error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
-			  bp->b_maps[map].bm_bn, size, rw);
+	error = submit_aio(bp, bp->b_addr + offset, bp->b_maps[map].bm_bn, size);
 	if (error) {
 		/*
 		 * This is guaranteed not to be the last io reference count
@@ -474,6 +571,49 @@  xfs_buftarg_submit_io(
 	}
 }
 
+int
+xfs_buftarg_aio_init(
+	struct xfs_btaio	**aiop)
+{
+	struct xfs_btaio	*aio;
+	int			r;
+
+	aio = calloc(1, sizeof(*aio));
+	if (!aio)
+		return -ENOMEM;
+
+	r = io_setup(MAX_AIO_EVENTS, &aio->ctxp);
+	if (r) {
+		printf("FAIL! io_setup returned %d\n", r);
+		goto free_aio;
+	}
+
+	r = pthread_create(&aio->completion_tid, NULL, aio_completion_thread,
+			aio);
+	if (r) {
+		printf("FAIL! aio thread create returned %d\n", r);
+		goto free_aio;
+	}
+
+	*aiop = aio;
+	return 0;
+
+free_aio:
+	free(aio);
+	return r;
+}
+
+void
+xfs_buftarg_aio_destroy(
+	struct xfs_btaio	*aio)
+{
+	if (!aio)
+		return;
+
+	aio->done = true;
+	pthread_join(aio->completion_tid, NULL);
+	free(aio);
+}
 /*
  * Return a buffer associated to external memory via xfs_buf_associate_memory()
  * back to it's empty state.
diff --git a/libxfs/xfs_buf.h b/libxfs/xfs_buf.h
index 4b6dff885165..29fbaaab4abb 100644
--- a/libxfs/xfs_buf.h
+++ b/libxfs/xfs_buf.h
@@ -81,6 +81,12 @@  struct xfs_buf {
 	struct completion	b_iowait;
 	struct semaphore	b_sema;
 	xfs_buf_iodone_t	b_iodone;
+
+	/*
+	 * XXX: AIO needs as many iocbs as we have maps for discontig
+	 * buffers to work correctly.
+	 */
+	struct iocb		b_iocb;
 };
 
 struct xfs_buf *xfs_buf_incore(struct xfs_buftarg *target,
diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
index 61c4a3164d23..62aa6f236537 100644
--- a/libxfs/xfs_buftarg.h
+++ b/libxfs/xfs_buftarg.h
@@ -16,6 +16,7 @@  struct xfs_buf_ops;
 struct xfs_buf;
 struct xfs_buf_map;
 struct xfs_mount;
+struct xfs_btaio;
 
 /* this needs to die */
 #define LIBXFS_BBTOOFF64(bbs)	(((xfs_off_t)(bbs)) << BBSHIFT)
@@ -55,6 +56,9 @@  struct xfs_buftarg {
 	bool			bt_exiting;
 	bool			bt_low_mem;
 	struct completion	bt_low_mem_wait;
+
+	/* AIO submission/completion structures */
+	struct xfs_btaio	*bt_aio;
 };
 
 /* We purged a dirty buffer and lost a write. */
@@ -90,6 +94,9 @@  int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
 void xfs_buftarg_submit_io(struct xfs_buf *bp);
 void xfs_buf_mark_dirty(struct xfs_buf *bp);
 
+int xfs_buftarg_aio_init(struct xfs_btaio **aiop);
+void xfs_buftarg_aio_destroy(struct xfs_btaio *aio);
+
 /*
  * Cached buffer memory manangement
  */