Message ID | 20201015072155.1631135-28-david@fromorbit.com (mailing list archive)
---|---
State | Deferred, archived
Series | xfsprogs: xfs_buf unification and AIO
On Thu, Oct 15, 2020 at 06:21:55PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@redhat.com>
>
> Simple per-ag thread based completion engine. Will have issues with
> large AG counts.

Hm, I missed how any of this is per-AG... all I saw was an aio control
context that's attached to the buftarg.

> XXX: should this be combined with the struct btcache?
>
> Signed-off-by: Dave Chinner <dchinner@redhat.com>
> ---
> include/atomic.h | 7 +-
> include/builddefs.in | 2 +-
> include/platform_defs.h.in | 1 +
> libxfs/buftarg.c | 202 +++++++++++++++++++++++++++++++------
> libxfs/xfs_buf.h | 6 ++
> libxfs/xfs_buftarg.h | 7 ++
> 6 files changed, 191 insertions(+), 34 deletions(-)
>
> diff --git a/include/atomic.h b/include/atomic.h
> index 5860d7897ae5..8727fc4ddae9 100644
> --- a/include/atomic.h
> +++ b/include/atomic.h
> @@ -27,8 +27,11 @@ typedef int64_t atomic64_t;
> #define atomic_inc_return(a) uatomic_add_return(a, 1)
> #define atomic_dec_return(a) uatomic_sub_return(a, 1)
>
> -#define atomic_inc(a) atomic_inc_return(a)
> -#define atomic_dec(a) atomic_inc_return(a)
> +#define atomic_add(a, v) uatomic_add(a, v)
> +#define atomic_sub(a, v) uatomic_sub(a, v)
> +
> +#define atomic_inc(a) uatomic_inc(a)
> +#define atomic_dec(a) uatomic_dec(a)

Does this belong in the liburcu patch?

>
> #define atomic_dec_and_test(a) (atomic_dec_return(a) == 0)
>
> diff --git a/include/builddefs.in b/include/builddefs.in
> index 78eddf4a9852..c20a48f6258c 100644
> --- a/include/builddefs.in
> +++ b/include/builddefs.in
> @@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
> LIBBLKID = @libblkid@
> LIBDEVMAPPER = @libdevmapper@
> LIBINIH = @libinih@
> -LIBXFS = $(TOPDIR)/libxfs/libxfs.la
> +LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio

This needs all the autoconf magic to complain if libaio isn't installed,
right?

> LIBFROG = $(TOPDIR)/libfrog/libfrog.la
> LIBXCMD = $(TOPDIR)/libxcmd/libxcmd.la
> LIBXLOG = $(TOPDIR)/libxlog/libxlog.la
> diff --git a/include/platform_defs.h.in b/include/platform_defs.h.in
> index 8af43f3b8d8a..7c30a43eb951 100644
> --- a/include/platform_defs.h.in
> +++ b/include/platform_defs.h.in
> @@ -24,6 +24,7 @@
> #include <stdbool.h>
> #include <libgen.h>
> #include <urcu.h>
> +#include <libaio.h>
>
> typedef struct filldir filldir_t;
>
> diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
> index df968c66c205..e1e5f41b423c 100644
> --- a/libxfs/buftarg.c
> +++ b/libxfs/buftarg.c
> @@ -259,11 +259,16 @@ xfs_buftarg_alloc(
> if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
> goto error_lru;
>
> - if (xfs_buftarg_mempressue_init(btp))
> + if (xfs_buftarg_aio_init(&btp->bt_aio))
> goto error_pcp;
>
> + if (xfs_buftarg_mempressue_init(btp))
> + goto error_aio;
> +
> return btp;
>
> +error_aio:
> + xfs_buftarg_aio_destroy(btp->bt_aio);
> error_pcp:
> percpu_counter_destroy(&btp->bt_io_count);
> error_lru:
> @@ -286,6 +291,7 @@ xfs_buftarg_free(
> if (btp->bt_psi_fd >= 0)
> close(btp->bt_psi_fd);
>
> + xfs_buftarg_aio_destroy(btp->bt_aio);
> ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
> percpu_counter_destroy(&btp->bt_io_count);
> platform_flush_device(btp->bt_fd, btp->bt_bdev);
> @@ -329,39 +335,132 @@ xfs_buf_allocate_memory(
> */
>
> /*
> - * XXX: this will be replaced by an AIO submission engine in future. In the mean
> - * time, just complete the IO synchronously so all the machinery still works.
> + * AIO context for dispatch and completion.
> + *
> + * Run completion polling in a separate thread; the poll timeout will stop it
> + * from spinning in tight loops when there is nothing to do.
> */
> -static int
> -submit_io(
> - struct xfs_buf *bp,
> - int fd,
> - void *buf,
> - xfs_daddr_t blkno,
> - int size,
> - bool write)
> +#define MAX_AIO_EVENTS 1024
> +struct xfs_btaio {
> + io_context_t ctxp;
> + int aio_fd;
> + int aio_in_flight;
> + pthread_t completion_tid;
> + bool done;
> +};
> +
> +static void
> +xfs_buf_aio_ioend(
> + struct io_event *ev)
> {
> - int ret;
> + struct xfs_buf *bp = (struct xfs_buf *)ev->data;
>
> - if (!write)
> - ret = pread(fd, buf, size, BBTOB(blkno));
> - else
> - ret = pwrite(fd, buf, size, BBTOB(blkno));
> - if (ret < 0)
> - ret = -errno;
> - else if (ret != size)
> - ret = -EIO;
> - else
> - ret = 0;
> /*
> - * This is a bit of a hack until we get AIO that runs completions.
> - * Success is treated as a completion here, but IO errors are handled as
> - * a submission error and are handled by the caller. AIO will clean this
> - * up.
> + * don't overwrite existing errors - otherwise we can lose errors on
> + * buffers that require multiple bios to complete.
> + *
> + * We check that the returned length was the same as specified for this
> + * IO. Note that this only works for read and write - if we start
> + * using readv/writev for discontiguous buffers then this needs more
> + * work.

Interesting RFC, though I gather discontig buffers don't work yet?

Or hmm maybe there's something funny going on with
xfs_buftarg_submit_io_map? You didn't post a git branch link, so it's
hard(er) to just rummage around on my own. :/

This seems like a reasonable restructuring to allow asynchronous io.
It's sort of a pity that this isn't modular enough to allow multiple IO
engines (synchronous system calls and io_uring come to mind) but that
can come later.

Urp, I'll go reply to the cover letter now.

--D

> */
> - if (!ret)
> + if (ev->res < 0 || ev->res != ev->obj->u.c.nbytes) {
> + int error = ev->res < 0 ? (int)ev->res : -EIO;
> +
> + cmpxchg(&bp->b_io_error, 0, error);
> + }
> +
> + if (atomic_dec_and_test(&bp->b_io_remaining))
> xfs_buf_ioend(bp);
> - return ret;
> +}
> +
> +static void
> +get_io_completions(
> + struct xfs_btaio *aio,
> + int threshold)
> +{
> + struct io_event ioevents[MAX_AIO_EVENTS];
> + struct timespec tout = {
> + .tv_nsec = 100*1000*1000, /* 100ms */
> + };
> + int i, r;
> +
> + /* gather up some completions */
> + r = io_getevents(aio->ctxp, 1, MAX_AIO_EVENTS, ioevents, &tout);
> + if (r < 0) {
> + fprintf(stderr, "FAIL! io_getevents returned %d\n", r);
> + if (r == -EINTR)
> + return;
> + exit(1);
> + }
> + if (r == 0) {
> + /* timeout, return to caller to check for what to do next */
> + return;
> + }
> +
> + atomic_sub(&aio->aio_in_flight, r);
> + for (i = 0; i < r; ++i)
> + xfs_buf_aio_ioend(&ioevents[i]);
> +}
> +
> +static void *
> +aio_completion_thread(
> + void *arg)
> +{
> + struct xfs_btaio *aio = arg;
> +
> + while (!aio->done) {
> + get_io_completions(aio, 1);
> + }
> + return NULL;
> +}
> +
> +static int
> +submit_aio(
> + struct xfs_buf *bp,
> + void *buf,
> + xfs_daddr_t blkno,
> + int size)
> +{
> + struct xfs_btaio *aio = bp->b_target->bt_aio;
> + int r;
> +
> + if (!aio->aio_fd)
> + aio->aio_fd = bp->b_target->bt_fd;
> +
> + /*
> + * Reserve and bound the number of in flight IOs to keep the number of
> + * pending IOs from overrunning the tail of the event loop. This also
> + * serves to throttle incoming IOs without burning CPU by spinning.
> + */
> + while (!atomic_add_unless(&aio->aio_in_flight, 1, MAX_AIO_EVENTS - 1))
> + get_io_completions(aio, 1);
> +
> + if (bp->b_flags & XBF_WRITE)
> + io_prep_pwrite(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
> + else
> + io_prep_pread(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
> +
> + bp->b_iocb.data = bp;
> + do {
> + struct iocb *iocb;
> +
> + iocb = &bp->b_iocb;
> + r = io_submit(aio->ctxp, 1, &iocb);
> + if (r == 1)
> + return 0; /* successful submission */
> + fprintf(stderr, "io_submit returned %d\n", r);
> + if (r != -EAGAIN)
> + break;
> + /* On EAGAIN, reap some completions and try again. */
> + get_io_completions(aio, 1);
> + } while (1);
> +
> + if (bp->b_flags & LIBXFS_B_EXIT)
> + exit(1);
> +
> + atomic_dec(&aio->aio_in_flight);
> + return r;
> }
>
> static void
> @@ -373,7 +472,6 @@ xfs_buftarg_submit_io_map(
> {
> int size;
> int offset;
> - bool rw = (bp->b_flags & XBF_WRITE);
> int error;
>
> offset = *buf_offset;
> @@ -388,8 +486,7 @@
>
> atomic_inc(&bp->b_io_remaining);
>
> - error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
> - bp->b_maps[map].bm_bn, size, rw);
> + error = submit_aio(bp, bp->b_addr + offset, bp->b_maps[map].bm_bn, size);
> if (error) {
> /*
> * This is guaranteed not to be the last io reference count
> @@ -474,6 +571,49 @@ xfs_buftarg_submit_io(
> }
> }
>
> +int
> +xfs_buftarg_aio_init(
> + struct xfs_btaio **aiop)
> +{
> + struct xfs_btaio *aio;
> + int r;
> +
> + aio = calloc(1, sizeof(*aio));
> + if (!aio)
> + return -ENOMEM;
> +
> + r = io_setup(MAX_AIO_EVENTS, &aio->ctxp);
> + if (r) {
> + printf("FAIL! io_setup returned %d\n", r);
> + goto free_aio;
> + }
> +
> + r = pthread_create(&aio->completion_tid, NULL, aio_completion_thread,
> + aio);
> + if (r) {
> + printf("FAIL! aio thread create returned %d\n", r);
> + goto free_aio;
> + }
> +
> + *aiop = aio;
> + return 0;
> +
> +free_aio:
> + free(aio);
> + return r;
> +}
> +
> +void
> +xfs_buftarg_aio_destroy(
> + struct xfs_btaio *aio)
> +{
> + if (!aio)
> + return;
> +
> + aio->done = true;
> + pthread_join(aio->completion_tid, NULL);
> + free(aio);
> +}
> /*
> * Return a buffer associated to external memory via xfs_buf_associate_memory()
> * back to it's empty state.
> diff --git a/libxfs/xfs_buf.h b/libxfs/xfs_buf.h
> index 4b6dff885165..29fbaaab4abb 100644
> --- a/libxfs/xfs_buf.h
> +++ b/libxfs/xfs_buf.h
> @@ -81,6 +81,12 @@ struct xfs_buf {
> struct completion b_iowait;
> struct semaphore b_sema;
> xfs_buf_iodone_t b_iodone;
> +
> + /*
> + * XXX: AIO needs as many iocbs as we have maps for discontig
> + * buffers to work correctly.
> + */
> + struct iocb b_iocb;
> };
>
> struct xfs_buf *xfs_buf_incore(struct xfs_buftarg *target,
> diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
> index 61c4a3164d23..62aa6f236537 100644
> --- a/libxfs/xfs_buftarg.h
> +++ b/libxfs/xfs_buftarg.h
> @@ -16,6 +16,7 @@ struct xfs_buf_ops;
> struct xfs_buf;
> struct xfs_buf_map;
> struct xfs_mount;
> +struct xfs_btaio;
>
> /* this needs to die */
> #define LIBXFS_BBTOOFF64(bbs) (((xfs_off_t)(bbs)) << BBSHIFT)
> @@ -55,6 +56,9 @@ struct xfs_buftarg {
> bool bt_exiting;
> bool bt_low_mem;
> struct completion bt_low_mem_wait;
> +
> + /* AIO submission/completion structures */
> + struct xfs_btaio *bt_aio;
> };
>
> /* We purged a dirty buffer and lost a write. */
> @@ -90,6 +94,9 @@ int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
> void xfs_buftarg_submit_io(struct xfs_buf *bp);
> void xfs_buf_mark_dirty(struct xfs_buf *bp);
> +
> +int xfs_buftarg_aio_init(struct xfs_btaio **aiop);
> +void xfs_buftarg_aio_destroy(struct xfs_btaio *aio);
> +
> /*
> * Cached buffer memory manangement
> */
> --
> 2.28.0
>
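For reference, the libaio calls the patch is built around (io_setup(), io_prep_pread()/io_prep_pwrite(), io_submit(), io_getevents()) compose into a complete IO lifecycle in a handful of lines. The standalone sketch below is illustrative only, not code from the patch; error handling is abbreviated and the 4096-byte read at offset zero is arbitrary. It mirrors what submit_aio() and get_io_completions() do, minus the buffer cache plumbing. Build with -laio.

/*
 * Minimal libaio round trip: set up a context, submit one read,
 * reap its completion, tear the context down.
 */
#include <libaio.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>

int main(int argc, char **argv)
{
        io_context_t ctx;
        struct iocb iocb, *iocbp = &iocb;
        struct io_event ev;
        static char buf[4096];
        int fd, r;

        if (argc < 2)
                return 1;
        fd = open(argv[1], O_RDONLY);
        if (fd < 0)
                return 1;

        memset(&ctx, 0, sizeof(ctx));   /* io_setup() requires a zeroed context */
        if (io_setup(8, &ctx))          /* allow up to 8 IOs in flight */
                return 1;

        io_prep_pread(&iocb, fd, buf, sizeof(buf), 0);
        iocb.data = &iocb;              /* completion cookie, as bp is in the patch */
        r = io_submit(ctx, 1, &iocbp);  /* returns the number of iocbs accepted */
        if (r != 1)
                return 1;

        /*
         * Block for at least one completion. ev.res is the byte count
         * (or a negative errno), which is exactly what the patch's
         * xfs_buf_aio_ioend() checks against the requested length.
         */
        r = io_getevents(ctx, 1, 1, &ev, NULL);
        if (r == 1)
                printf("read %lld bytes\n", (long long)ev.res);

        io_destroy(ctx);
        return 0;
}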
On Thu, Oct 15, 2020 at 11:26:07AM -0700, Darrick J. Wong wrote:
> On Thu, Oct 15, 2020 at 06:21:55PM +1100, Dave Chinner wrote:
> > From: Dave Chinner <dchinner@redhat.com>
> >
> > Simple per-ag thread based completion engine. Will have issues with
> > large AG counts.
>
> Hm, I missed how any of this is per-AG... all I saw was an aio control
> context that's attached to the buftarg.

Stale - I got rid of the per-ag AIO structures because having 500
completion threads in gdb is a PITA and it provided no better
performance, i.e. it had problems with large AG counts.

> > XXX: should this be combined with the struct btcache?
> >
> > Signed-off-by: Dave Chinner <dchinner@redhat.com>
> > ---
> > include/atomic.h | 7 +-
> > include/builddefs.in | 2 +-
> > include/platform_defs.h.in | 1 +
> > libxfs/buftarg.c | 202 +++++++++++++++++++++++++++++++------
> > libxfs/xfs_buf.h | 6 ++
> > libxfs/xfs_buftarg.h | 7 ++
> > 6 files changed, 191 insertions(+), 34 deletions(-)
> >
> > diff --git a/include/atomic.h b/include/atomic.h
> > index 5860d7897ae5..8727fc4ddae9 100644
> > --- a/include/atomic.h
> > +++ b/include/atomic.h
> > @@ -27,8 +27,11 @@ typedef int64_t atomic64_t;
> > #define atomic_inc_return(a) uatomic_add_return(a, 1)
> > #define atomic_dec_return(a) uatomic_sub_return(a, 1)
> >
> > -#define atomic_inc(a) atomic_inc_return(a)
> > -#define atomic_dec(a) atomic_inc_return(a)
> > +#define atomic_add(a, v) uatomic_add(a, v)
> > +#define atomic_sub(a, v) uatomic_sub(a, v)
> > +
> > +#define atomic_inc(a) uatomic_inc(a)
> > +#define atomic_dec(a) uatomic_dec(a)
>
> Does this belong in the liburcu patch?

Probably.

> >
> > #define atomic_dec_and_test(a) (atomic_dec_return(a) == 0)
> >
> > diff --git a/include/builddefs.in b/include/builddefs.in
> > index 78eddf4a9852..c20a48f6258c 100644
> > --- a/include/builddefs.in
> > +++ b/include/builddefs.in
> > @@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
> > LIBBLKID = @libblkid@
> > LIBDEVMAPPER = @libdevmapper@
> > LIBINIH = @libinih@
> > -LIBXFS = $(TOPDIR)/libxfs/libxfs.la
> > +LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio
>
> This needs all the autoconf magic to complain if libaio isn't installed,
> right?

Yes.

> > /*
> > - * This is a bit of a hack until we get AIO that runs completions.
> > - * Success is treated as a completion here, but IO errors are handled as
> > - * a submission error and are handled by the caller. AIO will clean this
> > - * up.
> > + * don't overwrite existing errors - otherwise we can lose errors on
> > + * buffers that require multiple bios to complete.
> > + *
> > + * We check that the returned length was the same as specified for this
> > + * IO. Note that this only works for read and write - if we start
> > + * using readv/writev for discontiguous buffers then this needs more
> > + * work.
>
> Interesting RFC, though I gather discontig buffers don't work yet?

Right, they don't work yet. I mention that in a couple of places.

> Or hmm maybe there's something funny going on with
> xfs_buftarg_submit_io_map? You didn't post a git branch link, so it's
> hard(er) to just rummage around on my own. :/
>
> This seems like a reasonable restructuring to allow asynchronous io.
> It's sort of a pity that this isn't modular enough to allow multiple IO
> engines (synchronous system calls and io_uring come to mind) but that
> can come later.

Making it modular is easy enough, but that can be done independently
of this buffer cache enabling patchset.

Cheers,

Dave.
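Nothing in the thread pins down what a modular engine interface would look like. One plausible shape, purely hypothetical and using names that appear nowhere in the patchset, is an ops vector selected at buftarg setup time, so sync-syscall, libaio and io_uring backends could sit behind a single submit call:

/*
 * Hypothetical pluggable IO engine interface - a sketch of the
 * modularity discussed above, not part of the posted patches.
 * Each engine would fill one of these in, and the buftarg would
 * call through it instead of calling submit_aio() directly.
 */
struct xfs_ioengine_ops {
        int     (*init)(struct xfs_buftarg *btp);
        void    (*destroy)(struct xfs_buftarg *btp);
        int     (*submit)(struct xfs_buf *bp, void *buf,
                          xfs_daddr_t blkno, int size);
};

A synchronous engine could then keep the old pread()/pwrite() path behind the same .submit signature, which is roughly the shape the deleted submit_io() already had.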
diff --git a/include/atomic.h b/include/atomic.h
index 5860d7897ae5..8727fc4ddae9 100644
--- a/include/atomic.h
+++ b/include/atomic.h
@@ -27,8 +27,11 @@ typedef int64_t atomic64_t;
 #define atomic_inc_return(a) uatomic_add_return(a, 1)
 #define atomic_dec_return(a) uatomic_sub_return(a, 1)

-#define atomic_inc(a) atomic_inc_return(a)
-#define atomic_dec(a) atomic_inc_return(a)
+#define atomic_add(a, v) uatomic_add(a, v)
+#define atomic_sub(a, v) uatomic_sub(a, v)
+
+#define atomic_inc(a) uatomic_inc(a)
+#define atomic_dec(a) uatomic_dec(a)

 #define atomic_dec_and_test(a) (atomic_dec_return(a) == 0)

diff --git a/include/builddefs.in b/include/builddefs.in
index 78eddf4a9852..c20a48f6258c 100644
--- a/include/builddefs.in
+++ b/include/builddefs.in
@@ -29,7 +29,7 @@ LIBEDITLINE = @libeditline@
 LIBBLKID = @libblkid@
 LIBDEVMAPPER = @libdevmapper@
 LIBINIH = @libinih@
-LIBXFS = $(TOPDIR)/libxfs/libxfs.la
+LIBXFS = $(TOPDIR)/libxfs/libxfs.la -laio
 LIBFROG = $(TOPDIR)/libfrog/libfrog.la
 LIBXCMD = $(TOPDIR)/libxcmd/libxcmd.la
 LIBXLOG = $(TOPDIR)/libxlog/libxlog.la
diff --git a/include/platform_defs.h.in b/include/platform_defs.h.in
index 8af43f3b8d8a..7c30a43eb951 100644
--- a/include/platform_defs.h.in
+++ b/include/platform_defs.h.in
@@ -24,6 +24,7 @@
 #include <stdbool.h>
 #include <libgen.h>
 #include <urcu.h>
+#include <libaio.h>

 typedef struct filldir filldir_t;

diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
index df968c66c205..e1e5f41b423c 100644
--- a/libxfs/buftarg.c
+++ b/libxfs/buftarg.c
@@ -259,11 +259,16 @@ xfs_buftarg_alloc(
 if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
 goto error_lru;

- if (xfs_buftarg_mempressue_init(btp))
+ if (xfs_buftarg_aio_init(&btp->bt_aio))
 goto error_pcp;

+ if (xfs_buftarg_mempressue_init(btp))
+ goto error_aio;
+
 return btp;

+error_aio:
+ xfs_buftarg_aio_destroy(btp->bt_aio);
 error_pcp:
 percpu_counter_destroy(&btp->bt_io_count);
 error_lru:
@@ -286,6 +291,7 @@ xfs_buftarg_free(
 if (btp->bt_psi_fd >= 0)
 close(btp->bt_psi_fd);

+ xfs_buftarg_aio_destroy(btp->bt_aio);
 ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
 percpu_counter_destroy(&btp->bt_io_count);
 platform_flush_device(btp->bt_fd, btp->bt_bdev);
@@ -329,39 +335,132 @@ xfs_buf_allocate_memory(
 */

 /*
- * XXX: this will be replaced by an AIO submission engine in future. In the mean
- * time, just complete the IO synchronously so all the machinery still works.
+ * AIO context for dispatch and completion.
+ *
+ * Run completion polling in a separate thread; the poll timeout will stop it
+ * from spinning in tight loops when there is nothing to do.
 */
-static int
-submit_io(
- struct xfs_buf *bp,
- int fd,
- void *buf,
- xfs_daddr_t blkno,
- int size,
- bool write)
+#define MAX_AIO_EVENTS 1024
+struct xfs_btaio {
+ io_context_t ctxp;
+ int aio_fd;
+ int aio_in_flight;
+ pthread_t completion_tid;
+ bool done;
+};
+
+static void
+xfs_buf_aio_ioend(
+ struct io_event *ev)
 {
- int ret;
+ struct xfs_buf *bp = (struct xfs_buf *)ev->data;

- if (!write)
- ret = pread(fd, buf, size, BBTOB(blkno));
- else
- ret = pwrite(fd, buf, size, BBTOB(blkno));
- if (ret < 0)
- ret = -errno;
- else if (ret != size)
- ret = -EIO;
- else
- ret = 0;
 /*
- * This is a bit of a hack until we get AIO that runs completions.
- * Success is treated as a completion here, but IO errors are handled as
- * a submission error and are handled by the caller. AIO will clean this
- * up.
+ * don't overwrite existing errors - otherwise we can lose errors on
+ * buffers that require multiple bios to complete.
+ *
+ * We check that the returned length was the same as specified for this
+ * IO. Note that this only works for read and write - if we start
+ * using readv/writev for discontiguous buffers then this needs more
+ * work.
 */
- if (!ret)
+ if (ev->res < 0 || ev->res != ev->obj->u.c.nbytes) {
+ int error = ev->res < 0 ? (int)ev->res : -EIO;
+
+ cmpxchg(&bp->b_io_error, 0, error);
+ }
+
+ if (atomic_dec_and_test(&bp->b_io_remaining))
 xfs_buf_ioend(bp);
- return ret;
+}
+
+static void
+get_io_completions(
+ struct xfs_btaio *aio,
+ int threshold)
+{
+ struct io_event ioevents[MAX_AIO_EVENTS];
+ struct timespec tout = {
+ .tv_nsec = 100*1000*1000, /* 100ms */
+ };
+ int i, r;
+
+ /* gather up some completions */
+ r = io_getevents(aio->ctxp, 1, MAX_AIO_EVENTS, ioevents, &tout);
+ if (r < 0) {
+ fprintf(stderr, "FAIL! io_getevents returned %d\n", r);
+ if (r == -EINTR)
+ return;
+ exit(1);
+ }
+ if (r == 0) {
+ /* timeout, return to caller to check for what to do next */
+ return;
+ }
+
+ atomic_sub(&aio->aio_in_flight, r);
+ for (i = 0; i < r; ++i)
+ xfs_buf_aio_ioend(&ioevents[i]);
+}
+
+static void *
+aio_completion_thread(
+ void *arg)
+{
+ struct xfs_btaio *aio = arg;
+
+ while (!aio->done) {
+ get_io_completions(aio, 1);
+ }
+ return NULL;
+}
+
+static int
+submit_aio(
+ struct xfs_buf *bp,
+ void *buf,
+ xfs_daddr_t blkno,
+ int size)
+{
+ struct xfs_btaio *aio = bp->b_target->bt_aio;
+ int r;
+
+ if (!aio->aio_fd)
+ aio->aio_fd = bp->b_target->bt_fd;
+
+ /*
+ * Reserve and bound the number of in flight IOs to keep the number of
+ * pending IOs from overrunning the tail of the event loop. This also
+ * serves to throttle incoming IOs without burning CPU by spinning.
+ */
+ while (!atomic_add_unless(&aio->aio_in_flight, 1, MAX_AIO_EVENTS - 1))
+ get_io_completions(aio, 1);
+
+ if (bp->b_flags & XBF_WRITE)
+ io_prep_pwrite(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
+ else
+ io_prep_pread(&bp->b_iocb, aio->aio_fd, buf, size, BBTOB(blkno));
+
+ bp->b_iocb.data = bp;
+ do {
+ struct iocb *iocb;
+
+ iocb = &bp->b_iocb;
+ r = io_submit(aio->ctxp, 1, &iocb);
+ if (r == 1)
+ return 0; /* successful submission */
+ fprintf(stderr, "io_submit returned %d\n", r);
+ if (r != -EAGAIN)
+ break;
+ /* On EAGAIN, reap some completions and try again. */
+ get_io_completions(aio, 1);
+ } while (1);
+
+ if (bp->b_flags & LIBXFS_B_EXIT)
+ exit(1);
+
+ atomic_dec(&aio->aio_in_flight);
+ return r;
 }

 static void
@@ -373,7 +472,6 @@ xfs_buftarg_submit_io_map(
 {
 int size;
 int offset;
- bool rw = (bp->b_flags & XBF_WRITE);
 int error;

 offset = *buf_offset;
@@ -388,8 +486,7 @@

 atomic_inc(&bp->b_io_remaining);

- error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
- bp->b_maps[map].bm_bn, size, rw);
+ error = submit_aio(bp, bp->b_addr + offset, bp->b_maps[map].bm_bn, size);
 if (error) {
 /*
 * This is guaranteed not to be the last io reference count
@@ -474,6 +571,49 @@ xfs_buftarg_submit_io(
 }
 }

+int
+xfs_buftarg_aio_init(
+ struct xfs_btaio **aiop)
+{
+ struct xfs_btaio *aio;
+ int r;
+
+ aio = calloc(1, sizeof(*aio));
+ if (!aio)
+ return -ENOMEM;
+
+ r = io_setup(MAX_AIO_EVENTS, &aio->ctxp);
+ if (r) {
+ printf("FAIL! io_setup returned %d\n", r);
+ goto free_aio;
+ }
+
+ r = pthread_create(&aio->completion_tid, NULL, aio_completion_thread,
+ aio);
+ if (r) {
+ printf("FAIL! aio thread create returned %d\n", r);
+ goto free_aio;
+ }
+
+ *aiop = aio;
+ return 0;
+
+free_aio:
+ free(aio);
+ return r;
+}
+
+void
+xfs_buftarg_aio_destroy(
+ struct xfs_btaio *aio)
+{
+ if (!aio)
+ return;
+
+ aio->done = true;
+ pthread_join(aio->completion_tid, NULL);
+ free(aio);
+}
 /*
 * Return a buffer associated to external memory via xfs_buf_associate_memory()
 * back to it's empty state.
diff --git a/libxfs/xfs_buf.h b/libxfs/xfs_buf.h
index 4b6dff885165..29fbaaab4abb 100644
--- a/libxfs/xfs_buf.h
+++ b/libxfs/xfs_buf.h
@@ -81,6 +81,12 @@ struct xfs_buf {
 struct completion b_iowait;
 struct semaphore b_sema;
 xfs_buf_iodone_t b_iodone;
+
+ /*
+ * XXX: AIO needs as many iocbs as we have maps for discontig
+ * buffers to work correctly.
+ */
+ struct iocb b_iocb;
 };

 struct xfs_buf *xfs_buf_incore(struct xfs_buftarg *target,
diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
index 61c4a3164d23..62aa6f236537 100644
--- a/libxfs/xfs_buftarg.h
+++ b/libxfs/xfs_buftarg.h
@@ -16,6 +16,7 @@ struct xfs_buf_ops;
 struct xfs_buf;
 struct xfs_buf_map;
 struct xfs_mount;
+struct xfs_btaio;

 /* this needs to die */
 #define LIBXFS_BBTOOFF64(bbs) (((xfs_off_t)(bbs)) << BBSHIFT)
@@ -55,6 +56,9 @@ struct xfs_buftarg {
 bool bt_exiting;
 bool bt_low_mem;
 struct completion bt_low_mem_wait;
+
+ /* AIO submission/completion structures */
+ struct xfs_btaio *bt_aio;
 };

 /* We purged a dirty buffer and lost a write. */
@@ -90,6 +94,9 @@ int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
 void xfs_buftarg_submit_io(struct xfs_buf *bp);
 void xfs_buf_mark_dirty(struct xfs_buf *bp);
+
+int xfs_buftarg_aio_init(struct xfs_btaio **aiop);
+void xfs_buftarg_aio_destroy(struct xfs_btaio *aio);
+
 /*
 * Cached buffer memory manangement
 */
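A closing note on the throttling loop in submit_aio(): it relies on atomic_add_unless(), which the atomic.h hunk above does not add, so the definition presumably comes from elsewhere in the series. Assuming it keeps the kernel's semantics (add a to *v unless *v currently equals u, returning true when the add happened), a sketch over liburcu's uatomic primitives, not the actual xfsprogs definition, would be:

#include <stdbool.h>
#include <urcu/uatomic.h>

/*
 * Sketch of atomic_add_unless() built on liburcu's compare-and-swap.
 * Add @a to @v unless the counter currently holds @u; return true if
 * the add was performed.
 */
static inline bool
atomic_add_unless(int *v, int a, int u)
{
        int c = uatomic_read(v);

        while (c != u) {
                int old = uatomic_cmpxchg(v, c, c + a);

                if (old == c)
                        return true;    /* swap succeeded, value was not u */
                c = old;                /* raced with another update; retry */
        }
        return false;                   /* counter is at the bound */
}

With u set to MAX_AIO_EVENTS - 1 as in the patch, a false return means the AIO ring is full and the submitter must reap completions before it may queue more IO.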