diff mbox series

[v3,4/8] block/io_uring: implements interfaces for io_uring

Message ID 20190527080327.10780-5-mehta.aaru20@gmail.com
State New, archived
Headers show
Series Add support for io_uring | expand

Commit Message

Aarushi Mehta May 27, 2019, 8:03 a.m. UTC
Signed-off-by: Aarushi Mehta <mehta.aaru20@gmail.com>
---
We need nested loops in ioq_submit because overflowed requests may be
permitted to submit if existing ones are cleared. Hence, failure to 
fulfill an overflow request must break separately from normal submission.

For now, to prevent any infinite loops, if the kernel fails to submit
for any reason, we break (ie when number of submissions is zero). 

Now this is tested with a  kali img with trace events to ensure it is 
actually running. The initramfs boots switched to threads.
 
 MAINTAINERS             |   7 +
 block/Makefile.objs     |   3 +
 block/io_uring.c        | 301 ++++++++++++++++++++++++++++++++++++++++
 include/block/aio.h     |  16 ++-
 include/block/raw-aio.h |  15 ++
 5 files changed, 341 insertions(+), 1 deletion(-)
 create mode 100644 block/io_uring.c

Comments

Stefan Hajnoczi May 27, 2019, 9:32 a.m. UTC | #1
On Mon, May 27, 2019 at 01:33:23PM +0530, Aarushi Mehta wrote:
> +static void qemu_luring_process_completions(LuringState *s)
> +{
> +    struct io_uring_cqe *cqes;
> +    /*
> +     * Request completion callbacks can run the nested event loop.
> +     * Schedule ourselves so the nested event loop will "see" remaining
> +     * completed requests and process them.  Without this, completion
> +     * callbacks that wait for other requests using a nested event loop
> +     * would hang forever.
> +     */
> +    qemu_bh_schedule(s->completion_bh);
> +
> +    while (!io_uring_peek_cqe(&s->ring, &cqes)) {
> +        io_uring_cqe_seen(&s->ring, cqes);

The kernel may overwrite the cqe once we've marked it seen.  Therefore
the cqe must only be marked seen after the last access to it.  This is
analogous to a use-after-free bug: we're not allowed to access fields of
an object after it has been freed.

The place to do so is...

> +
> +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> +        luringcb->ret = io_cqe_ret(cqes);

...here:

  io_uring_cqe_seen(&s->ring, cqes);
  cqes = NULL; /* returned to ring, don't access it anymore */

> +        if (luringcb->co) {
> +            /*
> +             * If the coroutine is already entered it must be in ioq_submit()
> +             * and will notice luringcb->ret has been filled in when it
> +             * eventually runs later. Coroutines cannot be entered recursively
> +             * so avoid doing that!
> +             */
> +            if (!qemu_coroutine_entered(luringcb->co)) {
> +                aio_co_wake(luringcb->co);
> +            }
> +        } else {
> +            luringcb->common.cb(luringcb->common.opaque, luringcb->ret);
> +            qemu_aio_unref(luringcb);
> +        }
> +        /* Change counters one-by-one because we can be nested. */
> +        s->io_q.in_flight--;

This counter must be decremented before invoking luringcb's callback.
That way the nested event loop doesn't consider this completed request
in flight anymore.

> +static void ioq_submit(LuringState *s)
> +{
> +    int ret;
> +    LuringAIOCB *luringcb, *luringcb_next;
> +
> +    while(!s->io_q.in_queue) {

Should this be while (s->io_q.in_queue > 0)?

> +        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
> +                              luringcb_next) {
> +            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +            if (!sqes) {
> +                break;
> +            }
> +            /* Prep sqe for submission */
> +            *sqes = luringcb->sqeq;
> +            io_uring_sqe_set_data(sqes, luringcb);

This is unnecessary, the data field has already been set in
luring_do_submit() and copied to *sqes in the previous line.

> +BlockAIOCB *luring_submit(BlockDriverState *bs, LuringState *s, int fd,
> +        int64_t sector_num, QEMUIOVector *qiov, BlockCompletionFunc *cb,
> +        void *opaque, int type)
> +{
> +    LuringAIOCB *luringcb;
> +    off_t offset = sector_num * BDRV_SECTOR_SIZE;
> +    int ret;
> +
> +    luringcb = qemu_aio_get(&luring_aiocb_info, bs, cb, opaque);
> +    luringcb->ret = -EINPROGRESS;

luringcb isn't zeroed by qemu_aio_get().  luringcb->co must be
explicitly set to NULL to prevent undefined behavior in
qemu_luring_process_completions() (uninitialized memory access).

  luring->co = NULL;

By the way, this bug originates from linux-aio.c.  I have sent a patch
to fix it there!

> +    ret = luring_do_submit(fd, luringcb, s, offset, qiov, type);
> +    if (ret < 0) {
> +        qemu_aio_unref(luringcb);
> +        return NULL;
> +    }
> +
> +    return &luringcb->common;
> +}
> +
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context)
> +{
> +    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
> +                       &s);
> +    qemu_bh_delete(s->completion_bh);
> +    s->aio_context = NULL;
> +}
> +
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context)
> +{
> +    s->aio_context = new_context;
> +    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
> +    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
> +                       qemu_luring_completion_cb, NULL, NULL, &s);
> +}
> +
> +LuringState *luring_init(Error **errp)
> +{
> +    int rc;
> +    LuringState *s;
> +    s = g_malloc0(sizeof(*s));
> +    struct io_uring *ring = &s->ring;
> +    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);
> +    if (rc == -1) {
> +        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");

Why was this changed from error_setg_errno(errp, errno, "failed to init
linux io_uring ring") to -rc in v3?

rc is -1 here, not an errno value, so the error message will be
incorrect.
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 3cacd751bf..462c00a021 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2504,6 +2504,13 @@  F: block/file-posix.c
 F: block/file-win32.c
 F: block/win32-aio.c
 
+Linux io_uring
+M: Aarushi Mehta <mehta.aaru20@gmail.com>
+R: Stefan Hajnoczi <stefan@redhat.com>
+L: qemu-block@nongnu.org
+S: Maintained
+F: block/io_uring.c
+
 qcow2
 M: Kevin Wolf <kwolf@redhat.com>
 M: Max Reitz <mreitz@redhat.com>
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 7a81892a52..348a003af5 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -18,6 +18,7 @@  block-obj-y += block-backend.o snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o
 block-obj-$(CONFIG_POSIX) += file-posix.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
+block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o
 block-obj-y += null.o mirror.o commit.o io.o create.o
 block-obj-y += throttle-groups.o
 block-obj-$(CONFIG_LINUX) += nvme.o
@@ -61,5 +62,7 @@  block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o
 dmg-lzfse.o-libs   := $(LZFSE_LIBS)
 qcow.o-libs        := -lz
 linux-aio.o-libs   := -laio
+io_uring.o-cflags  := $(LINUX_IO_URING_CFLAGS)
+io_uring.o-libs    := $(LINUX_IO_URING_LIBS)
 parallels.o-cflags := $(LIBXML2_CFLAGS)
 parallels.o-libs   := $(LIBXML2_LIBS)
diff --git a/block/io_uring.c b/block/io_uring.c
new file mode 100644
index 0000000000..2a8c48a7dc
--- /dev/null
+++ b/block/io_uring.c
@@ -0,0 +1,301 @@ 
+/*
+ * Linux io_uring support.
+ *
+ * Copyright (C) 2009 IBM, Corp.
+ * Copyright (C) 2009 Red Hat, Inc.
+ * Copyright (C) 2019 Aarushi Mehta
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include <liburing.h>
+#include "qemu-common.h"
+#include "block/aio.h"
+#include "qemu/queue.h"
+#include "block/block.h"
+#include "block/raw-aio.h"
+#include "qemu/coroutine.h"
+#include "qapi/error.h"
+
+#define MAX_EVENTS 128
+
+typedef struct LuringAIOCB {
+    BlockAIOCB common;
+    Coroutine *co;
+    struct io_uring_sqe sqeq;
+    int ret;
+    QSIMPLEQ_ENTRY(LuringAIOCB) next;
+} LuringAIOCB;
+
+typedef struct LuringQueue {
+    int plugged;
+    unsigned int in_queue;
+    unsigned int in_flight;
+    bool blocked;
+    QSIMPLEQ_HEAD(, LuringAIOCB) sq_overflow;
+} LuringQueue;
+
+typedef struct LuringState {
+    AioContext *aio_context;
+
+    struct io_uring ring;
+
+    /* io queue for submit at batch.  Protected by AioContext lock. */
+    LuringQueue io_q;
+
+    /* I/O completion processing.  Only runs in I/O thread.  */
+    QEMUBH *completion_bh;
+} LuringState;
+
+static void ioq_submit(LuringState *s);
+
+static inline int io_cqe_ret(struct io_uring_cqe *cqe)
+{
+    return cqe->res;
+}
+
+/**
+ * qemu_luring_process_completions:
+ * @s: AIO state
+ *
+ * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
+ *
+ */
+static void qemu_luring_process_completions(LuringState *s)
+{
+    struct io_uring_cqe *cqes;
+    /*
+     * Request completion callbacks can run the nested event loop.
+     * Schedule ourselves so the nested event loop will "see" remaining
+     * completed requests and process them.  Without this, completion
+     * callbacks that wait for other requests using a nested event loop
+     * would hang forever.
+     */
+    qemu_bh_schedule(s->completion_bh);
+
+    while (!io_uring_peek_cqe(&s->ring, &cqes)) {
+        io_uring_cqe_seen(&s->ring, cqes);
+
+        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
+        luringcb->ret = io_cqe_ret(cqes);
+        if (luringcb->co) {
+            /*
+             * If the coroutine is already entered it must be in ioq_submit()
+             * and will notice luringcb->ret has been filled in when it
+             * eventually runs later. Coroutines cannot be entered recursively
+             * so avoid doing that!
+             */
+            if (!qemu_coroutine_entered(luringcb->co)) {
+                aio_co_wake(luringcb->co);
+            }
+        } else {
+            luringcb->common.cb(luringcb->common.opaque, luringcb->ret);
+            qemu_aio_unref(luringcb);
+        }
+        /* Change counters one-by-one because we can be nested. */
+        s->io_q.in_flight--;
+    }
+    qemu_bh_cancel(s->completion_bh);
+}
+
+static void qemu_luring_process_completions_and_submit(LuringState *s)
+{
+    aio_context_acquire(s->aio_context);
+    qemu_luring_process_completions(s);
+
+    if (!s->io_q.plugged && !s->io_q.in_queue) {
+        ioq_submit(s);
+    }
+    aio_context_release(s->aio_context);
+}
+
+static void qemu_luring_completion_bh(void *opaque)
+{
+    LuringState *s = opaque;
+    qemu_luring_process_completions_and_submit(s);
+}
+
+static void qemu_luring_completion_cb(void *opaque)
+{
+    LuringState *s = opaque;
+    qemu_luring_process_completions_and_submit(s);
+}
+
+static const AIOCBInfo luring_aiocb_info = {
+    .aiocb_size         = sizeof(LuringAIOCB),
+};
+
+static void ioq_init(LuringQueue *io_q)
+{
+    QSIMPLEQ_INIT(&io_q->sq_overflow);
+    io_q->plugged = 0;
+    io_q->in_queue = 0;
+    io_q->in_flight = 0;
+    io_q->blocked = false;
+}
+
+static void ioq_submit(LuringState *s)
+{
+    int ret;
+    LuringAIOCB *luringcb, *luringcb_next;
+
+    while(!s->io_q.in_queue) {
+        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
+                              luringcb_next) {
+            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
+            if (!sqes) {
+                break;
+            }
+            /* Prep sqe for submission */
+            *sqes = luringcb->sqeq;
+            io_uring_sqe_set_data(sqes, luringcb);
+            QSIMPLEQ_REMOVE_HEAD(&s->io_q.sq_overflow, next);
+        }
+        ret =  io_uring_submit(&s->ring);
+        if (ret <= 0) { 
+            /* TODO error handling */
+            break;
+        }
+        s->io_q.in_flight += ret;
+        s->io_q.in_queue  -= ret;
+    }
+    s->io_q.blocked = (s->io_q.in_queue > 0);
+
+    if (s->io_q.in_flight) {
+        /*
+         * We can try to complete something just right away if there are
+         * still requests in-flight.
+         */
+        qemu_luring_process_completions(s);
+    }
+}
+
+void luring_io_plug(BlockDriverState *bs, LuringState *s)
+{
+    s->io_q.plugged++;
+}
+
+void luring_io_unplug(BlockDriverState *bs, LuringState *s)
+{
+    assert(s->io_q.plugged);
+    if (--s->io_q.plugged == 0 &&
+        !s->io_q.blocked && s->io_q.in_queue > 0) {
+        ioq_submit(s);
+    }
+}
+
+static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
+                            uint64_t offset, QEMUIOVector *qiov, int type)
+{
+    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
+    if (!sqes) {
+        sqes = &luringcb->sqeq;
+        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
+    }
+
+    switch (type) {
+    case QEMU_AIO_WRITE:
+        io_uring_prep_writev(sqes, fd, qiov->iov, qiov->niov, offset);
+        break;
+    case QEMU_AIO_READ:
+        io_uring_prep_readv(sqes, fd, qiov->iov, qiov->niov, offset);
+        break;
+    case QEMU_AIO_FLUSH:
+        io_uring_prep_fsync(sqes, fd, 0);
+        break;
+    default:
+        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
+                        __func__, type);
+        abort();
+    }
+    io_uring_sqe_set_data(sqes, luringcb);
+    s->io_q.in_queue++;
+
+    if (!s->io_q.blocked &&
+        (!s->io_q.plugged ||
+         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
+        ioq_submit(s);
+    }
+
+    return 0;
+}
+
+int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
+                                uint64_t offset, QEMUIOVector *qiov, int type)
+{
+    int ret;
+    LuringAIOCB luringcb = {
+        .co         = qemu_coroutine_self(),
+        .ret        = -EINPROGRESS,
+    };
+
+    ret = luring_do_submit(fd, &luringcb, s, offset, qiov, type);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (luringcb.ret == -EINPROGRESS) {
+        qemu_coroutine_yield();
+    }
+    return luringcb.ret;
+}
+
+BlockAIOCB *luring_submit(BlockDriverState *bs, LuringState *s, int fd,
+        int64_t sector_num, QEMUIOVector *qiov, BlockCompletionFunc *cb,
+        void *opaque, int type)
+{
+    LuringAIOCB *luringcb;
+    off_t offset = sector_num * BDRV_SECTOR_SIZE;
+    int ret;
+
+    luringcb = qemu_aio_get(&luring_aiocb_info, bs, cb, opaque);
+    luringcb->ret = -EINPROGRESS;
+    ret = luring_do_submit(fd, luringcb, s, offset, qiov, type);
+    if (ret < 0) {
+        qemu_aio_unref(luringcb);
+        return NULL;
+    }
+
+    return &luringcb->common;
+}
+
+void luring_detach_aio_context(LuringState *s, AioContext *old_context)
+{
+    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
+                       &s);
+    qemu_bh_delete(s->completion_bh);
+    s->aio_context = NULL;
+}
+
+void luring_attach_aio_context(LuringState *s, AioContext *new_context)
+{
+    s->aio_context = new_context;
+    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
+    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
+                       qemu_luring_completion_cb, NULL, NULL, &s);
+}
+
+LuringState *luring_init(Error **errp)
+{
+    int rc;
+    LuringState *s;
+    s = g_malloc0(sizeof(*s));
+    struct io_uring *ring = &s->ring;
+    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);
+    if (rc == -1) {
+        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
+        g_free(s);
+        return NULL;
+    }
+
+    ioq_init(&s->io_q);
+    return s;
+
+}
+
+void luring_cleanup(LuringState *s)
+{
+    io_uring_queue_exit(&s->ring);
+    g_free(s);
+}
diff --git a/include/block/aio.h b/include/block/aio.h
index 0ca25dfec6..9da3fd9793 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -50,6 +50,7 @@  typedef void IOHandler(void *opaque);
 struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
