[RFC,v2,6/9] block/io_uring: implements interfaces for io_uring

Message ID 20190524140337.13415-7-mehta.aaru20@gmail.com (mailing list archive)
State New, archived
Series Add support for io_uring

Commit Message

Aarushi Mehta May 24, 2019, 2:03 p.m. UTC
Signed-off-by: Aarushi Mehta <mehta.aaru20@gmail.com>
---
 MAINTAINERS         |   1 +
 block/Makefile.objs |   2 +
 block/io_uring.c    | 306 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 309 insertions(+)
 create mode 100644 block/io_uring.c

--
2.17.1

Comments

Stefan Hajnoczi May 24, 2019, 4:17 p.m. UTC | #1
On Fri, May 24, 2019 at 07:33:34PM +0530, Aarushi Mehta wrote:
> Signed-off-by: Aarushi Mehta <mehta.aaru20@gmail.com>
> ---
>  MAINTAINERS         |   1 +
>  block/Makefile.objs |   2 +
>  block/io_uring.c    | 306 ++++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 309 insertions(+)
>  create mode 100644 block/io_uring.c
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index b8fc1e3fe3..770d562c6c 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2510,6 +2510,7 @@ R: Stefan Hajnoczi <stefanha@redhat.com>
>  L: qemu-block@nongnu.org
>  S: Maintained
>  F: stubs/io_uring.c
> +F: block/io_uring.c
> 
> 
>  L: qemu-block@nongnu.org
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index 7a81892a52..262d413c6d 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -18,6 +18,7 @@ block-obj-y += block-backend.o snapshot.o qapi.o
>  block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o
>  block-obj-$(CONFIG_POSIX) += file-posix.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
> +block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o
>  block-obj-y += null.o mirror.o commit.o io.o create.o
>  block-obj-y += throttle-groups.o
>  block-obj-$(CONFIG_LINUX) += nvme.o
> @@ -61,5 +62,6 @@ block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o
>  dmg-lzfse.o-libs   := $(LZFSE_LIBS)
>  qcow.o-libs        := -lz
>  linux-aio.o-libs   := -laio
> +io_uring.o-libs    := -luring
>  parallels.o-cflags := $(LIBXML2_CFLAGS)
>  parallels.o-libs   := $(LIBXML2_LIBS)
> diff --git a/block/io_uring.c b/block/io_uring.c
> new file mode 100644
> index 0000000000..817ec055db
> --- /dev/null
> +++ b/block/io_uring.c
> @@ -0,0 +1,306 @@
> +/*
> + * Linux io_uring support.
> + *
> + * Copyright (C) 2009 IBM, Corp.
> + * Copyright (C) 2009 Red Hat, Inc.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +#include <liburing.h>

Please move this include below osdep.h as per ./HACKING "1.2. Include
directives".

> +
> +#include "qemu/osdep.h"
> +#include "qemu-common.h"
> +#include "block/aio.h"
> +#include "qemu/queue.h"
> +#include "block/block.h"
> +#include "block/raw-aio.h"
> +#include "qemu/event_notifier.h"

Unused, please drop.

> +#include "qemu/coroutine.h"
> +#include "qapi/error.h"
> +
> +#define MAX_EVENTS 128
> +
> +typedef struct LuringAIOCB {
> +    BlockAIOCB common;
> +    Coroutine *co;
> +    struct io_uring_sqe sqeq;
> +    int ret;
> +    QSIMPLEQ_ENTRY(LuringAIOCB) next;
> +} LuringAIOCB;
> +
> +typedef struct LuringQueue {
> +    int plugged;
> +    unsigned int in_queue;
> +    unsigned int in_flight;
> +    bool blocked;
> +    QSIMPLEQ_HEAD(, LuringAIOCB) pending;
> +} LuringQueue;
> +
> +typedef struct LuringState {
> +    AioContext *aio_context;
> +
> +    struct io_uring ring;
> +
> +    /* I/O queue for batch submission.  Protected by AioContext lock. */
> +    LuringQueue io_q;
> +
> +    /* I/O completion processing.  Only runs in I/O thread.  */
> +    QEMUBH *completion_bh;
> +    int event_idx;
> +    int event_max;
> +} LuringState;
> +
> +static void ioq_submit(LuringState *s);
> +
> +static inline int io_cqe_ret(struct io_uring_cqe *cqe)
> +{
> +    return cqe->res;
> +}
> +
> +/**
> + * qemu_luring_process_completions:
> + * @s: AIO state
> + *
> + * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
> + *
> + */
> +static void qemu_luring_process_completions(LuringState *s)
> +{
> +    struct io_uring_cqe *cqes;
> +
> +    qemu_bh_schedule(s->completion_bh);

Please add a comment explaining why this is necessary:

  /*
   * Request completion callbacks can run the nested event loop.
   * Schedule ourselves so the nested event loop will "see" remaining
   * completed requests and process them.  Without this, completion
   * callbacks that wait for other requests using a nested event loop
   * would hang forever.
   */

> +
> +    while ((s->event_max = s->io_q.in_flight) &&
> +           !io_uring_peek_cqe(&s->ring, &cqes)) {
> +        for (s->event_idx = 0; s->event_idx < s->event_max;) {
> +            io_uring_cqe_seen(&s->ring, cqes);

What is the purpose of event_max/event_idx given that we can consume
cqes via io_uring_peek_cqe() + io_uring_cqe_seen()?

I think just this will do:

  while (!io_uring_peek_cqe(&s->ring, &cqes)) {
      int ret = io_cqe_ret(cqes);

      /* We're done accessing cqes, replenish the cq ring */
      io_uring_cqe_seen(&s->ring, cqes);

> +
> +            LuringAIOCB *luringcb;
> +            luringcb = g_malloc0(sizeof(luringcb));

We need to fetch the original LuringAIOCB associated with this request
instead of allocating a new one.  The original LuringAIOCB contains the
coroutine and async completion callback function pointer that we need.

io_uring allows us to pass user data along with the sqe and it gets
returned in the cqe.

During request submission:

  io_uring_sqe_set_data(sqes, luringcb);

During request completion:

  LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);

> +            luringcb->ret = io_cqe_ret(cqes);
> +            /* Change counters one-by-one because we can be nested. */
> +            s->io_q.in_flight--;
> +            s->event_idx++;

We must invoke completion and possibly unref luringcb here:

  if (luringcb->co) {
      /* If the coroutine is already entered it must be in ioq_submit() and
       * will notice luringcb->ret has been filled in when it eventually runs
       * later.  Coroutines cannot be entered recursively so avoid doing
       * that!
       */
      if (!qemu_coroutine_entered(luringcb->co)) {
          aio_co_wake(luringcb->co);
      }
  } else {
      luringcb->common.cb(luringcb->common.opaque, luringcb->ret);
      qemu_aio_unref(luringcb);
  }

Without this code the request is never completed.  How are you testing
this code (you mentioned the guest boots)?  Is it possible that the
Linux AIO or thread-pool code path is being taken accidentally and we're
actually not running io_uring yet?

> +        }
> +    }
> +
> +    qemu_bh_cancel(s->completion_bh);
> +
> +    /*
> +     * If we are nested we have to notify the level above that we are done
> +     * by setting event_max to zero; the upper level will then jump out of its
> +     * own `for` loop.  If we are the last level, all counters drop to zero.
> +     */
> +    s->event_max = 0;
> +    s->event_idx = 0;
> +}
> +
> +static void qemu_luring_process_completions_and_submit(LuringState *s)
> +{
> +    aio_context_acquire(s->aio_context);
> +    qemu_luring_process_completions(s);
> +
> +    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {

Please use in_queue instead of checking io_q.pending since there might
be requests in the sq ring that also need to be submitted.
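
For example (a sketch, given that in_queue counts requests in the pending
list as well as unconsumed sqes in the sq ring):

  if (!s->io_q.plugged && s->io_q.in_queue > 0) {
      ioq_submit(s);
  }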

> +        ioq_submit(s);
> +    }
> +    aio_context_release(s->aio_context);
> +}
> +
> +static void qemu_luring_completion_bh(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void qemu_luring_completion_cb(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static const AIOCBInfo luring_aiocb_info = {
> +    .aiocb_size         = sizeof(LuringAIOCB),
> +};
> +
> +
> +static void ioq_init(LuringQueue *io_q)
> +{
> +    QSIMPLEQ_INIT(&io_q->pending);
> +    io_q->plugged = 0;
> +    io_q->in_queue = 0;
> +    io_q->in_flight = 0;
> +    io_q->blocked = false;
> +}
> +
> +static void ioq_submit(LuringState *s)
> +{
> +    int ret, len;
> +    LuringAIOCB *luringcb;
> +    QSIMPLEQ_HEAD(, LuringAIOCB) completed;
> +
> +    while (s->io_q.in_flight >= MAX_EVENTS && s->io_q.in_queue) {

Try to submit requests as long as MAX_EVENTS has not been reached:

s/>=/</

By the way, is a loop necessary?  I don't see a scenario where we need
multiple iterations.  In fact, this is an infinite loop if the kernel
refuses to consume further requests for some reason.

> +        len = 0;
> +        QSIMPLEQ_FOREACH(luringcb, &s->io_q.pending, next) {
> +            if (s->io_q.in_flight + len++ >= MAX_EVENTS) {
> +                break;
> +            }

Can we rely on io_uring_get_sqe() failing when the sq ring is exhausted?
That way there's no need for this if statement.
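
liburing returns NULL from io_uring_get_sqe() when no sqe is available, so
the loop body could simply be:

  struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
  if (!sqes) {
      break;  /* sq ring exhausted, submit what we have */
  }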

> +            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +            if (sqes) { /* Prep sqe for submission */
> +                memset(sqes, 0, sizeof(*sqes));

memset is unnecessary since the next statement overwrites all of sqes.

> +                *sqes = luringcb->sqeq;
> +                QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);

Careful, a special API is needed when you want to modify the queue
during iteration.  Please see QSIMPLEQ_FOREACH_SAFE().
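
A sketch of the safe iteration (next_cb is just a name for the scratch
variable the macro needs):

  LuringAIOCB *luringcb, *next_cb;

  QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.pending, next, next_cb) {
      ...
      QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
  }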

> +            } else {
> +                break;
> +            }

The simpler form of this if statement is:

  if (!sqes) {
      break;
  }

  ...the success case without nesting...

This is just a style suggestion.  I find code easier to read without
nesting.  Feel free to leave it if you prefer your way.

> +        }
> +
> +        ret =  io_uring_submit(&s->ring);
> +        if (ret == -EAGAIN) {
> +            break;
> +        }

Actually in all error cases since we don't want to increment
in_flight/in_queue with -errno:

  if (ret < 0) {
      break;
  }

Please add a TODO comment for error handling, the error is currently
ignored and this could be a problem.  In the EAGAIN case the kernel is
unable to process more requests temporarily and we should try again
later.  But we don't try again later, so this could result in hung I/O.
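
Something like:

  ret = io_uring_submit(&s->ring);
  if (ret < 0) {
      /* TODO error handling.  -EAGAIN means the kernel is temporarily
       * unable to take more requests; without a resubmission mechanism
       * the affected requests hang forever.
       */
      break;
  }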

> +
> +        s->io_q.in_flight += ret;
> +        s->io_q.in_queue  -= ret;
> +        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, luringcb, next, &completed);

Hmm...what is this doing?  QSIMPLEQ_REMOVE_HEAD() was already called
earlier on, so why modify the list again?

> +    }
> +    s->io_q.blocked = (s->io_q.in_queue > 0);
> +
> +    if (s->io_q.in_flight) {
> +        /*
> +         * We can try to complete something just right away if there are
> +         * still requests in-flight.
> +         */
> +        qemu_luring_process_completions(s);
> +    }
> +}
> +
> +void luring_io_plug(BlockDriverState *bs, LuringState *s)
> +{
> +    s->io_q.plugged++;
> +}
> +
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s)
> +{
> +    assert(s->io_q.plugged);
> +    if (--s->io_q.plugged == 0 &&
> +        !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {

Remember to take into account requests in the sq ring, they also need to
be submitted:

s/!QSIMPLEQ_EMPTY(&s->io_q.pending)/s->io_q.in_queue > 0/
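
i.e.:

  if (--s->io_q.plugged == 0 &&
      !s->io_q.blocked && s->io_q.in_queue > 0) {
      ioq_submit(s);
  }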

> +        ioq_submit(s);
> +    }
> +}
> +
> +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
> +                            uint64_t offset, QEMUIOVector *qiov, int type)
> +{
> +    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +    if (!sqes) {
> +        sqes = &luringcb->sqeq;
> +        QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, luringcb, next);
> +    }
> +
> +    switch (type) {
> +    case QEMU_AIO_WRITE:
> +        io_uring_prep_writev(sqes, fd, qiov->iov, qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_READ:
> +        io_uring_prep_readv(sqes, fd, qiov->iov, qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
> +        break;
> +    default:
> +        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
> +                        __func__, type);
> +        abort();
> +    }
> +
> +    s->io_q.in_queue++;

It's easy to think that "in_queue" is just the length of io_q.pending,
but that's incorrect.  "pending" and "in_queue" have different semantics
and it's a bit subtle:

 * The "pending" queue is only used when the sq ring is full.

 * The "in_queue" counter indicates the total number of requests on the
   io_q.pending list and in the sq_ring but not yet consumed by the
   kernel.

Changing the names would help.  Here is my suggestion, maybe you have a
better idea:

 * Call the list io_q.sq_overflow so it's clear these are requests that
   didn't fit into the sq ring.

 * Call the counter num_unsubmitted.  I don't know about this one, it's
   hard to find a good name :).  Maybe it can be left as in_queue.
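
With those names the struct would look something like this (just a sketch):

  typedef struct LuringQueue {
      int plugged;
      unsigned int in_queue;   /* not yet consumed by the kernel */
      unsigned int in_flight;  /* consumed by the kernel, not yet completed */
      bool blocked;
      QSIMPLEQ_HEAD(, LuringAIOCB) sq_overflow;
  } LuringQueue;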

> +    if (!s->io_q.blocked &&
> +        (!s->io_q.plugged ||
> +         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
> +        ioq_submit(s);
> +    }
> +
> +    return 0;
> +}
> +
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type)
> +{
> +    int ret;
> +    LuringAIOCB luringcb = {
> +        .co         = qemu_coroutine_self(),
> +        .ret        = -EINPROGRESS,
> +    };
> +
> +    ret = luring_do_submit(fd, &luringcb, s, offset, qiov, type);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    if (luringcb.ret == -EINPROGRESS) {
> +        qemu_coroutine_yield();
> +    }
> +    return luringcb.ret;
> +}
> +
> +BlockAIOCB *luring_submit(BlockDriverState *bs, LuringState *s, int fd,
> +        int64_t sector_num, QEMUIOVector *qiov, BlockCompletionFunc *cb,
> +        void *opaque, int type)
> +{
> +    LuringAIOCB *luringcb;
> +    off_t offset = sector_num * BDRV_SECTOR_SIZE;
> +    int ret;
> +
> +    luringcb = qemu_aio_get(&luring_aiocb_info, bs, cb, opaque);
> +    luringcb->ret = -EINPROGRESS;
> +    ret = luring_do_submit(fd, luringcb, s, offset, qiov, type);
> +    if (ret < 0) {
> +        qemu_aio_unref(luringcb);
> +        return NULL;
> +    }
> +
> +    return &luringcb->common;
> +}
> +
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context)
> +{
> +    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
> +                       &s);
> +    qemu_bh_delete(s->completion_bh);
> +    s->aio_context = NULL;
> +}
> +
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context)
> +{
> +    s->aio_context = new_context;
> +    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
> +    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
> +                       (IOHandler *)qemu_luring_completion_cb, NULL, NULL, &s);

Casting function pointers is suspicious because it often indicates
unportable code (it relies on assumptions about the calling convention
that the compiler is using).

Luckily this type cast seems unnecessary and can be dropped:

  static void qemu_luring_completion_cb(void *opaque)
  ==
  typedef void IOHandler(void *opaque);
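
While you're at it, note that the opaque argument should be s rather than
&s, since the handler does "LuringState *s = opaque;".  So:

  aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                     qemu_luring_completion_cb, NULL, NULL, s);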

> +}
> +
> +LuringState *luring_init(Error **errp)
> +{
> +    int rc;
> +    LuringState *s;
> +    s = g_malloc0(sizeof(*s));
> +    struct io_uring *ring = &s->ring;
> +    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);
> +    if (rc == -1) {
> +        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
> +        goto out_close_efd;
> +    }
> +
> +    ioq_init(&s->io_q);
> +    return s;
> +
> +out_close_efd:

There is no eventfd so "efd" is outdated.  Given that there are no other
gotos, you could just inline this return code and avoid the goto
altogether.
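
A sketch of the inlined error path (note that liburing's
io_uring_queue_init() returns -errno on failure instead of setting errno):

  rc = io_uring_queue_init(MAX_EVENTS, &s->ring, 0);
  if (rc < 0) {
      error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
      g_free(s);
      return NULL;
  }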

> +    g_free(s);
> +    return NULL;
> +}
> +
> +void luring_cleanup(LuringState *s)
> +{
> +    io_uring_queue_exit(&s->ring);
> +    g_free(s);
> +}
> --
> 2.17.1