[RFC] Linux AIO support when using O_DIRECT

Message ID 1237823124-6417-1-git-send-email-aliguori@us.ibm.com (mailing list archive)
State New, archived

Commit Message

Anthony Liguori March 23, 2009, 3:45 p.m. UTC
This is just a first cut.  It needs a fair bit of cleanup before it can be
committed.  I also think we need to fix up the AIO abstractions a bit.

I wanted to share it, though, in case anyone is interested in doing some
performance comparisons.  It seems to work, although I haven't exercised it
very much.

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Comments

Avi Kivity March 23, 2009, 4:17 p.m. UTC | #1
Anthony Liguori wrote:
> This is just a first cut.  It needs a fair bit of cleanup before it can be
> committed.  I also think we need to fix up the AIO abstractions a bit.
>
> I wanted to share it, though, in case anyone is interested in doing some
> performance comparisons.  It seems to work, although I haven't exercised it
> very much.
>
>  
> +typedef struct AIOOperations
> +{
> +    struct qemu_aiocb *(*get_aiocb)(void);
> +    void (*put_aiocb)(struct qemu_aiocb *);
> +    int (*read)(struct qemu_aiocb *);
> +    int (*write)(struct qemu_aiocb *);
> +    int (*error)(struct qemu_aiocb *);
> +    ssize_t (*get_result)(struct qemu_aiocb *aiocb);
> +    int (*cancel)(int fd, struct qemu_aiocb *aiocb);
> +} AIOOperations;
> +
>   


Instead of introducing yet another layer of indirection, you could add
block-raw-linux-aio, which would be registered before block-raw-posix
(which is really block-raw-threadpool...), and refuse a ->probe() if
caching is enabled.
Anthony Liguori March 23, 2009, 5:14 p.m. UTC | #2
Avi Kivity wrote:
>
> Instead of introducing yet another layer of indirection, you could add
> block-raw-linux-aio, which would be registered before block-raw-posix
> (which is really block-raw-threadpool...), and refuse a ->probe() if
> caching is enabled.

block-raw-posix needs a major overhaul.  That's why I'm not even 
considering committing the patch as is.

I'd like to see the O_DIRECT bounce buffering removed in favor of the 
DMA API bouncing.  Once that happens, raw_read and raw_pread can 
disappear.  block-raw-posix becomes much simpler.

We would drop the signaling stuff and have the thread pool use an fd to 
signal.  The big problem with that right now is that it'll cause a 
performance regression for certain platforms until we have the IO thread 
in place.

Regards,

Anthony Liguori


Christoph Hellwig March 23, 2009, 5:26 p.m. UTC | #3
On Mon, Mar 23, 2009 at 06:17:36PM +0200, Avi Kivity wrote:
> Instead of introducing yet another layer of indirection, you could add
> block-raw-linux-aio, which would be registered before block-raw-posix
> (which is really block-raw-threadpool...), and refuse a ->probe() if
> caching is enabled.

Exactly the kind of comment I was about to make, but I need to read a
little deeper to understand all the details.

But my gut feeling is that this abstraction doesn't help us very much,
especially with Avi's aiocb pools in place.

Christoph Hellwig March 23, 2009, 5:29 p.m. UTC | #4
On Mon, Mar 23, 2009 at 12:14:58PM -0500, Anthony Liguori wrote:
> I'd like to see the O_DIRECT bounce buffering removed in favor of the  
> DMA API bouncing.  Once that happens, raw_read and raw_pread can  
> disappear.  block-raw-posix becomes much simpler.

See my vectored I/O patches for doing the bounce buffering at the
optimal place for the aio path. Note that from my reading of the
qcow/qcow2 code they might send down unaligned requests, which is
something the dma api would not help with.

For the buffered I/O path we will always have to do some sort of buffering
due to all the partition header reading, etc.  And given that that part
isn't performance critical, my preference would be to keep doing it in
bdrv_pread/write and guarantee the low-level drivers proper alignment.

> We would drop the signaling stuff and have the thread pool use an fd to  
> signal.  The big problem with that right now is that it'll cause a  
> performance regression for certain platforms until we have the IO thread  
> in place.

Talking about signaling, does anyone remember why the Linux signalfd/
eventfd support is only in kvm but not in upstream qemu?

Christoph Hellwig March 23, 2009, 5:32 p.m. UTC | #5
On Mon, Mar 23, 2009 at 12:14:58PM -0500, Anthony Liguori wrote:
> block-raw-posix needs a major overhaul.  That's why I'm not even  
> considering committing the patch as is.