+struct LuringState;
 
 struct AioContext {
     GSource source;
@@ -118,11 +119,19 @@  struct AioContext {
     struct ThreadPool *thread_pool;
 
 #ifdef CONFIG_LINUX_AIO
-    /* State for native Linux AIO.  Uses aio_context_acquire/release for
+    /*
+     * State for native Linux AIO.  Uses aio_context_acquire/release for
      * locking.
      */
     struct LinuxAioState *linux_aio;
 #endif
+#ifdef CONFIG_LINUX_IO_URING
+    /*
+     * State for Linux io_uring.  Uses aio_context_acquire/release for
+     * locking.
+     */
+    struct LuringState *linux_io_uring;
+#endif
 
     /* TimerLists for calling timers - one per clock type.  Has its own
      * locking.
@@ -387,6 +396,11 @@  struct LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp);
 /* Return the LinuxAioState bound to this AioContext */
 struct LinuxAioState *aio_get_linux_aio(AioContext *ctx);
 
+/* Setup the LuringState bound to this AioContext */
+struct LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp);
+
+/* Return the LuringState bound to this AioContext */
+struct LuringState *aio_get_linux_io_uring(AioContext *ctx);
 /**
  * aio_timer_new_with_attrs:
  * @ctx: the aio context
diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h
index ba223dd1f1..28a836151e 100644
--- a/include/block/raw-aio.h
+++ b/include/block/raw-aio.h
@@ -58,6 +58,21 @@  void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
 void laio_io_plug(BlockDriverState *bs, LinuxAioState *s);
 void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s);
 #endif
+/* io_uring.c - Linux io_uring implementation */
+#ifdef CONFIG_LINUX_IO_URING
+typedef struct LuringState LuringState;
+LuringState *luring_init(Error **errp);
+void luring_cleanup(LuringState *s);
+int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
+                                uint64_t offset, QEMUIOVector *qiov, int type);
+BlockAIOCB *luring_submit(BlockDriverState *bs, LuringState *s, int fd,
+        int64_t sector_num, QEMUIOVector *qiov, BlockCompletionFunc *cb,
+        void *opaque, int type);
+void luring_detach_aio_context(LuringState *s, AioContext *old_context);
+void luring_attach_aio_context(LuringState *s, AioContext *new_context);
+void luring_io_plug(BlockDriverState *bs, LuringState *s);
+void luring_io_unplug(BlockDriverState *bs, LuringState *s);
+#endif
 
 #ifdef _WIN32
 typedef struct QEMUWin32AIOState QEMUWin32AIOState;