diff mbox series

[v5,04/12] block/io_uring: implements interfaces for io_uring

Message ID 20190610134905.22294-5-mehta.aaru20@gmail.com
State New, archived
Headers show
Series Add support for io_uring | expand

Commit Message

Aarushi Mehta June 10, 2019, 1:48 p.m. UTC
Aborts when sqe fails to be set as sqes cannot be returned to the ring.

Signed-off-by: Aarushi Mehta <mehta.aaru20@gmail.com>
---
 MAINTAINERS             |   7 +
 block/Makefile.objs     |   3 +
 block/io_uring.c        | 314 ++++++++++++++++++++++++++++++++++++++++
 include/block/aio.h     |  16 +-
 include/block/raw-aio.h |  12 ++
 5 files changed, 351 insertions(+), 1 deletion(-)
 create mode 100644 block/io_uring.c

Comments

Fam Zheng June 11, 2019, 11:17 a.m. UTC | #1
On Mon, 06/10 19:18, Aarushi Mehta wrote:
> Aborts when sqe fails to be set as sqes cannot be returned to the ring.
> 
> Signed-off-by: Aarushi Mehta <mehta.aaru20@gmail.com>
> ---
>  MAINTAINERS             |   7 +
>  block/Makefile.objs     |   3 +
>  block/io_uring.c        | 314 ++++++++++++++++++++++++++++++++++++++++
>  include/block/aio.h     |  16 +-
>  include/block/raw-aio.h |  12 ++
>  5 files changed, 351 insertions(+), 1 deletion(-)
>  create mode 100644 block/io_uring.c
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 7be1225415..49f896796e 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2516,6 +2516,13 @@ F: block/file-posix.c
>  F: block/file-win32.c
>  F: block/win32-aio.c
>  
> +Linux io_uring
> +M: Aarushi Mehta <mehta.aaru20@gmail.com>
> +R: Stefan Hajnoczi <stefan@redhat.com>
> +L: qemu-block@nongnu.org
> +S: Maintained
> +F: block/io_uring.c
> +
>  qcow2
>  M: Kevin Wolf <kwolf@redhat.com>
>  M: Max Reitz <mreitz@redhat.com>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index ae11605c9f..8fde7a23a5 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -18,6 +18,7 @@ block-obj-y += block-backend.o snapshot.o qapi.o
>  block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o
>  block-obj-$(CONFIG_POSIX) += file-posix.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
> +block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o
>  block-obj-y += null.o mirror.o commit.o io.o create.o
>  block-obj-y += throttle-groups.o
>  block-obj-$(CONFIG_LINUX) += nvme.o
> @@ -61,5 +62,7 @@ block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o
>  dmg-lzfse.o-libs   := $(LZFSE_LIBS)
>  qcow.o-libs        := -lz
>  linux-aio.o-libs   := -laio
> +io_uring.o-cflags  := $(LINUX_IO_URING_CFLAGS)
> +io_uring.o-libs    := $(LINUX_IO_URING_LIBS)
>  parallels.o-cflags := $(LIBXML2_CFLAGS)
>  parallels.o-libs   := $(LIBXML2_LIBS)
> diff --git a/block/io_uring.c b/block/io_uring.c
> new file mode 100644
> index 0000000000..f327c7ef96
> --- /dev/null
> +++ b/block/io_uring.c
> @@ -0,0 +1,314 @@
> +/*
> + * Linux io_uring support.
> + *
> + * Copyright (C) 2009 IBM, Corp.
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2019 Aarushi Mehta
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +#include "qemu/osdep.h"
> +#include <liburing.h>
> +#include "qemu-common.h"
> +#include "block/aio.h"
> +#include "qemu/queue.h"
> +#include "block/block.h"
> +#include "block/raw-aio.h"
> +#include "qemu/coroutine.h"
> +#include "qapi/error.h"
> +
> +#define MAX_EVENTS 128
> +
> +typedef struct LuringAIOCB {

I have to say it is a good name.

> +    Coroutine *co;
> +    struct io_uring_sqe sqeq;
> +    ssize_t ret;
> +    QEMUIOVector *qiov;
> +    bool is_read;
> +    QSIMPLEQ_ENTRY(LuringAIOCB) next;
> +} LuringAIOCB;
> +
> +typedef struct LuringQueue {
> +    int plugged;
> +    unsigned int in_queue;
> +    unsigned int in_flight;
> +    bool blocked;
> +    QSIMPLEQ_HEAD(, LuringAIOCB) sq_overflow;
> +} LuringQueue;
> +
> +typedef struct LuringState {
> +    AioContext *aio_context;
> +
> +    struct io_uring ring;
> +
> +    /* io queue for submit at batch.  Protected by AioContext lock. */
> +    LuringQueue io_q;
> +
> +    /* I/O completion processing.  Only runs in I/O thread.  */
> +    QEMUBH *completion_bh;
> +} LuringState;
> +
> +/**
> + * ioq_submit:
> + * @s: AIO state
> + *
> + * Queues pending sqes and submits them
> + *
> + */
> +static int ioq_submit(LuringState *s);
> +
> +/**
> + * qemu_luring_process_completions:
> + * @s: AIO state
> + *
> + * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
> + *
> + */
> +static void qemu_luring_process_completions(LuringState *s)
> +{
> +    struct io_uring_cqe *cqes;
> +    int ret;
> +
> +    /*
> +     * Request completion callbacks can run the nested event loop.
> +     * Schedule ourselves so the nested event loop will "see" remaining
> +     * completed requests and process them.  Without this, completion
> +     * callbacks that wait for other requests using a nested event loop
> +     * would hang forever.
> +     */
> +    qemu_bh_schedule(s->completion_bh);
> +
> +    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
> +        if (!cqes) {
> +            break;
> +        }
> +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> +        ret = cqes->res;

Declarations should be at the beginning of the code block.

> +
> +        if (ret == luringcb->qiov->size) {
> +            ret = 0;
> +        } else if (ret >= 0) {
> +            /* Short Read/Write */
> +            if (luringcb->is_read) {
> +                /* Read, pad with zeroes */
> +                qemu_iovec_memset(luringcb->qiov, ret, 0,
> +                luringcb->qiov->size - ret);

Should you check that (ret < luringcb->qiov->size), since ret comes from an external source?

Either way, ret should be assigned 0, I think.

> +            } else {
> +                ret = -ENOSPC;;

s/;;/;/

> +            }
> +        }
> +        luringcb->ret = ret;
> +
> +        io_uring_cqe_seen(&s->ring, cqes);
> +        cqes = NULL;
> +        /* Change counters one-by-one because we can be nested. */
> +        s->io_q.in_flight--;
> +
> +        /*
> +         * If the coroutine is already entered it must be in ioq_submit()
> +         * and will notice luringcb->ret has been filled in when it
> +         * eventually runs later. Coroutines cannot be entered recursively
> +         * so avoid doing that!
> +         */
> +        if (!qemu_coroutine_entered(luringcb->co)) {
> +            aio_co_wake(luringcb->co);
> +        }
> +    }
> +    qemu_bh_cancel(s->completion_bh);
> +}
> +
> +static void qemu_luring_process_completions_and_submit(LuringState *s)
> +{
> +    aio_context_acquire(s->aio_context);
> +    qemu_luring_process_completions(s);
> +
> +    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
> +        ioq_submit(s);
> +    }
> +    aio_context_release(s->aio_context);
> +}
> +
> +static void qemu_luring_completion_bh(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void qemu_luring_completion_cb(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void ioq_init(LuringQueue *io_q)
> +{
> +    QSIMPLEQ_INIT(&io_q->sq_overflow);
> +    io_q->plugged = 0;
> +    io_q->in_queue = 0;
> +    io_q->in_flight = 0;
> +    io_q->blocked = false;
> +}
> +
> +static int ioq_submit(LuringState *s)
> +{
> +    int ret = 0;
> +    LuringAIOCB *luringcb, *luringcb_next;
> +
> +    while (s->io_q.in_queue > 0) {
> +        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
> +                              luringcb_next) {
> +            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +            if (!sqes) {
> +                break;
> +            }
> +            /* Prep sqe for submission */
> +            *sqes = luringcb->sqeq;
> +            QSIMPLEQ_REMOVE_HEAD(&s->io_q.sq_overflow, next);
> +        }
> +        ret =  io_uring_submit(&s->ring);

s/  / /

> +        /* Prevent infinite loop if submission is refused */
> +        if (ret <= 0) {
> +            if (ret == -EAGAIN) {
> +                continue;
> +            }
> +            break;
> +        }
> +        s->io_q.in_flight += ret;
> +        s->io_q.in_queue  -= ret;
> +    }
> +    s->io_q.blocked = (s->io_q.in_queue > 0);

I'm confused about s->io_q.blocked. ioq_submit is where it gets updated, but
if it becomes true, calling ioq_submit will be fenced. So how does it get
cleared?

> +
> +    if (s->io_q.in_flight) {
> +        /*
> +         * We can try to complete something just right away if there are
> +         * still requests in-flight.
> +         */
> +        qemu_luring_process_completions(s);
> +    }
> +    return ret;
> +}
> +
> +void luring_io_plug(BlockDriverState *bs, LuringState *s)
> +{
> +    s->io_q.plugged++;
> +}
> +
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s)
> +{
> +    assert(s->io_q.plugged);
> +    if (--s->io_q.plugged == 0 &&
> +        !s->io_q.blocked && s->io_q.in_queue > 0) {
> +        ioq_submit(s);
> +    }
> +}
> +
> +/**
> + * luring_do_submit:
> + * @fd: file descriptor for I/O
> + * @luringcb: AIO control block
> + * @s: AIO state
> + * @offset: offset for request
> + * @type: type of request
> + *
> + * Fetches sqes from ring, adds to pending queue and preps them
> + *
> + */
> +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
> +                            uint64_t offset, int type)
> +{
> +    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +    if (!sqes) {
> +        sqes = &luringcb->sqeq;
> +        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
> +    }
> +
> +    switch (type) {
> +    case QEMU_AIO_WRITE:
> +        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
> +                             luringcb->qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_READ:
> +        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
> +                            luringcb->qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        io_uring_prep_fsync(sqes, fd, 0);
> +        break;
> +    default:
> +        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
> +                        __func__, type);
> +        abort();
> +    }
> +    io_uring_sqe_set_data(sqes, luringcb);
> +    s->io_q.in_queue++;
> +
> +    if (!s->io_q.blocked &&
> +        (!s->io_q.plugged ||
> +         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
> +        return ioq_submit(s);
> +    }
> +    return 0;
> +}
> +
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type)
> +{
> +    int ret;
> +    LuringAIOCB luringcb = {
> +        .co         = qemu_coroutine_self(),
> +        .ret        = -EINPROGRESS,
> +        .qiov       = qiov,
> +        .is_read    = (type == QEMU_AIO_READ),
> +    };
> +
> +    ret = luring_do_submit(fd, &luringcb, s, offset, type);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    if (luringcb.ret == -EINPROGRESS) {
> +        qemu_coroutine_yield();
> +    }
> +    return luringcb.ret;
> +}
> +
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context)
> +{
> +    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
> +                       s);
> +    qemu_bh_delete(s->completion_bh);
> +    s->aio_context = NULL;
> +}
> +
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context)
> +{
> +    s->aio_context = new_context;
> +    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
> +    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
> +                       qemu_luring_completion_cb, NULL, NULL, s);
> +}
> +
> +LuringState *luring_init(Error **errp)
> +{
> +    int rc;
> +    LuringState *s;
> +    s = g_malloc0(sizeof(*s));

You can use g_new0() to be more concise.

> +    struct io_uring *ring = &s->ring;
> +    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);

