
[04/11] aio: introduce aio_co_schedule

Message ID 1460719926-12950-5-git-send-email-pbonzini@redhat.com (mailing list archive)
State New, archived

Commit Message

Paolo Bonzini April 15, 2016, 11:31 a.m. UTC
This provides the infrastructure to start a coroutine on a remote
AioContext.  It will be used by CoMutex and CoQueue, so that
coroutines don't jump from one context to another when they
go to sleep on a mutex or waitqueue.

aio_co_schedule is based on a lock-free multiple-producer,
single-consumer queue.  The multiple producers use cmpxchg to add
to a LIFO stacks.  The consumer (a per-AioContext bottom half) grabs
all items added so far, inverts the list to make it FIFO, and goes
through it one item at a time.

Most of the changes really are in infrastructure and tests.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c                      |  38 ++++++++
 include/block/aio.h          |   9 ++
 include/qemu/coroutine_int.h |  10 +-
 tests/Makefile               |  11 ++-
 tests/iothread.c             |  91 ++++++++++++++++++
 tests/iothread.h             |  25 +++++
 tests/test-aio-multithread.c | 220 +++++++++++++++++++++++++++++++++++++++++++
 trace-events                 |   4 +
 8 files changed, 404 insertions(+), 4 deletions(-)
 create mode 100644 tests/iothread.c
 create mode 100644 tests/iothread.h
 create mode 100644 tests/test-aio-multithread.c
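
As a rough standalone sketch of the scheme described in the commit message (producers push onto a lock-free LIFO with compare-and-swap; the single consumer detaches the whole stack at once and reverses it into FIFO order), using C11 atomics in place of QEMU's QSLIST macros; the type and function names below are illustrative only, not taken from the patch:

#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

typedef struct Node {
    struct Node *next;
    int value;
} Node;

static _Atomic(Node *) head = NULL;

/* Producer side: any number of threads may push concurrently.  Each push
 * is a cmpxchg loop that makes the new node the top of the LIFO stack. */
static void push(Node *n)
{
    Node *old = atomic_load(&head);
    do {
        n->next = old;
    } while (!atomic_compare_exchange_weak(&head, &old, n));
}

/* Consumer side: a single thread grabs everything pushed so far in one
 * atomic exchange, then reverses the list so the items come out in the
 * order they were pushed (FIFO). */
static Node *pop_all_fifo(void)
{
    Node *reversed = atomic_exchange(&head, (Node *)NULL);
    Node *straight = NULL;

    while (reversed) {
        Node *n = reversed;
        reversed = n->next;
        n->next = straight;
        straight = n;
    }
    return straight;
}

int main(void)
{
    Node nodes[3] = { { NULL, 1 }, { NULL, 2 }, { NULL, 3 } };

    for (int i = 0; i < 3; i++) {
        push(&nodes[i]);
    }
    for (Node *n = pop_all_fifo(); n; n = n->next) {
        printf("%d\n", n->value);   /* prints 1 2 3 */
    }
    return 0;
}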

Comments

Stefan Hajnoczi April 19, 2016, 2:31 p.m. UTC | #1
On Fri, Apr 15, 2016 at 01:31:59PM +0200, Paolo Bonzini wrote:
> @@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
>      }
>  #endif
>  
> +    qemu_bh_delete(ctx->schedule_bh);

Please include an assertion that the scheduled coroutines list is empty.

> +
>      qemu_lockcnt_lock(&ctx->list_lock);
>      assert(!qemu_lockcnt_count(&ctx->list_lock));
>      while (ctx->first_bh) {
> @@ -335,6 +339,28 @@ static void event_notifier_dummy_cb(EventNotifier *e)
>  {
>  }
>  
> +static void schedule_bh_cb(void *opaque)
> +{
> +    AioContext *ctx = opaque;
> +    QSLIST_HEAD(, Coroutine) straight, reversed;
> +
> +    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
> +    QSLIST_INIT(&straight);
> +
> +    while (!QSLIST_EMPTY(&reversed)) {
> +        Coroutine *co = QSLIST_FIRST(&reversed);
> +        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
> +        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
> +    }
> +
> +    while (!QSLIST_EMPTY(&straight)) {
> +        Coroutine *co = QSLIST_FIRST(&straight);
> +        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
> +        trace_aio_schedule_bh_cb(ctx, co);
> +        qemu_coroutine_enter(co, NULL);
> +    }
> +}

This construct brings to mind the use-after-free case when a scheduled
coroutine terminates before it is entered by this loop:

There are two scheduled Coroutines: A and B.  During
qemu_coroutine_enter(A) we enter B.  B then terminates by returning from
its main function.  Once A yields or terminates we still try to enter
the freed B coroutine.

Unfortunately I don't think we have good debugging or an assertion for
this bug.  I'm sure it will occur at some point...   Please document
that the coroutine must not be entered by anyone else while
aio_co_schedule() is active.
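
One possible wording for the documentation requested here, as a sketch only (the comment that ends up in the header may differ):

/**
 * aio_co_schedule:
 * @ctx: the AioContext in which to re-enter @co
 * @co: the coroutine to schedule
 *
 * Schedule a coroutine to be re-entered the next time @ctx's bottom half
 * runs, in @ctx's thread.  The coroutine must not be entered by anyone
 * else while aio_co_schedule() is active; in particular it must not
 * already be running, scheduled on another AioContext, or queued for
 * wakeup elsewhere, or it could be entered twice or entered after being
 * freed.
 */
void aio_co_schedule(AioContext *ctx, struct Coroutine *co);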
Fam Zheng April 29, 2016, 5:11 a.m. UTC | #2
On Fri, 04/15 13:31, Paolo Bonzini wrote:
> This provides the infrastructure to start a coroutine on a remote
> AioContext.  It will be used by CoMutex and CoQueue, so that
> coroutines don't jump from one context to another when they
> go to sleep on a mutex or waitqueue.
> 
> aio_co_schedule is based on a lock-free multiple-producer,
> single-consumer queue.  The multiple producers use cmpxchg to add
> to a LIFO stacks.  The consumer (a per-AioContext bottom half) grabs

s/stacks/stack/ ?

> all items added so far, inverts the list to make it FIFO, and goes
> through it one item at a time.
> 
> Most of the changes really are in infrastructure and tests.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  async.c                      |  38 ++++++++
>  include/block/aio.h          |   9 ++
>  include/qemu/coroutine_int.h |  10 +-
>  tests/Makefile               |  11 ++-
>  tests/iothread.c             |  91 ++++++++++++++++++
>  tests/iothread.h             |  25 +++++
>  tests/test-aio-multithread.c | 220 +++++++++++++++++++++++++++++++++++++++++++
>  trace-events                 |   4 +
>  8 files changed, 404 insertions(+), 4 deletions(-)
>  create mode 100644 tests/iothread.c
>  create mode 100644 tests/iothread.h
>  create mode 100644 tests/test-aio-multithread.c
> 
> diff --git a/async.c b/async.c
> index acd3627..ef8b409 100644
> --- a/async.c
> +++ b/async.c
> @@ -30,6 +30,8 @@
>  #include "qemu/main-loop.h"
>  #include "qemu/atomic.h"
>  #include "block/raw-aio.h"
> +#include "trace/generated-tracers.h"
> +#include "qemu/coroutine_int.h"
>  
>  /***********************************************************/
>  /* bottom halves (can be seen as timers which expire ASAP) */
> @@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
>      }
>  #endif
>  
> +    qemu_bh_delete(ctx->schedule_bh);
> +
>      qemu_lockcnt_lock(&ctx->list_lock);
>      assert(!qemu_lockcnt_count(&ctx->list_lock));
>      while (ctx->first_bh) {
> @@ -335,6 +339,28 @@ static void event_notifier_dummy_cb(EventNotifier *e)
>  {
>  }
>  
> +static void schedule_bh_cb(void *opaque)
> +{
> +    AioContext *ctx = opaque;
> +    QSLIST_HEAD(, Coroutine) straight, reversed;
> +
> +    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
> +    QSLIST_INIT(&straight);
> +
> +    while (!QSLIST_EMPTY(&reversed)) {
> +        Coroutine *co = QSLIST_FIRST(&reversed);
> +        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
> +        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
> +    }
> +
> +    while (!QSLIST_EMPTY(&straight)) {
> +        Coroutine *co = QSLIST_FIRST(&straight);
> +        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
> +        trace_aio_schedule_bh_cb(ctx, co);
> +        qemu_coroutine_enter(co, NULL);
> +    }
> +}
> +
>  AioContext *aio_context_new(Error **errp)
>  {
>      int ret;
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 8f55d1a..0a344c3 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -46,6 +46,7 @@ typedef struct AioHandler AioHandler;
>  typedef void QEMUBHFunc(void *opaque);
>  typedef void IOHandler(void *opaque);
>  
> +struct Coroutine;
>  struct ThreadPool;
>  struct LinuxAioState;
>  
> @@ -107,6 +108,9 @@ struct AioContext {
>      bool notified;
>      EventNotifier notifier;
>  
> +    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
> +    QEMUBH *schedule_bh;

Maybe rename it to co_schedule_bh to reflect that it's used for scheduling coroutines?

> +
>      /* Thread pool for performing work and receiving completion callbacks.
>       * Has its own locking.
>       */

> +int main(int argc, char **argv)
> +{
> +    init_clocks();
> +
> +    g_test_init(&argc, &argv, NULL);
> +    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
> +    if (g_test_quick()) {
> +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
> +    } else {
> +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);

Should they use different path names, like /aio/multi/schedule/{1,10}?

> +    }
> +    return g_test_run();
> +}

Fam
Paolo Bonzini May 17, 2016, 2:38 p.m. UTC | #3
On 29/04/2016 07:11, Fam Zheng wrote:
> > +int main(int argc, char **argv)
> > +{
> > +    init_clocks();
> > +
> > +    g_test_init(&argc, &argv, NULL);
> > +    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
> > +    if (g_test_quick()) {
> > +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
> > +    } else {
> > +        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
> 
> Should they use different path names, like /aio/multi/schedule/{1,10}?

rcutorture doesn't.  I guess it might be changed too, but I don't think
it's too important since the only difference is the duration.

I've applied all your other suggestions.

Paolo
Paolo Bonzini May 17, 2016, 2:57 p.m. UTC | #4
On 19/04/2016 16:31, Stefan Hajnoczi wrote:
> On Fri, Apr 15, 2016 at 01:31:59PM +0200, Paolo Bonzini wrote:
>> @@ -255,6 +257,8 @@ aio_ctx_finalize(GSource     *source)
>>      }
>>  #endif
>>  
>> +    qemu_bh_delete(ctx->schedule_bh);
> 
> Please include an assertion that the scheduled coroutines list is empty.

Good idea.

>> +    while (!QSLIST_EMPTY(&straight)) {
>> +        Coroutine *co = QSLIST_FIRST(&straight);
>> +        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
>> +        trace_aio_schedule_bh_cb(ctx, co);
>> +        qemu_coroutine_enter(co, NULL);

FWIW, this should be wrapped with aio_context_acquire/aio_context_release.

>> +    }
>> +}
> 
> This construct brings to mind the use-after-free case when a scheduled
> coroutine terminates before it is entered by this loop:
> 
> There are two scheduled Coroutines: A and B.  During
> qemu_coroutine_enter(A) we enter B.  B then terminates by returning from
> its main function.  Once A yields or terminates we still try to enter
> the freed B coroutine.
> 
> Unfortunately I don't think we have good debugging or an assertion for
> this bug.  I'm sure it will occur at some point...

aio_co_schedule (and qemu_coroutine_wake, which wraps it later in the
series) is quite a low-level interface, so I do not expect many users.

That said, there is at least another case where it will be used.  In the
dataplane branch, where AIO callbacks take the AioContext mutex
themselves, we have:

static void bdrv_co_io_em_complete(void *opaque, int ret)
{
    CoroutineIOCompletion *co = opaque;

    co->ret = ret;
    aio_context_acquire(co->ctx);
    qemu_coroutine_enter(co->coroutine, NULL);
    aio_context_release(co->ctx);
}

...

    acb = bs->drv->bdrv_aio_readv(bs, sector_num, iov, nb_sectors,
                                  bdrv_co_io_em_complete, &co);
    qemu_coroutine_yield();

bdrv_co_io_em_complete here can be called before the coroutine has
yielded.  To prepare for the replacement of the AioContext mutex with
fine-grained mutexes, I think bdrv_co_io_em_complete should do something
like

    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co->coroutine);
        return;
    }

    aio_context_acquire(ctx);
    qemu_coroutine_enter(co->coroutine, NULL);
    aio_context_release(ctx);


> Please document
> that the coroutine must not be entered by anyone else while
> aio_co_schedule() is active.

Sure.

Paolo
Stefan Hajnoczi May 26, 2016, 7:19 p.m. UTC | #5
On Tue, May 17, 2016 at 04:57:28PM +0200, Paolo Bonzini wrote:
> That said, there is at least another case where it will be used.  In the
> dataplane branch, where AIO callbacks take the AioContext mutex
> themselves, we have:
> 
> static void bdrv_co_io_em_complete(void *opaque, int ret)
> {
>     CoroutineIOCompletion *co = opaque;
> 
>     co->ret = ret;
>     aio_context_acquire(co->ctx);
>     qemu_coroutine_enter(co->coroutine, NULL);
>     aio_context_release(co->ctx);
> }
> 
> ...
> 
>     acb = bs->drv->bdrv_aio_readv(bs, sector_num, iov, nb_sectors,
>                                   bdrv_co_io_em_complete, &co);
>     qemu_coroutine_yield();
> 
> bdrv_co_io_em_complete here can be called before the coroutine has
> yielded.  To prepare for the replacement of the AioContext mutex with
> fine-grained mutexes, I think bdrv_co_io_em_complete should do something
> like
> 
>     if (ctx != qemu_get_current_aio_context()) {
>         aio_co_schedule(ctx, co->coroutine);
>         return;
>     }
> 
>     aio_context_acquire(ctx);
>     qemu_coroutine_enter(co->coroutine, NULL);
>     aio_context_release(ctx);

Okay, that makes sense.

Patch

diff --git a/async.c b/async.c
index acd3627..ef8b409 100644
--- a/async.c
+++ b/async.c
@@ -30,6 +30,8 @@ 
 #include "qemu/main-loop.h"
 #include "qemu/atomic.h"
 #include "block/raw-aio.h"
+#include "trace/generated-tracers.h"
+#include "qemu/coroutine_int.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
@@ -255,6 +257,8 @@  aio_ctx_finalize(GSource     *source)
     }
 #endif
 
+    qemu_bh_delete(ctx->schedule_bh);
+
     qemu_lockcnt_lock(&ctx->list_lock);
     assert(!qemu_lockcnt_count(&ctx->list_lock));
     while (ctx->first_bh) {
@@ -335,6 +339,28 @@  static void event_notifier_dummy_cb(EventNotifier *e)
 {
 }
 
+static void schedule_bh_cb(void *opaque)
+{
+    AioContext *ctx = opaque;
+    QSLIST_HEAD(, Coroutine) straight, reversed;
+
+    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
+    QSLIST_INIT(&straight);
+
+    while (!QSLIST_EMPTY(&reversed)) {
+        Coroutine *co = QSLIST_FIRST(&reversed);
+        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
+        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
+    }
+
+    while (!QSLIST_EMPTY(&straight)) {
+        Coroutine *co = QSLIST_FIRST(&straight);
+        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
+        trace_aio_schedule_bh_cb(ctx, co);
+        qemu_coroutine_enter(co, NULL);
+    }
+}
+
 AioContext *aio_context_new(Error **errp)
 {
     int ret;
@@ -354,6 +380,10 @@  AioContext *aio_context_new(Error **errp)
     }
     g_source_set_can_recurse(&ctx->source, true);
     qemu_lockcnt_init(&ctx->list_lock);
+
+    ctx->schedule_bh = aio_bh_new(ctx, schedule_bh_cb, ctx);
+    QSLIST_INIT(&ctx->scheduled_coroutines);
+
     aio_set_event_notifier(ctx, &ctx->notifier,
                            false,
                            (EventNotifierHandler *)
@@ -371,6 +401,14 @@  fail:
     return NULL;
 }
 
+void aio_co_schedule(AioContext *ctx, Coroutine *co)
+{
+    trace_aio_co_schedule(ctx, co);
+    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
+                              co, co_scheduled_next);
+    qemu_bh_schedule(ctx->schedule_bh);
+}
+
 void aio_context_ref(AioContext *ctx)
 {
     g_source_ref(&ctx->source);
diff --git a/include/block/aio.h b/include/block/aio.h
index 8f55d1a..0a344c3 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -46,6 +46,7 @@  typedef struct AioHandler AioHandler;
 typedef void QEMUBHFunc(void *opaque);
 typedef void IOHandler(void *opaque);
 
+struct Coroutine;
 struct ThreadPool;
 struct LinuxAioState;
 
@@ -107,6 +108,9 @@  struct AioContext {
     bool notified;
     EventNotifier notifier;
 
+    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
+    QEMUBH *schedule_bh;
+
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -441,6 +445,11 @@  static inline bool aio_node_check(AioContext *ctx, bool is_external)
 }
 
 /**
+ * aio_co_schedule
+ */
+void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
+
+/**
  * @ctx: the aio context
  *
  * Return whether we are running in the I/O thread that manages @ctx.
diff --git a/include/qemu/coroutine_int.h b/include/qemu/coroutine_int.h
index 581a7f5..c0e9aa1 100644
--- a/include/qemu/coroutine_int.h
+++ b/include/qemu/coroutine_int.h
@@ -38,11 +38,19 @@  struct Coroutine {
     CoroutineEntry *entry;
     void *entry_arg;
     Coroutine *caller;
+
+    /* Only used when the coroutine has terminated.  */
     QSLIST_ENTRY(Coroutine) pool_next;
 
-    /* Coroutines that should be woken up when we yield or terminate */
+    /* Coroutines that should be woken up when we yield or terminate.
+     * Only used when the coroutine is running.
+     */
     QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
+
+    /* Only used when the coroutine is sleeping.  */
+    AioContext *ctx;
     QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
+    QSLIST_ENTRY(Coroutine) co_scheduled_next;
 };
 
 Coroutine *qemu_coroutine_new(void);
diff --git a/tests/Makefile b/tests/Makefile
index f60df75..b90b870 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -39,9 +39,13 @@  check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
+gcov-files-test-aio-y = async-posix.c
+gcov-files-test-aio-$(CONFIG_WIN32) += aio-win32.c
+gcov-files-test-aio-$(CONFIG_POSIX) += aio-posix.c
+check-unit-y += tests/test-aio-multithread$(EXESUF)
+gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
+gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
 check-unit-y += tests/test-throttle$(EXESUF)
-gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
-gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
 check-unit-y += tests/test-thread-pool$(EXESUF)
 gcov-files-test-thread-pool-y = thread-pool.c
 gcov-files-test-hbitmap-y = util/hbitmap.c
@@ -400,7 +404,7 @@  test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
 	$(test-qom-obj-y)
 test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
 test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
-test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
+test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
 
 tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
 tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
@@ -412,6 +416,7 @@  tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y
 tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
 tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
+tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
 tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(test-block-obj-y)
diff --git a/tests/iothread.c b/tests/iothread.c
new file mode 100644
index 0000000..00ab316
--- /dev/null
+++ b/tests/iothread.c
@@ -0,0 +1,91 @@ 
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qemu/osdep.h"
+#include "qapi/error.h"
+#include "block/aio.h"
+#include "qemu/main-loop.h"
+#include "qemu/rcu.h"
+#include "iothread.h"
+
+struct IOThread {
+    AioContext *ctx;
+
+    QemuThread thread;
+    QemuMutex init_done_lock;
+    QemuCond init_done_cond;    /* is thread initialization done? */
+    bool stopping;
+};
+
+static __thread IOThread *my_iothread;
+
+bool aio_context_in_iothread(AioContext *ctx)
+{
+    return ctx == (my_iothread ? my_iothread->ctx : qemu_get_aio_context());
+}
+
+static void *iothread_run(void *opaque)
+{
+    IOThread *iothread = opaque;
+
+    rcu_register_thread();
+
+    my_iothread = iothread;
+    qemu_mutex_lock(&iothread->init_done_lock);
+    iothread->ctx = aio_context_new(&error_abort);
+    qemu_cond_signal(&iothread->init_done_cond);
+    qemu_mutex_unlock(&iothread->init_done_lock);
+
+    while (!atomic_read(&iothread->stopping)) {
+        aio_poll(iothread->ctx, true);
+    }
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+void iothread_join(IOThread *iothread)
+{
+    iothread->stopping = true;
+    aio_notify(iothread->ctx);
+    qemu_thread_join(&iothread->thread);
+    qemu_cond_destroy(&iothread->init_done_cond);
+    qemu_mutex_destroy(&iothread->init_done_lock);
+    aio_context_unref(iothread->ctx);
+    g_free(iothread);
+}
+
+IOThread *iothread_new(void)
+{
+    IOThread *iothread = g_new0(IOThread, 1);
+
+    qemu_mutex_init(&iothread->init_done_lock);
+    qemu_cond_init(&iothread->init_done_cond);
+    qemu_thread_create(&iothread->thread, NULL, iothread_run,
+                       iothread, QEMU_THREAD_JOINABLE);
+
+    /* Wait for initialization to complete */
+    qemu_mutex_lock(&iothread->init_done_lock);
+    while (iothread->ctx == NULL) {
+        qemu_cond_wait(&iothread->init_done_cond,
+                       &iothread->init_done_lock);
+    }
+    qemu_mutex_unlock(&iothread->init_done_lock);
+    return iothread;
+}
+
+AioContext *iothread_get_aio_context(IOThread *iothread)
+{
+    return iothread->ctx;
+}
diff --git a/tests/iothread.h b/tests/iothread.h
new file mode 100644
index 0000000..4877cea
--- /dev/null
+++ b/tests/iothread.h
@@ -0,0 +1,25 @@ 
+/*
+ * Event loop thread implementation for unit tests
+ *
+ * Copyright Red Hat Inc., 2013, 2016
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *  Paolo Bonzini     <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#ifndef TEST_IOTHREAD_H
+#define TEST_IOTHREAD_H
+
+#include "block/aio.h"
+#include "qemu/thread.h"
+
+typedef struct IOThread IOThread;
+
+IOThread *iothread_new(void);
+void iothread_join(IOThread *iothread);
+AioContext *iothread_get_aio_context(IOThread *iothread);
+
+#endif
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
new file mode 100644
index 0000000..94fecf7
--- /dev/null
+++ b/tests/test-aio-multithread.c
@@ -0,0 +1,220 @@ 
+/*
+ * AioContext multithreading tests
+ *
+ * Copyright Red Hat, Inc. 2016
+ *
+ * Authors:
+ *  Paolo Bonzini    <pbonzini@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include <glib.h>
+#include "block/aio.h"
+#include "qapi/error.h"
+#include "qemu/coroutine.h"
+#include "qemu/thread.h"
+#include "qemu/error-report.h"
+#include "iothread.h"
+
+/* AioContext management */
+
+#define NUM_CONTEXTS 5
+
+static IOThread *threads[NUM_CONTEXTS];
+static AioContext *ctx[NUM_CONTEXTS];
+static __thread int id = -1;
+
+static QemuEvent done_event;
+
+/* Callbacks run a function synchronously on a remote iothread. */
+
+typedef struct CtxRunData {
+    QEMUBH *bh;
+    QEMUBHFunc *cb;
+    void *arg;
+} CtxRunData;
+
+static void ctx_run_bh_cb(void *opaque)
+{
+    CtxRunData *data = opaque;
+    QEMUBHFunc *cb = data->cb;
+    void *arg = data->arg;
+    qemu_bh_delete(data->bh);
+    g_free(data);
+
+    cb(arg);
+    qemu_event_set(&done_event);
+}
+
+static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
+{
+    CtxRunData *data = g_new(CtxRunData, 1);
+    QEMUBH *bh = aio_bh_new(ctx[i], ctx_run_bh_cb, data);
+
+    data->bh = bh;
+    data->cb = cb;
+    data->arg = opaque;
+
+    qemu_event_reset(&done_event);
+    qemu_bh_schedule(bh);
+    qemu_event_wait(&done_event);
+}
+
+/* Starting the iothreads. */
+
+static void set_id_cb(void *opaque)
+{
+    int *i = opaque;
+
+    id = *i;
+}
+
+static void create_aio_contexts(void)
+{
+    int i;
+
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        threads[i] = iothread_new();
+        ctx[i] = iothread_get_aio_context(threads[i]);
+    }
+
+    qemu_event_init(&done_event, false);
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        ctx_run(i, set_id_cb, &i);
+    }
+}
+
+/* Stopping the iothreads. */
+
+static void join_aio_contexts(void)
+{
+    int i;
+
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        aio_context_ref(ctx[i]);
+    }
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        iothread_join(threads[i]);
+    }
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        aio_context_unref(ctx[i]);
+    }
+    qemu_event_destroy(&done_event);
+}
+
+/* Basic test for the stuff above. */
+
+static void test_lifecycle(void)
+{
+    create_aio_contexts();
+    join_aio_contexts();
+}
+
+/* aio_co_schedule test.  */
+
+static Coroutine *to_schedule[NUM_CONTEXTS];
+
+static bool now_stopping;
+
+static int count_retry;
+static int count_here;
+static int count_other;
+
+static bool schedule_next(int n)
+{
+    Coroutine *co;
+
+    co = atomic_xchg(&to_schedule[n], NULL);
+    if (!co) {
+        atomic_inc(&count_retry);
+        return false;
+    }
+
+    if (n == id) {
+        atomic_inc(&count_here);
+    } else {
+        atomic_inc(&count_other);
+    }
+
+    aio_co_schedule(ctx[n], co);
+    return true;
+}
+
+static void finish_cb(void *opaque)
+{
+    schedule_next(id);
+}
+
+static void test_multi_co_schedule_entry(void *opaque)
+{
+    g_assert(to_schedule[id] == NULL);
+    atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+
+    while (!atomic_mb_read(&now_stopping)) {
+        int n;
+
+        n = g_test_rand_int_range(0, NUM_CONTEXTS);
+        schedule_next(n);
+        qemu_coroutine_yield();
+
+        g_assert(to_schedule[id] == NULL);
+        atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
+    }
+}
+
+
+static void test_multi_co_schedule(int seconds)
+{
+    int i;
+
+    count_here = count_other = count_retry = 0;
+    now_stopping = false;
+
+    create_aio_contexts();
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    for (i = 0; i < NUM_CONTEXTS; i++) {
+        ctx_run(i, finish_cb, NULL);
+        to_schedule[i] = NULL;
+    }
+
+    join_aio_contexts();
+    g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
+                  count_other, count_here, count_retry,
+                  count_here + count_other + count_retry);
+}
+
+static void test_multi_co_schedule_1(void)
+{
+    test_multi_co_schedule(1);
+}
+
+static void test_multi_co_schedule_10(void)
+{
+    test_multi_co_schedule(10);
+}
+
+/* End of tests.  */
+
+int main(int argc, char **argv)
+{
+    init_clocks();
+
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
+    if (g_test_quick()) {
+        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
+    } else {
+        g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
+    }
+    return g_test_run();
+}
diff --git a/trace-events b/trace-events
index 922e70b..78b042c 100644
--- a/trace-events
+++ b/trace-events
@@ -972,6 +972,10 @@  xen_map_cache_return(void* ptr) "%p"
 # hw/i386/xen/xen_platform.c
 xen_platform_log(char *s) "xen platform: %s"
 
+# async.c
+aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
+aio_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
+
 # qemu-coroutine.c
 qemu_coroutine_enter(void *from, void *to, void *opaque) "from %p to %p opaque %p"
 qemu_coroutine_yield(void *from, void *to) "from %p to %p"
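
For reference, a condensed usage sketch based on the test added by this patch, assuming the tests/iothread.h helpers introduced above; error handling and completion synchronization are omitted:

#include "qemu/osdep.h"
#include "block/aio.h"
#include "qemu/coroutine.h"
#include "iothread.h"

/* Entered from the remote AioContext by the scheduling bottom half.
 * Note that the bottom half enters the coroutine with a NULL opaque
 * argument, so any state has to be reachable some other way. */
static void coroutine_fn remote_entry(void *opaque)
{
    printf("running in the remote AioContext\n");
}

static void schedule_on_iothread(void)
{
    IOThread *iothread = iothread_new();
    AioContext *ctx = iothread_get_aio_context(iothread);
    Coroutine *co = qemu_coroutine_create(remote_entry);

    /* Safe to call from any thread: the producer side only pushes onto
     * the lock-free list and kicks the bottom half. */
    aio_co_schedule(ctx, co);

    /* Real code would wait for the coroutine to finish before tearing
     * the iothread down; this sketch simply joins. */
    iothread_join(iothread);
}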