I have some WIP patches that split out the host device bits into
separate files, to get block-raw-posix down to the pure file-handling
bits without all the host-specific host device mess.  But it's at the
end of a really large pile, which needs to be rebased once the patches
already on the list are in, in some form.
Anthony Liguori March 23, 2009, 6:10 p.m. UTC | #6
Christoph Hellwig wrote:
> On Mon, Mar 23, 2009 at 12:14:58PM -0500, Anthony Liguori wrote:
>   
>> I'd like to see the O_DIRECT bounce buffering removed in favor of the  
>> DMA API bouncing.  Once that happens, raw_read and raw_pread can  
>> disappear.  block-raw-posix becomes much simpler.
>>     
>
> See my vectored I/O patches for doing the bounce buffering at the
> optimal place for the aio path. Note that from my reading of the
> qcow/qcow2 code they might send down unaligned requests, which is
> something the dma api would not help with.
>   

I was going to look today at applying those.

> For the buffered I/O path we will always have to do some sort of buffering
> due to all the partition header reading, etc.  And given that that part
> isn't performance critical, my preference would be to keep doing it in
> bdrv_pread/write and guarantee the low-level drivers proper alignment.
>   

I really dislike having so many APIs.  I'd rather have an aio API that
took byte accesses, or have pread/pwrite always be emulated with a full
sector read/write.

>> We would drop the signaling stuff and have the thread pool use an fd to  
>> signal.  The big problem with that right now is that it'll cause a  
>> performance regression for certain platforms until we have the IO thread  
>> in place.
>>     
>
> Talking about signaling, does anyone remember why the Linux signalfd/
> eventfd support is only in kvm but not in upstream qemu?
>   

Because upstream QEMU doesn't yet have an IO thread.

TCG chains together TBs and if you have a tight loop in a VCPU, then the 
only way to break out of the loop is to receive a signal.  The signal 
handler will call cpu_interrupt() which will unchain TBs allowing TCG 
execution to break once you return from the signal handler.

An IO thread solves this in a different way by letting select() always 
run in parallel to TCG VCPU execution.  When select() returns you can 
send a signal to the TCG VCPU thread to break it out of chained TBs.

Not all IO in qemu generates a signal, so this is a potential problem; but
in practice, if we don't generate a signal for disk IO completion, a number
of real-world guests break (mostly non-x86 boards).

Regards,

Anthony Liguori
Christoph Hellwig March 23, 2009, 6:48 p.m. UTC | #7
On Mon, Mar 23, 2009 at 01:10:30PM -0500, Anthony Liguori wrote:
> I really dislike having so many APIs.  I'd rather have an aio API that
> took byte accesses, or have pread/pwrite always be emulated with a full
> sector read/write.

I had patches to change the aio API to byte-based access, and to get rid
of the read/write methods in favor of only the byte-based pread/pwrite
APIs, but they got obsoleted by Avi's patch to kill the pread/pwrite
ops.  We could put in byte-based AIO without byte-based read/write,
though.  In my patches I put a flag into BlockDriverState saying whether
we allow byte-based access to this instance or emulate it in the block
layer.  We still need this, as many of the image formats can't deal with
byte-granularity access without read-modify-write cycles, and I think
we're better off having one read-modify-write handler in the block layer
than one per image format that needs it.
Avi Kivity March 23, 2009, 7:35 p.m. UTC | #8
Christoph Hellwig wrote:
> On Mon, Mar 23, 2009 at 01:10:30PM -0500, Anthony Liguori wrote:
>   
>> I really dislike having so many APIs.  I'd rather have an aio API that
>> took byte accesses, or have pread/pwrite always be emulated with a full
>> sector read/write.
>>     
>
> I had patches to change the aio API to byte-based access, and to get rid
> of the read/write methods in favor of only the byte-based pread/pwrite
> APIs, but they got obsoleted by Avi's patch to kill the pread/pwrite
> ops.  We could put in byte-based AIO without byte-based read/write,
> though.  In my patches I put a flag into BlockDriverState saying whether we
> allow byte-based access to this instance or emulate it in
> the block layer.  

I like this approach.  An additional flag could tell us what buffer 
alignment the format driver wants, so we can eliminate the alignment 
bounce from format driver code.  Oh, and a flag to indicate we don't 
support vectors, so the generic layer will bounce and send us a 
length-one iovec.

Note the align flag is in the device state, not the format driver, as it 
depends on the cache= settings.
Avi Kivity March 23, 2009, 7:58 p.m. UTC | #9
Anthony Liguori wrote:
> Avi Kivity wrote:
>>
>> Instead of introducing yet another layer of indirection, you could 
>> add block-raw-linux-aio, which would be registered before 
>> block-raw-posix (which is really block-raw-threadpool...), and refuse 
>> a ->probe() if caching is enabled.
>
> block-raw-posix needs a major overhaul.  That's why I'm not even 
> considering committing the patch as is.

That would suggest block-raw-linux-aio-bork-bork-bork.c even more, no?

>
> I'd like to see the O_DIRECT bounce buffering removed in favor of the 
> DMA API bouncing.  Once that happens, raw_read and raw_pread can 
> disappear.  block-raw-posix becomes much simpler.