s/  / /

> +    if (rc < 0) {
> +        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
> +        g_free(s);
> +        return NULL;
> +    }
> +
> +    ioq_init(&s->io_q);
> +    return s;
> +
> +}
> +
> +void luring_cleanup(LuringState *s)
> +{
> +    io_uring_queue_exit(&s->ring);
> +    g_free(s);
> +}
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 0ca25dfec6..9da3fd9793 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -50,6 +50,7 @@ typedef void IOHandler(void *opaque);
>  struct Coroutine;
>  struct ThreadPool;
>  struct LinuxAioState;
> +struct LuringState;
>  
>  struct AioContext {
>      GSource source;
> @@ -118,11 +119,19 @@ struct AioContext {
>      struct ThreadPool *thread_pool;
>  
>  #ifdef CONFIG_LINUX_AIO
> -    /* State for native Linux AIO.  Uses aio_context_acquire/release for
> +    /*
> +     * State for native Linux AIO.  Uses aio_context_acquire/release for
>       * locking.
>       */
>      struct LinuxAioState *linux_aio;
>  #endif
> +#ifdef CONFIG_LINUX_IO_URING
> +    /*
> +     * State for Linux io_uring.  Uses aio_context_acquire/release for
> +     * locking.
> +     */
> +    struct LuringState *linux_io_uring;
> +#endif
>  
>      /* TimerLists for calling timers - one per clock type.  Has its own
>       * locking.
> @@ -387,6 +396,11 @@ struct LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp);
>  /* Return the LinuxAioState bound to this AioContext */
>  struct LinuxAioState *aio_get_linux_aio(AioContext *ctx);
>  
> +/* Setup the LuringState bound to this AioContext */
> +struct LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp);
> +
> +/* Return the LuringState bound to this AioContext */
> +struct LuringState *aio_get_linux_io_uring(AioContext *ctx);
>  /**
>   * aio_timer_new_with_attrs:
>   * @ctx: the aio context
> diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h
> index 0cb7cc74a2..71d7d1395f 100644
> --- a/include/block/raw-aio.h
> +++ b/include/block/raw-aio.h
> @@ -55,6 +55,18 @@ void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
>  void laio_io_plug(BlockDriverState *bs, LinuxAioState *s);
>  void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s);
>  #endif
> +/* io_uring.c - Linux io_uring implementation */
> +#ifdef CONFIG_LINUX_IO_URING
> +typedef struct LuringState LuringState;
> +LuringState *luring_init(Error **errp);
> +void luring_cleanup(LuringState *s);
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type);
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context);
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context);
> +void luring_io_plug(BlockDriverState *bs, LuringState *s);
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s);
> +#endif
>  
>  #ifdef _WIN32
>  typedef struct QEMUWin32AIOState QEMUWin32AIOState;
> -- 
> 2.17.1
>
Stefan Hajnoczi June 12, 2019, 2:43 p.m. UTC | #2
On Tue, Jun 11, 2019 at 07:17:14PM +0800, Fam Zheng wrote:
> On Mon, 06/10 19:18, Aarushi Mehta wrote:
> > +        /* Prevent infinite loop if submission is refused */
> > +        if (ret <= 0) {
> > +            if (ret == -EAGAIN) {
> > +                continue;
> > +            }
> > +            break;
> > +        }
> > +        s->io_q.in_flight += ret;
> > +        s->io_q.in_queue  -= ret;
> > +    }
> > +    s->io_q.blocked = (s->io_q.in_queue > 0);
> 
> I'm confused about s->io_q.blocked. ioq_submit is where it gets updated, but
> if it becomes true, calling ioq_submit will be fenced. So how does it get
> cleared?

