diff mbox series

[2/3] nbd/server: Fix drained_poll to wake coroutine in right AioContext

Message ID 20230517152834.277483-3-kwolf@redhat.com (mailing list archive)
State New, archived
Headers show
Series block/graph-lock: Disable locking for now | expand

Commit Message

Kevin Wolf May 17, 2023, 3:28 p.m. UTC
nbd_drained_poll() generally runs in the main thread, not whatever
iothread the NBD server coroutine is meant to run in, so it can't
directly reenter the coroutines to wake them up.

The code seems to have the right intention, it specifies the correct
AioContext when it calls qemu_aio_coroutine_enter(). However, this
functions doesn't schedule the coroutine to run in that AioContext, but
it assumes it is already called in the home thread of the AioContext.

To fix this, add a new thread-safe qio_channel_wake_read() that can be
called in the main thread to wake up the coroutine in its AioContext,
and use this in nbd_drained_poll().

Cc: qemu-stable@nongnu.org
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 include/io/channel.h | 10 ++++++++++
 io/channel.c         | 33 +++++++++++++++++++++++++++------
 nbd/server.c         |  3 +--
 3 files changed, 38 insertions(+), 8 deletions(-)

Comments

Eric Blake May 18, 2023, 12:43 p.m. UTC | #1
On Wed, May 17, 2023 at 05:28:33PM +0200, Kevin Wolf wrote:
> nbd_drained_poll() generally runs in the main thread, not whatever
> iothread the NBD server coroutine is meant to run in, so it can't
> directly reenter the coroutines to wake them up.
> 
> The code seems to have the right intention, it specifies the correct
> AioContext when it calls qemu_aio_coroutine_enter(). However, this
> functions doesn't schedule the coroutine to run in that AioContext, but
> it assumes it is already called in the home thread of the AioContext.
> 
> To fix this, add a new thread-safe qio_channel_wake_read() that can be
> called in the main thread to wake up the coroutine in its AioContext,
> and use this in nbd_drained_poll().
> 
> Cc: qemu-stable@nongnu.org
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  include/io/channel.h | 10 ++++++++++
>  io/channel.c         | 33 +++++++++++++++++++++++++++------
>  nbd/server.c         |  3 +--
>  3 files changed, 38 insertions(+), 8 deletions(-)
>

Lots of support code...

> +++ b/nbd/server.c
> @@ -1599,8 +1599,7 @@ static bool nbd_drained_poll(void *opaque)
>               * enter it here so we don't depend on the client to wake it up.
>               */
>              if (client->recv_coroutine != NULL && client->read_yielding) {
> -                qemu_aio_coroutine_enter(exp->common.ctx,
> -                                         client->recv_coroutine);
> +                qio_channel_wake_read(client->ioc);
>              }

...for what boils down to a deceptively simple alternative call.  But
your reasoning made sense to me, and I can see how your new function
solves the issue.

Reviewed-by: Eric Blake <eblake@redhat.com>
diff mbox series

Patch

diff --git a/include/io/channel.h b/include/io/channel.h
index 446a566e5e..229bf36910 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -757,6 +757,16 @@  void qio_channel_detach_aio_context(QIOChannel *ioc);
 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                     GIOCondition condition);
 
+/**
+ * qio_channel_wake_read:
+ * @ioc: the channel object
+ *
+ * If qio_channel_yield() is currently waiting for the channel to become
+ * readable, interrupt it and reenter immediately. This function is safe to call
+ * from any thread.
+ */
+void qio_channel_wake_read(QIOChannel *ioc);
+
 /**
  * qio_channel_wait:
  * @ioc: the channel object
diff --git a/io/channel.c b/io/channel.c
index 375a130a39..72f0066af5 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -19,6 +19,7 @@ 
  */
 
 #include "qemu/osdep.h"
+#include "block/aio-wait.h"
 #include "io/channel.h"
 #include "qapi/error.h"
 #include "qemu/main-loop.h"
@@ -514,7 +515,11 @@  int qio_channel_flush(QIOChannel *ioc,
 static void qio_channel_restart_read(void *opaque)
 {
     QIOChannel *ioc = opaque;
-    Coroutine *co = ioc->read_coroutine;
+    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
+
+    if (!co) {
+        return;
+    }
 
     /* Assert that aio_co_wake() reenters the coroutine directly */
     assert(qemu_get_current_aio_context() ==
@@ -525,7 +530,11 @@  static void qio_channel_restart_read(void *opaque)
 static void qio_channel_restart_write(void *opaque)
 {
     QIOChannel *ioc = opaque;
-    Coroutine *co = ioc->write_coroutine;
+    Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);
+
+    if (!co) {
+        return;
+    }
 
     /* Assert that aio_co_wake() reenters the coroutine directly */
     assert(qemu_get_current_aio_context() ==
@@ -568,7 +577,11 @@  void qio_channel_detach_aio_context(QIOChannel *ioc)
 void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                     GIOCondition condition)
 {
+    AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
+
     assert(qemu_in_coroutine());
+    assert(in_aio_context_home_thread(ioc_ctx));
+
     if (condition == G_IO_IN) {
         assert(!ioc->read_coroutine);
         ioc->read_coroutine = qemu_coroutine_self();
@@ -580,18 +593,26 @@  void coroutine_fn qio_channel_yield(QIOChannel *ioc,
     }
     qio_channel_set_aio_fd_handlers(ioc);
     qemu_coroutine_yield();
+    assert(in_aio_context_home_thread(ioc_ctx));
 
     /* Allow interrupting the operation by reentering the coroutine other than
      * through the aio_fd_handlers. */
-    if (condition == G_IO_IN && ioc->read_coroutine) {
-        ioc->read_coroutine = NULL;
+    if (condition == G_IO_IN) {
+        assert(ioc->read_coroutine == NULL);
         qio_channel_set_aio_fd_handlers(ioc);
-    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
-        ioc->write_coroutine = NULL;
+    } else if (condition == G_IO_OUT) {
+        assert(ioc->write_coroutine == NULL);
         qio_channel_set_aio_fd_handlers(ioc);
     }
 }
 
+void qio_channel_wake_read(QIOChannel *ioc)
+{
+    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
+    if (co) {
+        aio_co_wake(co);
+    }
+}
 
 static gboolean qio_channel_wait_complete(QIOChannel *ioc,
                                           GIOCondition condition,
diff --git a/nbd/server.c b/nbd/server.c
index e239c2890f..2664d43bff 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -1599,8 +1599,7 @@  static bool nbd_drained_poll(void *opaque)
              * enter it here so we don't depend on the client to wake it up.
              */
             if (client->recv_coroutine != NULL && client->read_yielding) {
-                qemu_aio_coroutine_enter(exp->common.ctx,
-                                         client->recv_coroutine);
+                qio_channel_wake_read(client->ioc);
             }
 
             return true;