They aren't really related... note that DMA API requests are likely to 
be aligned anyway, since the guest generates them with the expectation 
that alignment is required.  We need to align at a lower level so we can 
take care of non-DMA-API callers (mostly qemu internal).

>
> We would drop the signaling stuff and have the thread pool use an fd 
> to signal.  The big problem with that right now is that it'll cause a 
> performance regression for certain platforms until we have the IO 
> thread in place. 

Well, let's merge this after the iothread?
Anthony Liguori March 23, 2009, 8:32 p.m. UTC | #10
Avi Kivity wrote:
>
>>
>> We would drop the signaling stuff and have the thread pool use an fd 
>> to signal.  The big problem with that right now is that it'll cause a 
>> performance regression for certain platforms until we have the IO 
>> thread in place. 
>
> Well, let's merge this after the iothread?

Yup.  Just posted that patch in case anyone was interested.  I needed it 
so that we could do some performance testing...

Regards,

Anthony Liguori

Patch

diff --git a/Makefile b/Makefile
index 82fec80..afc6b41 100644
--- a/Makefile
+++ b/Makefile
@@ -61,6 +61,9 @@  else
 ifdef CONFIG_AIO
 BLOCK_OBJS += posix-aio-compat.o
 endif
+ifdef CONFIG_LINUX_AIO
+BLOCK_OBJS += linux-aio.o
+endif
 BLOCK_OBJS += block-raw-posix.o
 endif
 
diff --git a/Makefile.target b/Makefile.target
index 41366ee..df2a794 100644
--- a/Makefile.target
+++ b/Makefile.target
@@ -514,6 +514,9 @@  else
 ifdef CONFIG_AIO
 OBJS+=posix-aio-compat.o
 endif
+ifdef CONFIG_LINUX_AIO
+OBJS+=linux-aio.o
+endif
 OBJS+=block-raw-posix.o
 endif
 
diff --git a/block-raw-posix.c b/block-raw-posix.c
index 1a1a178..e355cf4 100644
--- a/block-raw-posix.c
+++ b/block-raw-posix.c
@@ -29,6 +29,9 @@ 
 #ifdef CONFIG_AIO
 #include "posix-aio-compat.h"
 #endif
+#ifdef CONFIG_LINUX_AIO
+#include "linux-aio.h"
+#endif
 
 #ifdef CONFIG_COCOA
 #include <paths.h>
@@ -68,6 +71,10 @@ 
 #include <sys/diskslice.h>
 #endif
 
+#ifdef CONFIG_LINUX_AIO
+#include "linux-aio.h"
+#endif
+
 //#define DEBUG_FLOPPY
 
 //#define DEBUG_BLOCK
@@ -98,6 +105,17 @@ 
    reopen it to see if the disk has been changed */
 #define FD_OPEN_TIMEOUT 1000
 