When blocked, additional I/O requests are not submitted until the next
completion.  See qemu_luring_process_completions_and_submit() for the
code path where ioq_submit() gets called again.

Stefan
Maxim Levitsky June 17, 2019, 12:26 p.m. UTC | #3
On Mon, 2019-06-10 at 19:18 +0530, Aarushi Mehta wrote:
> Aborts when sqe fails to be set as sqes cannot be returned to the ring.
> 
> Signed-off-by: Aarushi Mehta <mehta.aaru20@gmail.com>
> ---
>  MAINTAINERS             |   7 +
>  block/Makefile.objs     |   3 +
>  block/io_uring.c        | 314 ++++++++++++++++++++++++++++++++++++++++
>  include/block/aio.h     |  16 +-
>  include/block/raw-aio.h |  12 ++
>  5 files changed, 351 insertions(+), 1 deletion(-)
>  create mode 100644 block/io_uring.c
> 
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 7be1225415..49f896796e 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -2516,6 +2516,13 @@ F: block/file-posix.c
>  F: block/file-win32.c
>  F: block/win32-aio.c
>  
> +Linux io_uring
> +M: Aarushi Mehta <mehta.aaru20@gmail.com>
> +R: Stefan Hajnoczi <stefan@redhat.com>
> +L: qemu-block@nongnu.org
> +S: Maintained
> +F: block/io_uring.c
> +
>  qcow2
>  M: Kevin Wolf <kwolf@redhat.com>
>  M: Max Reitz <mreitz@redhat.com>
> diff --git a/block/Makefile.objs b/block/Makefile.objs
> index ae11605c9f..8fde7a23a5 100644
> --- a/block/Makefile.objs
> +++ b/block/Makefile.objs
> @@ -18,6 +18,7 @@ block-obj-y += block-backend.o snapshot.o qapi.o
>  block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o
>  block-obj-$(CONFIG_POSIX) += file-posix.o
>  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
> +block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o
>  block-obj-y += null.o mirror.o commit.o io.o create.o
>  block-obj-y += throttle-groups.o
>  block-obj-$(CONFIG_LINUX) += nvme.o
> @@ -61,5 +62,7 @@ block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o
>  dmg-lzfse.o-libs   := $(LZFSE_LIBS)
>  qcow.o-libs        := -lz
>  linux-aio.o-libs   := -laio
> +io_uring.o-cflags  := $(LINUX_IO_URING_CFLAGS)
> +io_uring.o-libs    := $(LINUX_IO_URING_LIBS)
>  parallels.o-cflags := $(LIBXML2_CFLAGS)
>  parallels.o-libs   := $(LIBXML2_LIBS)
> diff --git a/block/io_uring.c b/block/io_uring.c
> new file mode 100644
> index 0000000000..f327c7ef96
> --- /dev/null
> +++ b/block/io_uring.c
> @@ -0,0 +1,314 @@
> +/*
> + * Linux io_uring support.
> + *
> + * Copyright (C) 2009 IBM, Corp.
> + * Copyright (C) 2009 Red Hat, Inc.
> + * Copyright (C) 2019 Aarushi Mehta
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +#include "qemu/osdep.h"
> +#include <liburing.h>
> +#include "qemu-common.h"
> +#include "block/aio.h"
> +#include "qemu/queue.h"
> +#include "block/block.h"
> +#include "block/raw-aio.h"
> +#include "qemu/coroutine.h"
> +#include "qapi/error.h"
> +
> +#define MAX_EVENTS 128
> +
> +typedef struct LuringAIOCB {
> +    Coroutine *co;
> +    struct io_uring_sqe sqeq;
> +    ssize_t ret;
> +    QEMUIOVector *qiov;
> +    bool is_read;
> +    QSIMPLEQ_ENTRY(LuringAIOCB) next;
> +} LuringAIOCB;
> +
> +typedef struct LuringQueue {
> +    int plugged;
> +    unsigned int in_queue;
> +    unsigned int in_flight;
> +    bool blocked;
> +    QSIMPLEQ_HEAD(, LuringAIOCB) sq_overflow;
> +} LuringQueue;
> +
> +typedef struct LuringState {
> +    AioContext *aio_context;
> +
> +    struct io_uring ring;
> +
> +    /* io queue for submit at batch.  Protected by AioContext lock. */
> +    LuringQueue io_q;
> +
> +    /* I/O completion processing.  Only runs in I/O thread.  */
> +    QEMUBH *completion_bh;
> +} LuringState;
> +
> +/**
> + * ioq_submit:
> + * @s: AIO state
> + *
> + * Queues pending sqes and submits them
> + *
> + */
> +static int ioq_submit(LuringState *s);
> +
> +/**
> + * qemu_luring_process_completions:
> + * @s: AIO state
> + *
> + * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
> + *
> + */
> +static void qemu_luring_process_completions(LuringState *s)
> +{
> +    struct io_uring_cqe *cqes;
> +    int ret;
> +
> +    /*
> +     * Request completion callbacks can run the nested event loop.
> +     * Schedule ourselves so the nested event loop will "see" remaining
> +     * completed requests and process them.  Without this, completion
> +     * callbacks that wait for other requests using a nested event loop
> +     * would hang forever.
> +     */

About that qemu_bh_schedule
The code is copied from linux-aio.c where it was added with the below commit.

Author: Stefan Hajnoczi <stefanha@redhat.com>
Date:   Mon Aug 4 16:56:33 2014 +0100

    linux-aio: avoid deadlock in nested aio_poll() calls
    
    If two Linux AIO request completions are fetched in the same
    io_getevents() call, QEMU will deadlock if request A's callback waits
    for request B to complete using an aio_poll() loop.  This was reported
    to happen with the mirror blockjob.
    
    This patch moves completion processing into a BH and makes it resumable.
    Nested event loops can resume completion processing so that request B
    will complete and the deadlock will not occur.
    
    Cc: Kevin Wolf <kwolf@redhat.com>
    Cc: Paolo Bonzini <pbonzini@redhat.com>
    Cc: Ming Lei <ming.lei@canonical.com>
    Cc: Marcin Gibuła <m.gibula@beyond.pl>
    Reported-by: Marcin Gibuła <m.gibula@beyond.pl>
    Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
    Tested-by: Marcin Gibuła <m.gibula@beyond.pl>


I kind of opened Pandora's box by researching that area, suspecting that the same treatment is needed in other block drivers,
but after all, this is the correct behaviour, and this is why:

The reason that the bottom half workaround is needed in linux-aio is because aio uses an eventfd which just notifies it
of the completions once, thus if the co-routine which handles the response does aio_poll, the same fd won't be returned again,
at least unless more events are received which is not guaranteed.


Here in io_uring, I think the same would happen. Looking at the kernel source I see that poll implementation uses 'poll_wait' which is basically
a wait queue which is woken up when new completion events are added to the uring, thus attempting to poll again on the same uring fd will indeed block,
even if there are events not yet processed.

All other leaf block drivers (drivers that access the data, rather than forward the requests to another block driver) are networking based,
thus they poll the communication socket.
When the same situation occurs, the nested aio_poll will notice that the socket still has data and thus run the corresponding co-routine, preventing the deadlock.

I think that the two comments above should be added to the source in some way to document this, so that the next person won't need to spend time understanding this.


BTW, the nvme userspace driver also solves this issue by not entering the co-routine directly from the aio fd handler, but doing that from a bottom half which the handler schedules.
This works because the nested aio_poll will run the bottom halves again, but I suspect that this adds overhead that could be avoided. I'll look at that later.


> +    qemu_bh_schedule(s->completion_bh);
> +
> +    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {


Maybe consider using io_uring_for_each_cqe and then io_uring_cq_advance
to avoid acking one ring entry at a time?
However, this will probably break the nesting.

> +        if (!cqes) {
> +            break;
> +        }
> +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> +        ret = cqes->res;
> +
> +        if (ret == luringcb->qiov->size) {
> +            ret = 0;
> +        } else if (ret >= 0) {


You should very carefully check the allowed return values here.

It looks like you can get '-EINTR' here, which would ask you to rerun the read operation, and otherwise
you will get the number of bytes read, which might be less than what was asked for, which implies that you
need to retry the read operation with the remainder of the buffer rather than zero the end of the buffer, IMHO.

(0 is returned on EOF according to 'read' semantics, which I think are used here, thus a short read might not be an EOF)


Looking at linux-aio.c though, I do see that it just passes through the returned value with no special treatment,
including no check for -EINTR.

I assume that since aio is Linux specific, and it only supports direct IO, it happens
to assume no short reads/-EINTR (but since libaio has very sparse documentation I can't verify this).

On the other hand, the aio=threads implementation actually does everything as specified in the 'write' manpage,
retrying the reads on -EINTR, and doing additional reads if fewer than the required number of bytes were read.

Looking at the io_uring implementation in the kernel, I see that it does support synchronous (non-O_DIRECT) mode,
and in this case, it goes through the same ->read_iter, which is pretty much the same path that
regular read() takes, and so it might return short reads and/or -EINTR.


> +            /* Short Read/Write */
> +            if (luringcb->is_read) {
> +                /* Read, pad with zeroes */
> +                qemu_iovec_memset(luringcb->qiov, ret, 0,
> +                luringcb->qiov->size - ret);
> +            } else {
> +                ret = -ENOSPC;;
> +            }
> +        }
> +        luringcb->ret = ret;
> +
> +        io_uring_cqe_seen(&s->ring, cqes);
> +        cqes = NULL;
> +        /* Change counters one-by-one because we can be nested. */
> +        s->io_q.in_flight--;
> +
> +        /*
> +         * If the coroutine is already entered it must be in ioq_submit()
> +         * and will notice luringcb->ret has been filled in when it
> +         * eventually runs later. Coroutines cannot be entered recursively
> +         * so avoid doing that!
> +         */
> +        if (!qemu_coroutine_entered(luringcb->co)) {
> +            aio_co_wake(luringcb->co);
> +        }
> +    }
> +    qemu_bh_cancel(s->completion_bh);
> +}
> +
> +static void qemu_luring_process_completions_and_submit(LuringState *s)
> +{
> +    aio_context_acquire(s->aio_context);
> +    qemu_luring_process_completions(s);
> +
> +    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
> +        ioq_submit(s);
> +    }
> +    aio_context_release(s->aio_context);
> +}
> +
> +static void qemu_luring_completion_bh(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void qemu_luring_completion_cb(void *opaque)
> +{
> +    LuringState *s = opaque;
> +    qemu_luring_process_completions_and_submit(s);
> +}
> +
> +static void ioq_init(LuringQueue *io_q)
> +{
> +    QSIMPLEQ_INIT(&io_q->sq_overflow);
> +    io_q->plugged = 0;
> +    io_q->in_queue = 0;
> +    io_q->in_flight = 0;
> +    io_q->blocked = false;
> +}
> +
> +static int ioq_submit(LuringState *s)
> +{
> +    int ret = 0;
> +    LuringAIOCB *luringcb, *luringcb_next;
> +
> +    while (s->io_q.in_queue > 0) {
> +        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
> +                              luringcb_next) {

I am torn about the 'sq_overflow' name. It seems to me that it's not immediately clear that these
are the requests that are waiting because the io_uring got full, but I can't think of a better name right now.

Maybe add a comment here to explain what is going on here?

Also maybe we could somehow utilize the plug/unplug facility to avoid reaching that state in first place?
Maybe the block layer has some kind of 'max outstanding requests' limit that could be used?

In my nvme-mdev I opted to not process the input queues when such a condition is detected, but here you can't as the block layer
pretty much calls you to process the requests.


> +            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +            if (!sqes) {
> +                break;
> +            }
> +            /* Prep sqe for submission */
> +            *sqes = luringcb->sqeq;
> +            QSIMPLEQ_REMOVE_HEAD(&s->io_q.sq_overflow, next);
> +        }
> +        ret =  io_uring_submit(&s->ring);
> +        /* Prevent infinite loop if submission is refused */
> +        if (ret <= 0) {
> +            if (ret == -EAGAIN) {
> +                continue;
> +            }
> +            break;
> +        }
> +        s->io_q.in_flight += ret;
> +        s->io_q.in_queue  -= ret;
> +    }
> +    s->io_q.blocked = (s->io_q.in_queue > 0);
> +
> +    if (s->io_q.in_flight) {
> +        /*
> +         * We can try to complete something just right away if there are
> +         * still requests in-flight.
> +         */
> +        qemu_luring_process_completions(s);
> +    }
> +    return ret;
> +}
> +
> +void luring_io_plug(BlockDriverState *bs, LuringState *s)
> +{
> +    s->io_q.plugged++;
> +}
> +
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s)
> +{
> +    assert(s->io_q.plugged);
> +    if (--s->io_q.plugged == 0 &&
> +        !s->io_q.blocked && s->io_q.in_queue > 0) {
> +        ioq_submit(s);
> +    }
> +}
> +
> +/**
> + * luring_do_submit:
> + * @fd: file descriptor for I/O
> + * @luringcb: AIO control block
> + * @s: AIO state
> + * @offset: offset for request
> + * @type: type of request
> + *
> + * Fetches sqes from ring, adds to pending queue and preps them
> + *
> + */
> +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
> +                            uint64_t offset, int type)
> +{
> +    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> +    if (!sqes) {
> +        sqes = &luringcb->sqeq;
> +        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
> +    }
> +
> +    switch (type) {
> +    case QEMU_AIO_WRITE:
> +        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
> +                             luringcb->qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_READ:
> +        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
> +                            luringcb->qiov->niov, offset);
> +        break;
> +    case QEMU_AIO_FLUSH:
> +        io_uring_prep_fsync(sqes, fd, 0);
> +        break;
> +    default:
> +        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
> +                        __func__, type);