+typedef struct AIOOperations
+{
+    struct qemu_aiocb *(*get_aiocb)(void);
+    void (*put_aiocb)(struct qemu_aiocb *);
+    int (*read)(struct qemu_aiocb *);
+    int (*write)(struct qemu_aiocb *);
+    int (*error)(struct qemu_aiocb *);
+    ssize_t (*get_result)(struct qemu_aiocb *aiocb);
+    int (*cancel)(int fd, struct qemu_aiocb *aiocb);
+} AIOOperations;
+
 typedef struct BDRVRawState {
     int fd;
     int type;
@@ -111,8 +129,31 @@  typedef struct BDRVRawState {
     int fd_media_changed;
 #endif
     uint8_t* aligned_buf;
+    AIOOperations *aio_ops;
 } BDRVRawState;
 
+static AIOOperations posix_aio_ops = {
+    .get_aiocb = qemu_paio_get_aiocb,
+    .put_aiocb = qemu_paio_put_aiocb,
+    .read = qemu_paio_read,
+    .write = qemu_paio_write,
+    .error = qemu_paio_error,
+    .get_result = qemu_paio_return,
+    .cancel = qemu_paio_cancel,
+};
+
+#ifdef CONFIG_LINUX_AIO
+static AIOOperations linux_aio_ops = {
+    .get_aiocb = qemu_laio_get_aiocb,
+    .put_aiocb = qemu_laio_put_aiocb,
+    .read = qemu_laio_read,
+    .write = qemu_laio_write,
+    .error = qemu_laio_error,
+    .get_result = qemu_laio_return,
+    .cancel = qemu_laio_cancel,
+};    
+#endif
+
 static int posix_aio_init(void);
 
 static int fd_open(BlockDriverState *bs);
@@ -124,6 +165,14 @@  static int raw_open(BlockDriverState *bs, const char *filename, int flags)
 
     posix_aio_init();
 
+#ifdef CONFIG_LINUX_AIO
+    if ((flags & BDRV_O_NOCACHE)) {
+        qemu_laio_init();
+        s->aio_ops = &linux_aio_ops;
+    } else
+#endif
+        s->aio_ops = &posix_aio_ops;
+
     s->lseek_err_cnt = 0;
 
     open_flags = O_BINARY;
@@ -463,7 +512,7 @@  static int raw_write(BlockDriverState *bs, int64_t sector_num,
 
 typedef struct RawAIOCB {
     BlockDriverAIOCB common;
-    struct qemu_paiocb aiocb;
+    struct qemu_aiocb *aiocb;
     struct RawAIOCB *next;
     int ret;
 } RawAIOCB;
@@ -496,19 +545,24 @@  static void posix_aio_read(void *opaque)
     for(;;) {
         pacb = &s->first_aio;
         for(;;) {
+            BDRVRawState *s;
+
             acb = *pacb;
             if (!acb)
                 goto the_end;
-            ret = qemu_paio_error(&acb->aiocb);
+
+            s = acb->common.bs->opaque;
+            ret = s->aio_ops->error(acb->aiocb);
             if (ret == ECANCELED) {
                 /* remove the request */
                 *pacb = acb->next;
+                s->aio_ops->put_aiocb(acb->aiocb);
                 qemu_aio_release(acb);
             } else if (ret != EINPROGRESS) {
                 /* end of aio */
                 if (ret == 0) {
-                    ret = qemu_paio_return(&acb->aiocb);
-                    if (ret == acb->aiocb.aio_nbytes)
+                    ret = s->aio_ops->get_result(acb->aiocb);
+                    if (ret == acb->aiocb->aio_nbytes)
                         ret = 0;
                     else
                         ret = -EINVAL;
@@ -519,6 +573,7 @@  static void posix_aio_read(void *opaque)
                 *pacb = acb->next;
                 /* call the callback */
                 acb->common.cb(acb->common.opaque, ret);
+                s->aio_ops->put_aiocb(acb->aiocb);
                 qemu_aio_release(acb);
                 break;
             } else {
@@ -553,7 +608,6 @@  static int posix_aio_init(void)
     struct sigaction act;
     PosixAioState *s;
     int fds[2];
-    struct qemu_paioinit ai;
   
     if (posix_aio_state)
         return 0;
@@ -579,6 +633,8 @@  static int posix_aio_init(void)
 
     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
 
+    struct qemu_paioinit ai;
+
     memset(&ai, 0, sizeof(ai));
     ai.aio_threads = 64;
     ai.aio_num = 64;
@@ -600,16 +656,15 @@  static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
         return NULL;
 
     acb = qemu_aio_get(bs, cb, opaque);
-    if (!acb)
-        return NULL;
-    acb->aiocb.aio_fildes = s->fd;
-    acb->aiocb.ev_signo = SIGUSR2;
-    acb->aiocb.aio_buf = buf;
+    acb->aiocb = s->aio_ops->get_aiocb();
+    acb->aiocb->aio_fildes = s->fd;
+    acb->aiocb->ev_signo = SIGUSR2;
+    acb->aiocb->aio_buf = buf;
     if (nb_sectors < 0)
-        acb->aiocb.aio_nbytes = -nb_sectors;
+        acb->aiocb->aio_nbytes = -nb_sectors;
     else
-        acb->aiocb.aio_nbytes = nb_sectors * 512;
-    acb->aiocb.aio_offset = sector_num * 512;
+        acb->aiocb->aio_nbytes = nb_sectors * 512;
+    acb->aiocb->aio_offset = sector_num * 512;
     acb->next = posix_aio_state->first_aio;
     posix_aio_state->first_aio = acb;
     return acb;
@@ -618,7 +673,9 @@  static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
 static void raw_aio_em_cb(void* opaque)
 {
     RawAIOCB *acb = opaque;
+    BDRVRawState *s = acb->common.bs->opaque;
     acb->common.cb(acb->common.opaque, acb->ret);
+    s->aio_ops->put_aiocb(acb->aiocb);
     qemu_aio_release(acb);
 }
 
@@ -633,7 +690,9 @@  static void raw_aio_remove(RawAIOCB *acb)
             fprintf(stderr, "raw_aio_remove: aio request not found!\n");
             break;
         } else if (*pacb == acb) {
+            BDRVRawState *s = acb->common.bs->opaque;
             *pacb = acb->next;
+            s->aio_ops->put_aiocb(acb->aiocb);
             qemu_aio_release(acb);
             break;
         }
@@ -656,6 +715,7 @@  static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     if (unlikely(s->aligned_buf != NULL && ((uintptr_t) buf % 512))) {
         QEMUBH *bh;
         acb = qemu_aio_get(bs, cb, opaque);
+        acb->aiocb = s->aio_ops->get_aiocb();
         acb->ret = raw_pread(bs, 512 * sector_num, buf, 512 * nb_sectors);
         bh = qemu_bh_new(raw_aio_em_cb, acb);
         qemu_bh_schedule(bh);
@@ -665,7 +725,7 @@  static BlockDriverAIOCB *raw_aio_read(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (qemu_paio_read(&acb->aiocb) < 0) {
+    if (s->aio_ops->read(acb->aiocb) < 0) {
         raw_aio_remove(acb);
         return NULL;
     }
@@ -687,6 +747,7 @@  static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     if (unlikely(s->aligned_buf != NULL && ((uintptr_t) buf % 512))) {
         QEMUBH *bh;
         acb = qemu_aio_get(bs, cb, opaque);
+        acb->aiocb = s->aio_ops->get_aiocb();
         acb->ret = raw_pwrite(bs, 512 * sector_num, buf, 512 * nb_sectors);
         bh = qemu_bh_new(raw_aio_em_cb, acb);
         qemu_bh_schedule(bh);
@@ -696,7 +757,7 @@  static BlockDriverAIOCB *raw_aio_write(BlockDriverState *bs,
     acb = raw_aio_setup(bs, sector_num, (uint8_t*)buf, nb_sectors, cb, opaque);
     if (!acb)
         return NULL;
-    if (qemu_paio_write(&acb->aiocb) < 0) {
+    if (s->aio_ops->write(acb->aiocb) < 0) {
         raw_aio_remove(acb);
         return NULL;
     }
@@ -707,12 +768,13 @@  static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
 {
     int ret;
     RawAIOCB *acb = (RawAIOCB *)blockacb;
+    BDRVRawState *s = acb->common.bs->opaque;
 
-    ret = qemu_paio_cancel(acb->aiocb.aio_fildes, &acb->aiocb);
+    ret = s->aio_ops->cancel(acb->aiocb->aio_fildes, acb->aiocb);
     if (ret == QEMU_PAIO_NOTCANCELED) {
         /* fail safe: if the aio could not be canceled, we wait for
            it */
-        while (qemu_paio_error(&acb->aiocb) == EINPROGRESS);
+        while (s->aio_ops->error(acb->aiocb) == EINPROGRESS);
     }
 
     raw_aio_remove(acb);
@@ -938,6 +1000,14 @@  static int hdev_open(BlockDriverState *bs, const char *filename, int flags)
 
     posix_aio_init();
 
+#ifdef CONFIG_LINUX_AIO
+    if ((flags & BDRV_O_NOCACHE)) {
+        qemu_laio_init();
+        s->aio_ops = &linux_aio_ops;
+    } else
+#endif
+        s->aio_ops = &posix_aio_ops;
+
 #ifdef CONFIG_COCOA
     if (strstart(filename, "/dev/cdrom", NULL)) {
         kern_return_t kernResult;
diff --git a/configure b/configure
index 5c62c59..4913a3f 100755
--- a/configure
+++ b/configure
@@ -180,6 +180,7 @@  build_docs="no"
 uname_release=""
 curses="yes"
 aio="yes"
+linuxaio="yes"
 nptl="yes"
 mixemu="no"
 bluez="yes"
@@ -463,6 +464,8 @@  for opt do
   ;;
   --disable-aio) aio="no"
   ;;
+  --disable-linux-aio) linuxaio="no"
+  ;;
   --disable-blobs) blobs="no"
   ;;
   --kerneldir=*) kerneldir="$optarg"
@@ -577,6 +580,7 @@  echo "  --enable-uname-release=R Return R for uname -r in usermode emulation"
 echo "  --sparc_cpu=V            Build qemu for Sparc architecture v7, v8, v8plus, v8plusa, v9"
 echo "  --disable-vde            disable support for vde network"
 echo "  --disable-aio            disable AIO support"
+echo "  --disable-linux-aio      disable Linux AIO support"
 echo "  --disable-blobs          disable installing provided firmware blobs"
 echo "  --kerneldir=PATH         look for kernel includes in PATH"
 echo ""
@@ -1082,6 +1086,22 @@  EOF
 fi
 
 ##########################################
+# linux-aio probe
+
+if test "$linuxaio" = "yes" ; then
+    linuxaio=no
+    cat > $TMPC <<EOF
+#include <libaio.h>
+#include <sys/eventfd.h>
+int main(void) { io_setup; io_set_eventfd; eventfd; return 0; }
+EOF
+    if $cc $ARCH_CFLAGS -o $TMPE -laio $TMPC 2> /dev/null ; then
+	linuxaio=yes
+	AIOLIBS="$AIOLIBS -laio"
+    fi
+fi
+
+##########################################
 # iovec probe
 cat > $TMPC <<EOF
 #include <sys/types.h>
@@ -1204,6 +1224,7 @@  echo "uname -r          $uname_release"
 echo "NPTL support      $nptl"
 echo "vde support       $vde"
 echo "AIO support       $aio"
+echo "Linux AIO support $linuxaio"
 echo "Install blobs     $blobs"
 echo "KVM support       $kvm"
 echo "fdt support       $fdt"
@@ -1500,6 +1521,10 @@  if test "$aio" = "yes" ; then
   echo "#define CONFIG_AIO 1" >> $config_h
   echo "CONFIG_AIO=yes" >> $config_mak
 fi
+if test "$linuxaio" = "yes" ; then
+  echo "#define CONFIG_LINUX_AIO 1" >> $config_h
+  echo "CONFIG_LINUX_AIO=yes" >> $config_mak
+fi
 if test "$blobs" = "yes" ; then
   echo "INSTALL_BLOBS=yes" >> $config_mak
 fi
diff --git a/linux-aio.c b/linux-aio.c
new file mode 100644
index 0000000..959407c
--- /dev/null
+++ b/linux-aio.c
@@ -0,0 +1,207 @@ 
+/* QEMU linux-aio
+ *
+ * Copyright IBM, Corp. 2009
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu-common.h"
+#include "linux-aio.h"
+#include "sys-queue.h"
+#include "osdep.h"
+#include "qemu-aio.h"
+
+#include <sys/eventfd.h>
+#include <libaio.h>
+
+#define MAX_EVENTS 64
+
+struct qemu_laiocb
+{
+    struct qemu_aiocb common;
+    struct qemu_laio_state *ctx;
+    struct iocb iocb;
+    ssize_t ret;
+};
+
+struct qemu_laio_state
+{
+    int efd;
+    io_context_t ctx;
+    int count;
+};
+
+static struct qemu_laio_state *qemu_laio_state;
+
+static struct qemu_laiocb *aiocb_to_laiocb(struct qemu_aiocb *aiocb)
+{
+    return container_of(aiocb, struct qemu_laiocb, common);
+}
+
+struct qemu_aiocb *qemu_laio_get_aiocb(void)
+{
+    struct qemu_laiocb *laiocb;
+
+    laiocb = qemu_mallocz(sizeof(*laiocb));
+    return &laiocb->common;
+}
+
+void qemu_laio_put_aiocb(struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+
+    qemu_free(laiocb);
+}
+
+static void qemu_laio_completion_cb(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+    uint64_t val;
+    ssize_t ret;
+    struct io_event events[MAX_EVENTS];
+    int ev_signo = -1;
+
+    while (1) {
+        struct timespec ts = { 0 };
+        int nevents, i;
+
+        do {
+            ret = read(s->efd, &val, sizeof(val));
+        } while (ret == -1 && errno == EINTR);
+
+        if (ret == -1 && errno == EAGAIN)
+            break;
+
+        if (ret != 8)
+            break;
+
+        do {
+            nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
+        } while (nevents == -1 && errno == EINTR);
+
+        for (i = 0; i < nevents; i++) {
+            struct iocb *iocb = events[i].obj;
+            struct qemu_laiocb *laiocb = container_of(iocb, struct qemu_laiocb, iocb);
+
+            laiocb->ret = (ssize_t)(((uint64_t)events[i].res2 << 32) | events[i].res);
+            s->count--;
+            ev_signo = laiocb->common.ev_signo;
+        }
+    }
+
+    /* FIXME this is cheating */
+    if (ev_signo != -1)
+        kill(getpid(), ev_signo);
+}
+
+static int qemu_laio_flush_cb(void *opaque)
+{
+    struct qemu_laio_state *s = opaque;
+
+    if (s->count > 0)
+        return 1;
+
+    return 0;
+}
+
+int qemu_laio_init(void)
+{
+    if (qemu_laio_state == NULL) {
+        qemu_laio_state = qemu_mallocz(sizeof(*qemu_laio_state));
+        qemu_laio_state->efd = eventfd(0, 0);
+        if (qemu_laio_state->efd == -1) {
+            qemu_free(qemu_laio_state);
+            return -EINVAL;
+        }
+        if (io_setup(MAX_EVENTS, &qemu_laio_state->ctx) != 0) {
+            close(qemu_laio_state->efd);
+            qemu_free(qemu_laio_state);
+            return -EINVAL;
+        }
+
+        fcntl(qemu_laio_state->efd, F_SETFL, O_NONBLOCK);
+
+        /* FIXME we could use a separate thread to read from eventfd. */
+        /* This will not generate a signal upon IO completion which means that
+         * the VCPU may keep spinning unless there's an IO thread. */
+        qemu_aio_set_fd_handler(qemu_laio_state->efd, qemu_laio_completion_cb,
+                                NULL, qemu_laio_flush_cb, qemu_laio_state);
+    }
+
+    return 0;
+}
+
+static int qemu_laio_submit(struct qemu_aiocb *aiocb, int is_write)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+    struct iocb *iocbs = &laiocb->iocb;
+
+    if (is_write)
+        io_prep_pwrite(&laiocb->iocb, aiocb->aio_fildes, aiocb->aio_buf,
+                       aiocb->aio_nbytes, aiocb->aio_offset);
+    else
+        io_prep_pread(&laiocb->iocb, aiocb->aio_fildes, aiocb->aio_buf,
+                      aiocb->aio_nbytes, aiocb->aio_offset);
+
+    io_set_eventfd(&laiocb->iocb, qemu_laio_state->efd);
+
+    laiocb->ctx = qemu_laio_state;
+    laiocb->ret = -EINPROGRESS;
+
+    qemu_laio_state->count++;
+
+    return io_submit(qemu_laio_state->ctx, 1, &iocbs);
+}
+
+int qemu_laio_read(struct qemu_aiocb *aiocb)
+{
+    return qemu_laio_submit(aiocb, 0);
+}
+
+int qemu_laio_write(struct qemu_aiocb *aiocb)
+{
+    return qemu_laio_submit(aiocb, 1);
+}
+
+int qemu_laio_error(struct qemu_aiocb *aiocb)
+{
+    ssize_t ret = qemu_laio_return(aiocb);
+
+    if (ret < 0)
+        ret = -ret;
+    else
+        ret = 0;
+
+    return ret;
+}
+
+ssize_t qemu_laio_return(struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+
+    return laiocb->ret;
+}
+
+int qemu_laio_cancel(int fd, struct qemu_aiocb *aiocb)
+{
+    struct qemu_laiocb *laiocb = aiocb_to_laiocb(aiocb);
+    struct io_event event;
+    int ret;
+
+    if (laiocb->ret == -EINPROGRESS) {
+        ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
+        if (ret == 0) {
+            laiocb->ret = -ECANCELED;
+            ret = QEMU_PAIO_CANCELED;
+        } else
+            ret = QEMU_PAIO_NOTCANCELED;
+    } else
+        ret = QEMU_PAIO_ALLDONE;
+
+    return ret;
+}
diff --git a/linux-aio.h b/linux-aio.h
new file mode 100644
index 0000000..002270c
--- /dev/null
+++ b/linux-aio.h
@@ -0,0 +1,28 @@ 
+/* QEMU linux-aio
+ *
+ * Copyright IBM, Corp. 2009
+ *
+ * Authors:
+ *  Anthony Liguori   <aliguori@us.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef QEMU_LINUX_AIO_H
+#define QEMU_LINUX_AIO_H
+
+#include "posix-aio-compat.h"
+
+struct qemu_aiocb *qemu_laio_get_aiocb(void);
+void qemu_laio_put_aiocb(struct qemu_aiocb *aiocb);
+
+int qemu_laio_init(void);
+int qemu_laio_read(struct qemu_aiocb *aiocb);
+int qemu_laio_write(struct qemu_aiocb *aiocb);
+int qemu_laio_error(struct qemu_aiocb *aiocb);
+ssize_t qemu_laio_return(struct qemu_aiocb *aiocb);
+int qemu_laio_cancel(int fd, struct qemu_aiocb *aiocb);
+
+#endif
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index 6b547f4..752001f 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -18,10 +18,24 @@ 
 #include <string.h>
 #include <stdlib.h>
 #include <stdio.h>
+#include "qemu-common.h"
 #include "osdep.h"
 
 #include "posix-aio-compat.h"
 
+#include "sys-queue.h"
+
+struct qemu_paiocb
+{
+    struct qemu_aiocb common;
+
+    /* private */
+    TAILQ_ENTRY(qemu_paiocb) node;
+    int is_write;
+    ssize_t ret;
+    int active;
+};
+
 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 static pthread_t thread_id;
@@ -31,6 +45,11 @@  static int cur_threads = 0;
 static int idle_threads = 0;
 static TAILQ_HEAD(, qemu_paiocb) request_list;
 
+static struct qemu_paiocb *aiocb_to_paiocb(struct qemu_aiocb *aiocb)
+{
+    return container_of(aiocb, struct qemu_paiocb, common);
+}
+
 static void die2(int err, const char *what)
 {
     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
@@ -116,19 +135,19 @@  static void *aio_thread(void *unused)
         idle_threads--;
         mutex_unlock(&lock);
 
-        while (offset < aiocb->aio_nbytes) {
+        while (offset < aiocb->common.aio_nbytes) {
             ssize_t len;
 
             if (aiocb->is_write)
-                len = pwrite(aiocb->aio_fildes,
-                             (const char *)aiocb->aio_buf + offset,
-                             aiocb->aio_nbytes - offset,
-                             aiocb->aio_offset + offset);
+                len = pwrite(aiocb->common.aio_fildes,
+                             (const char *)aiocb->common.aio_buf + offset,
+                             aiocb->common.aio_nbytes - offset,
+                             aiocb->common.aio_offset + offset);
             else
-                len = pread(aiocb->aio_fildes,
-                            (char *)aiocb->aio_buf + offset,
-                            aiocb->aio_nbytes - offset,
-                            aiocb->aio_offset + offset);
+                len = pread(aiocb->common.aio_fildes,
+                            (char *)aiocb->common.aio_buf + offset,
+                            aiocb->common.aio_nbytes - offset,
+                            aiocb->common.aio_offset + offset);
 
             if (len == -1 && errno == EINTR)
                 continue;
@@ -146,7 +165,7 @@  static void *aio_thread(void *unused)
         idle_threads++;
         mutex_unlock(&lock);
 
-        if (kill(pid, aiocb->ev_signo)) die("kill failed");
+        if (kill(pid, aiocb->common.ev_signo)) die("kill failed");
     }
 
     idle_threads--;
@@ -193,18 +212,21 @@  static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
     return 0;
 }
 
-int qemu_paio_read(struct qemu_paiocb *aiocb)
+int qemu_paio_read(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     return qemu_paio_submit(aiocb, 0);
 }
 
-int qemu_paio_write(struct qemu_paiocb *aiocb)
+int qemu_paio_write(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     return qemu_paio_submit(aiocb, 1);
 }
 
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
+ssize_t qemu_paio_return(struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     ssize_t ret;
 
     mutex_lock(&lock);
@@ -214,9 +236,9 @@  ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-int qemu_paio_error(struct qemu_paiocb *aiocb)
+int qemu_paio_error(struct qemu_aiocb *cb)
 {
-    ssize_t ret = qemu_paio_return(aiocb);
+    ssize_t ret = qemu_paio_return(cb);
 
     if (ret < 0)
         ret = -ret;
@@ -226,8 +248,9 @@  int qemu_paio_error(struct qemu_paiocb *aiocb)
     return ret;
 }
 
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
+int qemu_paio_cancel(int fd, struct qemu_aiocb *cb)
 {
+    struct qemu_paiocb *aiocb = aiocb_to_paiocb(cb);
     int ret;
 
     mutex_lock(&lock);
@@ -243,3 +266,18 @@  int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
 
     return ret;
 }
+
+struct qemu_aiocb *qemu_paio_get_aiocb(void)
+{
+    struct qemu_paiocb *paiocb;
+
+    paiocb = qemu_mallocz(sizeof(*paiocb));
+    return &paiocb->common;
+}
+
+void qemu_paio_put_aiocb(struct qemu_aiocb *aiocb)
+{
+    struct qemu_paiocb *paiocb = aiocb_to_paiocb(aiocb);
+
+    qemu_free(paiocb);
+}
diff --git a/posix-aio-compat.h b/posix-aio-compat.h
index 0bc10f5..b9aa3f9 100644
--- a/posix-aio-compat.h
+++ b/posix-aio-compat.h
@@ -18,25 +18,17 @@ 
 #include <unistd.h>
 #include <signal.h>
 
-#include "sys-queue.h"
-
 #define QEMU_PAIO_CANCELED     0x01
 #define QEMU_PAIO_NOTCANCELED  0x02
 #define QEMU_PAIO_ALLDONE      0x03
 
-struct qemu_paiocb
+struct qemu_aiocb
 {
     int aio_fildes;
     void *aio_buf;
     size_t aio_nbytes;
     int ev_signo;
     off_t aio_offset;
-
-    /* private */
-    TAILQ_ENTRY(qemu_paiocb) node;
-    int is_write;
-    ssize_t ret;
-    int active;
 };
 
 struct qemu_paioinit
@@ -46,11 +38,14 @@  struct qemu_paioinit
     unsigned int aio_idle_time;
 };
 
+struct qemu_aiocb *qemu_paio_get_aiocb(void);
+void qemu_paio_put_aiocb(struct qemu_aiocb *aiocb);
+
 int qemu_paio_init(struct qemu_paioinit *aioinit);
-int qemu_paio_read(struct qemu_paiocb *aiocb);
-int qemu_paio_write(struct qemu_paiocb *aiocb);
-int qemu_paio_error(struct qemu_paiocb *aiocb);
-ssize_t qemu_paio_return(struct qemu_paiocb *aiocb);
-int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb);
+int qemu_paio_read(struct qemu_aiocb *aiocb);
+int qemu_paio_write(struct qemu_aiocb *aiocb);
+int qemu_paio_error(struct qemu_aiocb *aiocb);
+ssize_t qemu_paio_return(struct qemu_aiocb *aiocb);
+int qemu_paio_cancel(int fd, struct qemu_aiocb *aiocb);
 
 #endif