Nitpick: Don't we use some kind of error printing function like 'error_setg' rather than fprintf?


> +        abort();
> +    }
> +    io_uring_sqe_set_data(sqes, luringcb);
> +    s->io_q.in_queue++;
> +
> +    if (!s->io_q.blocked &&
> +        (!s->io_q.plugged ||
> +         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
> +        return ioq_submit(s);
> +    }
> +    return 0;
> +}
> +
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type)
> +{
> +    int ret;
> +    LuringAIOCB luringcb = {
> +        .co         = qemu_coroutine_self(),
> +        .ret        = -EINPROGRESS,
> +        .qiov       = qiov,
> +        .is_read    = (type == QEMU_AIO_READ),
> +    };
> +
> +    ret = luring_do_submit(fd, &luringcb, s, offset, type);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    if (luringcb.ret == -EINPROGRESS) {
> +        qemu_coroutine_yield();
> +    }
> +    return luringcb.ret;
> +}
> +
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context)
> +{
> +    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
> +                       s);
> +    qemu_bh_delete(s->completion_bh);
> +    s->aio_context = NULL;
> +}
> +
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context)
> +{
> +    s->aio_context = new_context;
> +    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
> +    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
> +                       qemu_luring_completion_cb, NULL, NULL, s);
> +}
> +
> +LuringState *luring_init(Error **errp)
> +{
> +    int rc;
> +    LuringState *s;
> +    s = g_malloc0(sizeof(*s));
> +    struct io_uring *ring = &s->ring;
> +    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);
> +    if (rc < 0) {
> +        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
> +        g_free(s);
> +        return NULL;
> +    }
> +



> +    ioq_init(&s->io_q);

Another nitpick, maybe inline that function as it is used just here?
(that will save the static declaration upfront as well)
Feel free to leave this as is, if you think this way it is clearer.

> +    return s;
> +
> +}
> +
> +void luring_cleanup(LuringState *s)
> +{
> +    io_uring_queue_exit(&s->ring);
> +    g_free(s);
> +}
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 0ca25dfec6..9da3fd9793 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -50,6 +50,7 @@ typedef void IOHandler(void *opaque);
>  struct Coroutine;
>  struct ThreadPool;
>  struct LinuxAioState;
> +struct LuringState;
>  
>  struct AioContext {
>      GSource source;
> @@ -118,11 +119,19 @@ struct AioContext {
>      struct ThreadPool *thread_pool;
>  
>  #ifdef CONFIG_LINUX_AIO
> -    /* State for native Linux AIO.  Uses aio_context_acquire/release for
> +    /*
> +     * State for native Linux AIO.  Uses aio_context_acquire/release for
>       * locking.
>       */
>      struct LinuxAioState *linux_aio;
>  #endif
> +#ifdef CONFIG_LINUX_IO_URING
> +    /*
> +     * State for Linux io_uring.  Uses aio_context_acquire/release for
> +     * locking.
> +     */
> +    struct LuringState *linux_io_uring;
> +#endif
>  
>      /* TimerLists for calling timers - one per clock type.  Has its own
>       * locking.
> @@ -387,6 +396,11 @@ struct LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp);
>  /* Return the LinuxAioState bound to this AioContext */
>  struct LinuxAioState *aio_get_linux_aio(AioContext *ctx);
>  
> +/* Setup the LuringState bound to this AioContext */
> +struct LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp);
> +
> +/* Return the LuringState bound to this AioContext */
> +struct LuringState *aio_get_linux_io_uring(AioContext *ctx);
>  /**
>   * aio_timer_new_with_attrs:
>   * @ctx: the aio context
> diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h
> index 0cb7cc74a2..71d7d1395f 100644
> --- a/include/block/raw-aio.h
> +++ b/include/block/raw-aio.h
> @@ -55,6 +55,18 @@ void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
>  void laio_io_plug(BlockDriverState *bs, LinuxAioState *s);
>  void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s);
>  #endif
> +/* io_uring.c - Linux io_uring implementation */
> +#ifdef CONFIG_LINUX_IO_URING
> +typedef struct LuringState LuringState;
> +LuringState *luring_init(Error **errp);
> +void luring_cleanup(LuringState *s);
> +int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
> +                                uint64_t offset, QEMUIOVector *qiov, int type);
> +void luring_detach_aio_context(LuringState *s, AioContext *old_context);
> +void luring_attach_aio_context(LuringState *s, AioContext *new_context);
> +void luring_io_plug(BlockDriverState *bs, LuringState *s);
> +void luring_io_unplug(BlockDriverState *bs, LuringState *s);
> +#endif
>  
>  #ifdef _WIN32
>  typedef struct QEMUWin32AIOState QEMUWin32AIOState;


I plan on this or next week to do some benchmarks of the code and I will share the results as soon
as I do them.

Please pardon any mistakes I made in this review: most of QEMU is new to me,
so I don't yet know most of the code here well.

Best regards,
	Maxim Levitsky
Stefan Hajnoczi June 19, 2019, 10:14 a.m. UTC | #4
On Mon, Jun 17, 2019 at 03:26:50PM +0300, Maxim Levitsky wrote:
> On Mon, 2019-06-10 at 19:18 +0530, Aarushi Mehta wrote:
> > +        if (!cqes) {
> > +            break;
> > +        }
> > +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> > +        ret = cqes->res;
> > +
> > +        if (ret == luringcb->qiov->size) {
> > +            ret = 0;
> > +        } else if (ret >= 0) {
> 
> 
> You should very carefully check the allowed return values here.
> 
> It looks like you can get '-EINTR' here, which would ask you to rerun the read operation, and otherwise
> you will get the number of bytes read, which might be less that what was asked for, which implies that you
> need to retry the read operation with the remainder of the buffer rather that zero the end of the buffer IMHO 
> 
> (0 is returned on EOF according to 'read' semantics, which I think are used here, thus a short read might not be an EOF)
> 
> 
> Looking at linux-aio.c though I do see that it just passes through the returned value with no special treatments. 
> including lack of check for -EINTR.
> 
> I assume that since aio is linux specific, and it only supports direct IO, it happens
> to have assumption of no short reads/-EINTR (but since libaio has very sparse documentation I can't verify this)
> 
> On the other hand the aio=threads implementation actually does everything as specified on the 'write' manpage,
> retrying the reads on -EINTR, and doing additional reads if less that required number of bytes were read.
> 
> Looking at io_uring implementation in the kernel I see that it does support synchronous (non O_DIRECT mode), 
> and in this case, it goes through the same ->read_iter which is pretty much the same path that 
> regular read() takes and so it might return short reads and or -EINTR.

Interesting point.  Investigating EINTR should at least be a TODO
comment and needs to be resolved before io_uring lands in a QEMU
release.

> > +static int ioq_submit(LuringState *s)
> > +{
> > +    int ret = 0;
> > +    LuringAIOCB *luringcb, *luringcb_next;
> > +
> > +    while (s->io_q.in_queue > 0) {
> > +        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
> > +                              luringcb_next) {
> 
> I am torn about the 'sq_overflow' name. it seems to me that its not immediately clear that these
> are the requests that are waiting because the io uring got full, but I can't now think of a better name.
> 
> Maybe add a comment here to explain what is going on here?

Hmm...I suggested this name because I thought it was clear.  But the
fact that it puzzled you proves it wasn't clear :-).

Can anyone think of a better name?  It's the queue we keep in QEMU to
hold requests while the io_uring sq ring is full.

> Also maybe we could somehow utilize the plug/unplug facility to avoid reaching that state in first place?
> Maybe the block layer has some kind of 'max outstanding requests' limit that could be used?
> 
> In my nvme-mdev I opted to not process the input queues when such a condition is detected, but here you can't as the block layer
> pretty much calls you to process the requests.

Block layer callers are allowed to submit as many I/O requests as they
like and there is no feedback mechanism.  It's up to linux-aio.c and
io_uring.c to handle the case where host kernel I/O submission resources
are exhausted.

Plug/unplug is a batching performance optimization to reduce the number
of io_uring_enter() calls but it does not stop the callers from
submitting more I/O requests.  So plug/unplug isn't directly applicable
here.

> > +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
> > +                            uint64_t offset, int type)
> > +{
> > +    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> > +    if (!sqes) {
> > +        sqes = &luringcb->sqeq;
> > +        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
> > +    }
> > +
> > +    switch (type) {
> > +    case QEMU_AIO_WRITE:
> > +        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
> > +                             luringcb->qiov->niov, offset);
> > +        break;
> > +    case QEMU_AIO_READ:
> > +        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
> > +                            luringcb->qiov->niov, offset);
> > +        break;
> > +    case QEMU_AIO_FLUSH:
> > +        io_uring_prep_fsync(sqes, fd, 0);
> > +        break;
> > +    default:
> > +        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
> > +                        __func__, type);
> 
> Nitpick: Don't we use some king of error printing functions like 'error_setg' rather that fprintf?

Here we're not in a context where an Error object can be returned (e.g.
printed by the QMP monitor).  We only have an errno return value that
the emulated storage controller may squash down further to a single
EIO-type error code.

'type' is a QEMU-internal value so the default case is basically
assert(false); /* we should never get here */

For these reasons the fprintf() seems okay here.

> I plan on this or next week to do some benchmarks of the code and I will share the results as soon
> as I do them.

Excellent, Aarushi has been benchmarking too.  Perhaps you can share the
QEMU command-line and fio configuration so that the results can be
compared.

Stefan
Maxim Levitsky June 19, 2019, 10:47 a.m. UTC | #5
On Wed, 2019-06-19 at 11:14 +0100, Stefan Hajnoczi wrote:
> On Mon, Jun 17, 2019 at 03:26:50PM +0300, Maxim Levitsky wrote:
> > On Mon, 2019-06-10 at 19:18 +0530, Aarushi Mehta wrote:
> > > +        if (!cqes) {
> > > +            break;
> > > +        }
> > > +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> > > +        ret = cqes->res;
> > > +
> > > +        if (ret == luringcb->qiov->size) {
> > > +            ret = 0;
> > > +        } else if (ret >= 0) {
> > 
> > 
> > You should very carefully check the allowed return values here.
> > 
> > It looks like you can get '-EINTR' here, which would ask you to rerun the read operation, and otherwise
> > you will get the number of bytes read, which might be less that what was asked for, which implies that you
> > need to retry the read operation with the remainder of the buffer rather that zero the end of the buffer IMHO 
> > 
> > (0 is returned on EOF according to 'read' semantics, which I think are used here, thus a short read might not be an EOF)
> > 
> > 
> > Looking at linux-aio.c though I do see that it just passes through the returned value with no special treatments. 
> > including lack of check for -EINTR.
> > 
> > I assume that since aio is linux specific, and it only supports direct IO, it happens
> > to have assumption of no short reads/-EINTR (but since libaio has very sparse documentation I can't verify this)
> > 
> > On the other hand the aio=threads implementation actually does everything as specified on the 'write' manpage,
> > retrying the reads on -EINTR, and doing additional reads if less that required number of bytes were read.
> > 
> > Looking at io_uring implementation in the kernel I see that it does support synchronous (non O_DIRECT mode), 
> > and in this case, it goes through the same ->read_iter which is pretty much the same path that 
> > regular read() takes and so it might return short reads and or -EINTR.
> 
> Interesting point.  Investigating EINTR should at least be a TODO
> comment and needs to be resolved before io_uring lands in a QEMU
> release.
> 
> > > +static int ioq_submit(LuringState *s)
> > > +{
> > > +    int ret = 0;
> > > +    LuringAIOCB *luringcb, *luringcb_next;
> > > +
> > > +    while (s->io_q.in_queue > 0) {
> > > +        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
> > > +                              luringcb_next) {
> > 
> > I am torn about the 'sq_overflow' name. it seems to me that its not immediately clear that these
> > are the requests that are waiting because the io uring got full, but I can't now think of a better name.
> > 
> > Maybe add a comment here to explain what is going on here?
> 
> Hmm...I suggested this name because I thought it was clear.  But the
> fact that it puzzled you proves it wasn't clear :-).
> 
> Can anyone think of a better name?  It's the queue we keep in QEMU to
> hold requests while the io_uring sq ring is full.
> 
> > Also maybe we could somehow utilize the plug/unplug facility to avoid reaching that state in first place?
> > Maybe the block layer has some kind of 'max outstanding requests' limit that could be used?
> > 
> > In my nvme-mdev I opted to not process the input queues when such a condition is detected, but here you can't as the block layer
> > pretty much calls you to process the requests.
> 
> Block layer callers are allowed to submit as many I/O requests as they
> like and there is no feedback mechanism.  It's up to linux-aio.c and
> io_uring.c to handle the case where host kernel I/O submission resources
> are exhausted.
> 
> Plug/unplug is a batching performance optimization to reduce the number
> of io_uring_enter() calls but it does not stop the callers from
> submitting more I/O requests.  So plug/unplug isn't directly applicable
> here.

Thanks for the explanation! I guess we can leave that name as is, but add some comment or so
in the place where the queue is accessed.



> 
> > > +static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
> > > +                            uint64_t offset, int type)
> > > +{
> > > +    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
> > > +    if (!sqes) {
> > > +        sqes = &luringcb->sqeq;
> > > +        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
> > > +    }
> > > +
> > > +    switch (type) {
> > > +    case QEMU_AIO_WRITE:
> > > +        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
> > > +                             luringcb->qiov->niov, offset);
> > > +        break;
> > > +    case QEMU_AIO_READ:
> > > +        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
> > > +                            luringcb->qiov->niov, offset);
> > > +        break;
> > > +    case QEMU_AIO_FLUSH:
> > > +        io_uring_prep_fsync(sqes, fd, 0);
> > > +        break;
> > > +    default:
> > > +        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
> > > +                        __func__, type);
> > 
> > Nitpick: Don't we use some king of error printing functions like 'error_setg' rather that fprintf?
> 
> Here we're not in a context where an Error object can be returned (e.g.
> printed by the QMP monitor).  We only have an errno return value that
> the emulated storage controller may squash down further to a single
> EIO-type error code.
> 
> 'type' is a QEMU-internal value so the default case is basically
> assert(false); /* we should never get here */
> 
> For these reasons the fprintf() seems okay here.
All right then.

> 
> > I plan on this or next week to do some benchmarks of the code and I will share the results as soon
> > as I do them.
> 
> Excellent, Aarushi has been benchmarking too.  Perhaps you can share the
> QEMU command-line and fio configuration so that the results can be
> compared.

I'll do that soon. I already did some initial benchmarks (didn't save the results yet), and in its current state io_uring is a tiny bit slower
than libaio, which is fine because io_uring has a lot of benefits in addition to performance, plus the current code can be optimized further.

I am rerunning all the benchmarks now and will post the results as soon as I have them.

Best regards,
	Maxim Levitsky
Stefan Hajnoczi June 22, 2019, 3:10 p.m. UTC | #6
On Mon, Jun 17, 2019 at 1:27 PM Maxim Levitsky <mlevitsk@redhat.com> wrote:
> On Mon, 2019-06-10 at 19:18 +0530, Aarushi Mehta wrote:
> > +        if (!cqes) {
> > +            break;
> > +        }
> > +        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
> > +        ret = cqes->res;
> > +
> > +        if (ret == luringcb->qiov->size) {
> > +            ret = 0;
> > +        } else if (ret >= 0) {
>
>
> You should very carefully check the allowed return values here.
>
> It looks like you can get '-EINTR' here, which would ask you to rerun the read operation, and otherwise
> you will get the number of bytes read, which might be less that what was asked for, which implies that you
> need to retry the read operation with the remainder of the buffer rather that zero the end of the buffer IMHO
>
> (0 is returned on EOF according to 'read' semantics, which I think are used here, thus a short read might not be an EOF)
>
>
> Looking at linux-aio.c though I do see that it just passes through the returned value with no special treatments.
> including lack of check for -EINTR.
>
> I assume that since aio is linux specific, and it only supports direct IO, it happens
> to have assumption of no short reads/-EINTR (but since libaio has very sparse documentation I can't verify this)
>
> On the other hand the aio=threads implementation actually does everything as specified on the 'write' manpage,
> retrying the reads on -EINTR, and doing additional reads if less that required number of bytes were read.
>
> Looking at io_uring implementation in the kernel I see that it does support synchronous (non O_DIRECT mode),
> and in this case, it goes through the same ->read_iter which is pretty much the same path that
> regular read() takes and so it might return short reads and or -EINTR.

Thanks, Maxim.  I've confirmed that -EINTR needs to be handled based
on fs/io_uring.c.  We need to get a new sqe (since the old one is no
longer usable) and submit the request again.  There are no ordering
guarantees between pending requests so doing this is permissible.

Stefan
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 7be1225415..49f896796e 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -2516,6 +2516,13 @@  F: block/file-posix.c
 F: block/file-win32.c
 F: block/win32-aio.c
 
+Linux io_uring
+M: Aarushi Mehta <mehta.aaru20@gmail.com>
+R: Stefan Hajnoczi <stefan@redhat.com>
+L: qemu-block@nongnu.org
+S: Maintained
+F: block/io_uring.c
+
 qcow2
 M: Kevin Wolf <kwolf@redhat.com>
 M: Max Reitz <mreitz@redhat.com>
diff --git a/block/Makefile.objs b/block/Makefile.objs
index ae11605c9f..8fde7a23a5 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -18,6 +18,7 @@  block-obj-y += block-backend.o snapshot.o qapi.o
 block-obj-$(CONFIG_WIN32) += file-win32.o win32-aio.o
 block-obj-$(CONFIG_POSIX) += file-posix.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
+block-obj-$(CONFIG_LINUX_IO_URING) += io_uring.o
 block-obj-y += null.o mirror.o commit.o io.o create.o
 block-obj-y += throttle-groups.o
 block-obj-$(CONFIG_LINUX) += nvme.o
@@ -61,5 +62,7 @@  block-obj-$(if $(CONFIG_LZFSE),m,n) += dmg-lzfse.o
 dmg-lzfse.o-libs   := $(LZFSE_LIBS)
 qcow.o-libs        := -lz
 linux-aio.o-libs   := -laio
+io_uring.o-cflags  := $(LINUX_IO_URING_CFLAGS)
+io_uring.o-libs    := $(LINUX_IO_URING_LIBS)
 parallels.o-cflags := $(LIBXML2_CFLAGS)
 parallels.o-libs   := $(LIBXML2_LIBS)
diff --git a/block/io_uring.c b/block/io_uring.c
new file mode 100644
index 0000000000..f327c7ef96
--- /dev/null
+++ b/block/io_uring.c
@@ -0,0 +1,314 @@ 
+/*
+ * Linux io_uring support.
+ *
+ * Copyright (C) 2009 IBM, Corp.
+ * Copyright (C) 2009 Red Hat, Inc.
+ * Copyright (C) 2019 Aarushi Mehta
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include "qemu/osdep.h"
+#include <liburing.h>
+#include "qemu-common.h"
+#include "block/aio.h"
+#include "qemu/queue.h"
+#include "block/block.h"
+#include "block/raw-aio.h"
+#include "qemu/coroutine.h"
+#include "qapi/error.h"
+
+#define MAX_EVENTS 128
+
+typedef struct LuringAIOCB {
+    Coroutine *co;
+    struct io_uring_sqe sqeq;
+    ssize_t ret;
+    QEMUIOVector *qiov;
+    bool is_read;
+    QSIMPLEQ_ENTRY(LuringAIOCB) next;
+} LuringAIOCB;
+
+typedef struct LuringQueue {
+    int plugged;
+    unsigned int in_queue;
+    unsigned int in_flight;
+    bool blocked;
+    QSIMPLEQ_HEAD(, LuringAIOCB) sq_overflow;
+} LuringQueue;
+
+typedef struct LuringState {
+    AioContext *aio_context;
+
+    struct io_uring ring;
+
+    /* io queue for submit at batch.  Protected by AioContext lock. */
+    LuringQueue io_q;
+
+    /* I/O completion processing.  Only runs in I/O thread.  */
+    QEMUBH *completion_bh;
+} LuringState;
+
+/**
+ * ioq_submit:
+ * @s: AIO state
+ *
+ * Queues pending sqes and submits them
+ *
+ */
+static int ioq_submit(LuringState *s);
+
+/**
+ * qemu_luring_process_completions:
+ * @s: AIO state
+ *
+ * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
+ *
+ */
+static void qemu_luring_process_completions(LuringState *s)
+{
+    struct io_uring_cqe *cqes;
+    int ret;
+
+    /*
+     * Request completion callbacks can run the nested event loop.
+     * Schedule ourselves so the nested event loop will "see" remaining
+     * completed requests and process them.  Without this, completion
+     * callbacks that wait for other requests using a nested event loop
+     * would hang forever.
+     */
+    qemu_bh_schedule(s->completion_bh);
+
+    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
+        if (!cqes) {
+            break;
+        }
+        LuringAIOCB *luringcb = io_uring_cqe_get_data(cqes);
+        ret = cqes->res;
+
+        if (ret == luringcb->qiov->size) {
+            ret = 0;
+        } else if (ret >= 0) {
+            /* Short Read/Write */
+            if (luringcb->is_read) {
+                /* Read, pad with zeroes */
+                qemu_iovec_memset(luringcb->qiov, ret, 0,
+                luringcb->qiov->size - ret);
+            } else {
+                ret = -ENOSPC;;
+            }
+        }
+        luringcb->ret = ret;
+
+        io_uring_cqe_seen(&s->ring, cqes);
+        cqes = NULL;
+        /* Change counters one-by-one because we can be nested. */
+        s->io_q.in_flight--;
+
+        /*
+         * If the coroutine is already entered it must be in ioq_submit()
+         * and will notice luringcb->ret has been filled in when it
+         * eventually runs later. Coroutines cannot be entered recursively
+         * so avoid doing that!
+         */
+        if (!qemu_coroutine_entered(luringcb->co)) {
+            aio_co_wake(luringcb->co);
+        }
+    }
+    qemu_bh_cancel(s->completion_bh);
+}
+
+static void qemu_luring_process_completions_and_submit(LuringState *s)
+{
+    aio_context_acquire(s->aio_context);
+    qemu_luring_process_completions(s);
+
+    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
+        ioq_submit(s);
+    }
+    aio_context_release(s->aio_context);
+}
+
+static void qemu_luring_completion_bh(void *opaque)
+{
+    LuringState *s = opaque;
+    qemu_luring_process_completions_and_submit(s);
+}
+
+static void qemu_luring_completion_cb(void *opaque)
+{
+    LuringState *s = opaque;
+    qemu_luring_process_completions_and_submit(s);
+}
+
+static void ioq_init(LuringQueue *io_q)
+{
+    QSIMPLEQ_INIT(&io_q->sq_overflow);
+    io_q->plugged = 0;
+    io_q->in_queue = 0;
+    io_q->in_flight = 0;
+    io_q->blocked = false;
+}
+
+static int ioq_submit(LuringState *s)
+{
+    int ret = 0;
+    LuringAIOCB *luringcb, *luringcb_next;
+
+    while (s->io_q.in_queue > 0) {
+        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.sq_overflow, next,
+                              luringcb_next) {
+            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
+            if (!sqes) {
+                break;
+            }
+            /* Prep sqe for submission */
+            *sqes = luringcb->sqeq;
+            QSIMPLEQ_REMOVE_HEAD(&s->io_q.sq_overflow, next);
+        }
+        ret =  io_uring_submit(&s->ring);
+        /* Prevent infinite loop if submission is refused */
+        if (ret <= 0) {
+            if (ret == -EAGAIN) {
+                continue;
+            }
+            break;
+        }
+        s->io_q.in_flight += ret;
+        s->io_q.in_queue  -= ret;
+    }
+    s->io_q.blocked = (s->io_q.in_queue > 0);
+
+    if (s->io_q.in_flight) {
+        /*
+         * We can try to complete something just right away if there are
+         * still requests in-flight.
+         */
+        qemu_luring_process_completions(s);
+    }
+    return ret;
+}
+
+void luring_io_plug(BlockDriverState *bs, LuringState *s)
+{
+    s->io_q.plugged++;
+}
+
+void luring_io_unplug(BlockDriverState *bs, LuringState *s)
+{
+    assert(s->io_q.plugged);
+    if (--s->io_q.plugged == 0 &&
+        !s->io_q.blocked && s->io_q.in_queue > 0) {
+        ioq_submit(s);
+    }
+}
+
+/**
+ * luring_do_submit:
+ * @fd: file descriptor for I/O
+ * @luringcb: AIO control block
+ * @s: AIO state
+ * @offset: offset for request
+ * @type: type of request
+ *
+ * Fetches sqes from ring, adds to pending queue and preps them
+ *
+ */
+static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
+                            uint64_t offset, int type)
+{
+    struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
+    if (!sqes) {
+        sqes = &luringcb->sqeq;
+        QSIMPLEQ_INSERT_TAIL(&s->io_q.sq_overflow, luringcb, next);
+    }
+
+    switch (type) {
+    case QEMU_AIO_WRITE:
+        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
+                             luringcb->qiov->niov, offset);
+        break;
+    case QEMU_AIO_READ:
+        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
+                            luringcb->qiov->niov, offset);
+        break;
+    case QEMU_AIO_FLUSH:
+        io_uring_prep_fsync(sqes, fd, 0);
+        break;
+    default:
+        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
+                        __func__, type);
+        abort();
+    }
+    io_uring_sqe_set_data(sqes, luringcb);
+    s->io_q.in_queue++;
+
+    if (!s->io_q.blocked &&
+        (!s->io_q.plugged ||
+         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
+        return ioq_submit(s);
+    }
+    return 0;
+}
+
+int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
+                                uint64_t offset, QEMUIOVector *qiov, int type)
+{
+    int ret;
+    LuringAIOCB luringcb = {
+        .co         = qemu_coroutine_self(),
+        .ret        = -EINPROGRESS,
+        .qiov       = qiov,
+        .is_read    = (type == QEMU_AIO_READ),
+    };
+
+    ret = luring_do_submit(fd, &luringcb, s, offset, type);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (luringcb.ret == -EINPROGRESS) {
+        qemu_coroutine_yield();
+    }
+    return luringcb.ret;
+}
+
+void luring_detach_aio_context(LuringState *s, AioContext *old_context)
+{
+    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
+                       s);
+    qemu_bh_delete(s->completion_bh);
+    s->aio_context = NULL;
+}
+
+void luring_attach_aio_context(LuringState *s, AioContext *new_context)
+{
+    s->aio_context = new_context;
+    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
+    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
+                       qemu_luring_completion_cb, NULL, NULL, s);
+}
+
+LuringState *luring_init(Error **errp)
+{
+    int rc;
+    LuringState *s;
+    s = g_malloc0(sizeof(*s));
+    struct io_uring *ring = &s->ring;
+    rc =  io_uring_queue_init(MAX_EVENTS, ring, 0);
+    if (rc < 0) {
+        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
+        g_free(s);
+        return NULL;
+    }
+
+    ioq_init(&s->io_q);
+    return s;
+
+}
+
+void luring_cleanup(LuringState *s)
+{
+    io_uring_queue_exit(&s->ring);
+    g_free(s);
+}
diff --git a/include/block/aio.h b/include/block/aio.h
index 0ca25dfec6..9da3fd9793 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -50,6 +50,7 @@  typedef void IOHandler(void *opaque);
 struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
+struct LuringState;
 
 struct AioContext {
     GSource source;
@@ -118,11 +119,19 @@  struct AioContext {
     struct ThreadPool *thread_pool;
 
 #ifdef CONFIG_LINUX_AIO
-    /* State for native Linux AIO.  Uses aio_context_acquire/release for
+    /*
+     * State for native Linux AIO.  Uses aio_context_acquire/release for
      * locking.
      */
     struct LinuxAioState *linux_aio;
 #endif
+#ifdef CONFIG_LINUX_IO_URING
+    /*
+     * State for Linux io_uring.  Uses aio_context_acquire/release for
+     * locking.
+     */
+    struct LuringState *linux_io_uring;
+#endif
 
     /* TimerLists for calling timers - one per clock type.  Has its own
      * locking.
@@ -387,6 +396,11 @@  struct LinuxAioState *aio_setup_linux_aio(AioContext *ctx, Error **errp);
 /* Return the LinuxAioState bound to this AioContext */
 struct LinuxAioState *aio_get_linux_aio(AioContext *ctx);
 
+/* Setup the LuringState bound to this AioContext */
+struct LuringState *aio_setup_linux_io_uring(AioContext *ctx, Error **errp);
+
+/* Return the LuringState bound to this AioContext */
+struct LuringState *aio_get_linux_io_uring(AioContext *ctx);
 /**
  * aio_timer_new_with_attrs:
  * @ctx: the aio context
diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h
index 0cb7cc74a2..71d7d1395f 100644
--- a/include/block/raw-aio.h
+++ b/include/block/raw-aio.h
@@ -55,6 +55,18 @@  void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context);
 void laio_io_plug(BlockDriverState *bs, LinuxAioState *s);
 void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s);
 #endif
+/* io_uring.c - Linux io_uring implementation */
+#ifdef CONFIG_LINUX_IO_URING
+typedef struct LuringState LuringState;
+LuringState *luring_init(Error **errp);
+void luring_cleanup(LuringState *s);
+int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
+                                uint64_t offset, QEMUIOVector *qiov, int type);
+void luring_detach_aio_context(LuringState *s, AioContext *old_context);
+void luring_attach_aio_context(LuringState *s, AioContext *new_context);
+void luring_io_plug(BlockDriverState *bs, LuringState *s);
+void luring_io_unplug(BlockDriverState *bs, LuringState *s);
+#endif
 
 #ifdef _WIN32
 typedef struct QEMUWin32AIOState QEMUWin32AIOState;