
[v3,06/33] util/async: aio_co_schedule(): support reschedule in same ctx

Message ID 20210416080911.83197-7-vsementsov@virtuozzo.com (mailing list archive)
State New, archived
Series block/nbd: rework client connection

Commit Message

Vladimir Sementsov-Ogievskiy April 16, 2021, 8:08 a.m. UTC
With the following patch we want to wake a coroutine from a separate
thread, and that doesn't work with aio_co_wake():
Assume we have no iothreads.
Assume we have a coroutine A which waits at a yield point for an
external aio_co_wake(), and no progress can be made until that happens.
The main thread is in a blocking aio_poll() (for example, in blk_read()).

Now, in a separate thread we call aio_co_wake(). It calls aio_co_enter(),
which goes through the last "else" branch and does aio_context_acquire(ctx).

Now we have a deadlock: aio_poll() will not release the context lock
until some progress is made, and progress can't be made until
aio_co_wake() wakes coroutine A. And it can't, because it is waiting for
aio_context_acquire().

Still, aio_co_schedule() works well in parallel with a blocking
aio_poll(), so we want to use it. Let's add the possibility of
rescheduling a coroutine in the same ctx where it yielded.

Fetch co->ctx the same way as it is done in aio_co_wake().

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 include/block/aio.h | 2 +-
 util/async.c        | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)
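
For illustration, here is a minimal sketch of how a non-AioContext thread
could use the NULL-ctx form described above. It is only a sketch: the
coroutine and thread functions below are assumptions made for the example,
not code from this series.

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "block/aio.h"

/* Coroutine side: yields and is later woken from another thread. */
static void coroutine_fn example_co_fn(void *opaque)
{
    /* ... hand work to the connection thread ... */
    qemu_coroutine_yield();    /* resumed by example_thread_fn() below */
    /* ... continues in the same AioContext it yielded in ... */
}

/* Thread side: no AioContext of its own; the context lock may be held
 * by a blocking aio_poll() elsewhere. */
static void *example_thread_fn(void *opaque)
{
    Coroutine *co = opaque;

    /* ... blocking work ... */

    /*
     * With ctx == NULL, aio_co_schedule() fetches co->ctx itself and
     * only schedules a bottom half, so this is safe even while that
     * context sits in a blocking aio_poll().
     */
    aio_co_schedule(NULL, co);
    return NULL;
}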

Comments

Roman Kagan April 23, 2021, 10:09 a.m. UTC | #1
On Fri, Apr 16, 2021 at 11:08:44AM +0300, Vladimir Sementsov-Ogievskiy wrote:
> With the following patch we want to call wake coroutine from thread.
> And it doesn't work with aio_co_wake:
> Assume we have no iothreads.
> Assume we have a coroutine A, which waits in the yield point for
> external aio_co_wake(), and no progress can be done until it happen.
> Main thread is in blocking aio_poll() (for example, in blk_read()).
> 
> Now, in a separate thread we do aio_co_wake(). It calls  aio_co_enter(),
> which goes through last "else" branch and do aio_context_acquire(ctx).
> 
> Now we have a deadlock, as aio_poll() will not release the context lock
> until some progress is done, and progress can't be done until
> aio_co_wake() wake the coroutine A. And it can't because it wait for
> aio_context_acquire().
> 
> Still, aio_co_schedule() works well in parallel with blocking
> aio_poll(). So we want use it. Let's add a possibility of rescheduling
> coroutine in same ctx where it was yield'ed.
> 
> Fetch co->ctx in same way as it is done in aio_co_wake().
> 
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
> ---
>  include/block/aio.h | 2 +-
>  util/async.c        | 8 ++++++++
>  2 files changed, 9 insertions(+), 1 deletion(-)
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5f342267d5..744b695525 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -643,7 +643,7 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
>  
>  /**
>   * aio_co_schedule:
> - * @ctx: the aio context
> + * @ctx: the aio context, if NULL, the current ctx of @co will be used.
>   * @co: the coroutine
>   *
>   * Start a coroutine on a remote AioContext.
> diff --git a/util/async.c b/util/async.c
> index 674dbefb7c..750be555c6 100644
> --- a/util/async.c
> +++ b/util/async.c
> @@ -545,6 +545,14 @@ fail:
>  
>  void aio_co_schedule(AioContext *ctx, Coroutine *co)
>  {
> +    if (!ctx) {
> +        /*
> +         * Read coroutine before co->ctx.  Matches smp_wmb in
> +         * qemu_coroutine_enter.
> +         */
> +        smp_read_barrier_depends();
> +        ctx = qatomic_read(&co->ctx);
> +    }

I'd rather not extend this interface, but add a new one on top.  And
document how it's different from aio_co_wake().

Roman.

>      trace_aio_co_schedule(ctx, co);
>      const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
>                                             __func__);
> -- 
> 2.29.2
>
Vladimir Sementsov-Ogievskiy April 26, 2021, 8:52 a.m. UTC | #2
23.04.2021 13:09, Roman Kagan wrote:
> On Fri, Apr 16, 2021 at 11:08:44AM +0300, Vladimir Sementsov-Ogievskiy wrote:
>> With the following patch we want to call wake coroutine from thread.
>> And it doesn't work with aio_co_wake:
>> Assume we have no iothreads.
>> Assume we have a coroutine A, which waits in the yield point for
>> external aio_co_wake(), and no progress can be done until it happen.
>> Main thread is in blocking aio_poll() (for example, in blk_read()).
>>
>> Now, in a separate thread we do aio_co_wake(). It calls  aio_co_enter(),
>> which goes through last "else" branch and do aio_context_acquire(ctx).
>>
>> Now we have a deadlock, as aio_poll() will not release the context lock
>> until some progress is done, and progress can't be done until
>> aio_co_wake() wake the coroutine A. And it can't because it wait for
>> aio_context_acquire().
>>
>> Still, aio_co_schedule() works well in parallel with blocking
>> aio_poll(). So we want use it. Let's add a possibility of rescheduling
>> coroutine in same ctx where it was yield'ed.
>>
>> Fetch co->ctx in same way as it is done in aio_co_wake().
>>
>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
>> ---
>>   include/block/aio.h | 2 +-
>>   util/async.c        | 8 ++++++++
>>   2 files changed, 9 insertions(+), 1 deletion(-)
>>
>> diff --git a/include/block/aio.h b/include/block/aio.h
>> index 5f342267d5..744b695525 100644
>> --- a/include/block/aio.h
>> +++ b/include/block/aio.h
>> @@ -643,7 +643,7 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
>>   
>>   /**
>>    * aio_co_schedule:
>> - * @ctx: the aio context
>> + * @ctx: the aio context, if NULL, the current ctx of @co will be used.
>>    * @co: the coroutine
>>    *
>>    * Start a coroutine on a remote AioContext.
>> diff --git a/util/async.c b/util/async.c
>> index 674dbefb7c..750be555c6 100644
>> --- a/util/async.c
>> +++ b/util/async.c
>> @@ -545,6 +545,14 @@ fail:
>>   
>>   void aio_co_schedule(AioContext *ctx, Coroutine *co)
>>   {
>> +    if (!ctx) {
>> +        /*
>> +         * Read coroutine before co->ctx.  Matches smp_wmb in
>> +         * qemu_coroutine_enter.
>> +         */
>> +        smp_read_barrier_depends();
>> +        ctx = qatomic_read(&co->ctx);
>> +    }
> 
> I'd rather not extend this interface, but add a new one on top.  And
> document how it's different from aio_co_wake().
> 

Agreed, that's better. Will do.
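
For what it's worth, a hypothetical shape for such a separate helper could
look like the sketch below; the name and exact placement are illustrative
assumptions, not what the series ultimately uses.

/*
 * Hypothetical helper (name is an assumption): reschedule @co into the
 * AioContext it last ran in, without entering it directly.  Unlike
 * aio_co_wake(), this never acquires the context, so it can be called
 * from an arbitrary thread even while that context is inside a
 * blocking aio_poll().
 */
void aio_co_schedule_self_ctx(Coroutine *co)
{
    /*
     * Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    aio_co_schedule(qatomic_read(&co->ctx), co);
}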
Paolo Bonzini May 12, 2021, 6:56 a.m. UTC | #3
On 16/04/21 10:08, Vladimir Sementsov-Ogievskiy wrote:
> With the following patch we want to call wake coroutine from thread.
> And it doesn't work with aio_co_wake:
> Assume we have no iothreads.
> Assume we have a coroutine A, which waits in the yield point for
> external aio_co_wake(), and no progress can be done until it happen.
> Main thread is in blocking aio_poll() (for example, in blk_read()).
>
> Now, in a separate thread we do aio_co_wake(). It calls  aio_co_enter(),
> which goes through last "else" branch and do aio_context_acquire(ctx).

I don't understand.  Why doesn't aio_co_enter go through the ctx != 
qemu_get_current_aio_context() branch and just do aio_co_schedule?  That 
was at least the idea behind aio_co_wake and aio_co_enter.

Paolo
Vladimir Sementsov-Ogievskiy May 12, 2021, 7:15 a.m. UTC | #4
12.05.2021 09:56, Paolo Bonzini wrote:
> On 16/04/21 10:08, Vladimir Sementsov-Ogievskiy wrote:
>> With the following patch we want to call wake coroutine from thread.
>> And it doesn't work with aio_co_wake:
>> Assume we have no iothreads.
>> Assume we have a coroutine A, which waits in the yield point for
>> external aio_co_wake(), and no progress can be done until it happen.
>> Main thread is in blocking aio_poll() (for example, in blk_read()).
>>
>> Now, in a separate thread we do aio_co_wake(). It calls  aio_co_enter(),
>> which goes through last "else" branch and do aio_context_acquire(ctx).
> 
> I don't understand.  Why doesn't aio_co_enter go through the ctx != qemu_get_current_aio_context() branch and just do aio_co_schedule?  That was at least the idea behind aio_co_wake and aio_co_enter.
> 

Because ctx is exactly qemu_get_current_aio_context(): we are not in an iothread but in the NBD connection thread, so qemu_get_current_aio_context() returns qemu_aio_context.
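
For context, the branch structure of aio_co_enter() at the time is roughly
the following (paraphrased from util/async.c; the last "else" is the branch
being discussed):

void aio_co_enter(AioContext *ctx, Coroutine *co)
{
    if (ctx != qemu_get_current_aio_context()) {
        /* Remote context: hand off through a bottom half. */
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        /* Queue on the currently running coroutine's wakeup list. */
        Coroutine *self = qemu_coroutine_self();
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        /* Same context, not in a coroutine: acquire and enter. */
        aio_context_acquire(ctx);
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}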
Paolo Bonzini May 13, 2021, 9:04 p.m. UTC | #5
On 12/05/21 09:15, Vladimir Sementsov-Ogievskiy wrote:
>>>
>>
>> I don't understand.  Why doesn't aio_co_enter go through the ctx != 
>> qemu_get_current_aio_context() branch and just do aio_co_schedule?  
>> That was at least the idea behind aio_co_wake and aio_co_enter.
>>
> 
> Because ctx is exactly qemu_get_current_aio_context(), as we are not in 
> iothread but in nbd connection thread. So, 
> qemu_get_current_aio_context() returns qemu_aio_context.

So the problem is that threads other than the main thread and
the I/O thread should not return qemu_aio_context.  The vCPU thread
may need to return it when running with BQL taken, though.

Something like this (untested):

diff --git a/include/block/aio.h b/include/block/aio.h
index 5f342267d5..10fcae1515 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
   * Return the AioContext whose event loop runs in the current thread.
   *
   * If called from an IOThread this will be the IOThread's AioContext.  If
- * called from another thread it will be the main loop AioContext.
+ * called from the main thread or with the "big QEMU lock" taken it
+ * will be the main loop AioContext.
   */
  AioContext *qemu_get_current_aio_context(void);
  
+void qemu_set_current_aio_context(AioContext *ctx);
+
  /**
   * aio_context_setup:
   * @ctx: the aio context
diff --git a/iothread.c b/iothread.c
index 7f086387be..22b967e77c 100644
--- a/iothread.c
+++ b/iothread.c
@@ -39,11 +39,23 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
  #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
  #endif
  
-static __thread IOThread *my_iothread;
+static __thread AioContext *my_aiocontext;
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+    assert(!my_aiocontext);
+    my_aiocontext = ctx;
+}
  
  AioContext *qemu_get_current_aio_context(void)
  {
-    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+    if (my_aiocontext) {
+        return my_aiocontext;
+    }
+    if (qemu_mutex_iothread_locked()) {
+        return qemu_get_aio_context();
+    }
+    return NULL;
  }
  
  static void *iothread_run(void *opaque)
@@ -56,7 +68,7 @@ static void *iothread_run(void *opaque)
       * in this new thread uses glib.
       */
      g_main_context_push_thread_default(iothread->worker_context);
-    my_iothread = iothread;
+    qemu_set_current_aio_context(iothread->ctx);
      iothread->thread_id = qemu_get_thread_id();
      qemu_sem_post(&iothread->init_done_sem);
  
diff --git a/stubs/iothread.c b/stubs/iothread.c
index 8cc9e28c55..25ff398894 100644
--- a/stubs/iothread.c
+++ b/stubs/iothread.c
@@ -6,3 +6,7 @@ AioContext *qemu_get_current_aio_context(void)
  {
      return qemu_get_aio_context();
  }
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+}
diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
index afde12b4ef..cab38b3da8 100644
--- a/tests/unit/iothread.c
+++ b/tests/unit/iothread.c
@@ -30,13 +30,26 @@ struct IOThread {
      bool stopping;
  };
  
-static __thread IOThread *my_iothread;
+static __thread AioContext *my_aiocontext;
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+    assert(!my_aiocontext);
+    my_aiocontext = ctx;
+}
  
  AioContext *qemu_get_current_aio_context(void)
  {
-    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
+    if (my_aiocontext) {
+        return my_aiocontext;
+    }
+    if (qemu_mutex_iothread_locked()) {
+        return qemu_get_aio_context();
+    }
+    return NULL;
  }
  
+
  static void iothread_init_gcontext(IOThread *iothread)
  {
      GSource *source;
@@ -54,7 +67,7 @@ static void *iothread_run(void *opaque)
  
      rcu_register_thread();
  
-    my_iothread = iothread;
+    qemu_set_current_aio_context(iothread->ctx);
      qemu_mutex_lock(&iothread->init_done_lock);
      iothread->ctx = aio_context_new(&error_abort);
  
diff --git a/util/main-loop.c b/util/main-loop.c
index d9c55df6f5..4ae5b23e99 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
      if (!qemu_aio_context) {
          return -EMFILE;
      }
+    qemu_set_current_aio_context(qemu_aio_context);
      qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
      gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
      src = aio_get_g_source(qemu_aio_context);

If it works for you, I can post it as a formal patch.

Paolo
Roman Kagan May 14, 2021, 5:27 p.m. UTC | #6
On Thu, May 13, 2021 at 11:04:37PM +0200, Paolo Bonzini wrote:
> On 12/05/21 09:15, Vladimir Sementsov-Ogievskiy wrote:
> > > > 
> > > 
> > > I don't understand.  Why doesn't aio_co_enter go through the ctx !=
> > > qemu_get_current_aio_context() branch and just do aio_co_schedule?
> > > That was at least the idea behind aio_co_wake and aio_co_enter.
> > > 
> > 
> > Because ctx is exactly qemu_get_current_aio_context(), as we are not in
> > iothread but in nbd connection thread. So,
> > qemu_get_current_aio_context() returns qemu_aio_context.
> 
> So the problem is that threads other than the main thread and
> the I/O thread should not return qemu_aio_context.  The vCPU thread
> may need to return it when running with BQL taken, though.

I'm not sure this is the only case.

AFAICS your patch has basically the same effect as Vladimir's
patch "util/async: aio_co_enter(): do aio_co_schedule in general case"
(https://lore.kernel.org/qemu-devel/20210408140827.332915-4-vsementsov@virtuozzo.com/).
That one was found to break e.g. aio=threads cases.  I guessed it
implicitly relied upon aio_co_enter() acquiring the aio_context but we
didn't dig further to pinpoint the exact scenario.

Roman.

> Something like this (untested):
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5f342267d5..10fcae1515 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
>   * Return the AioContext whose event loop runs in the current thread.
>   *
>   * If called from an IOThread this will be the IOThread's AioContext.  If
> - * called from another thread it will be the main loop AioContext.
> + * called from the main thread or with the "big QEMU lock" taken it
> + * will be the main loop AioContext.
>   */
>  AioContext *qemu_get_current_aio_context(void);
> +void qemu_set_current_aio_context(AioContext *ctx);
> +
>  /**
>   * aio_context_setup:
>   * @ctx: the aio context
> diff --git a/iothread.c b/iothread.c
> index 7f086387be..22b967e77c 100644
> --- a/iothread.c
> +++ b/iothread.c
> @@ -39,11 +39,23 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
>  #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
>  #endif
> -static __thread IOThread *my_iothread;
> +static __thread AioContext *my_aiocontext;
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +    assert(!my_aiocontext);
> +    my_aiocontext = ctx;
> +}
>  AioContext *qemu_get_current_aio_context(void)
>  {
> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
> +    if (my_aiocontext) {
> +        return my_aiocontext;
> +    }
> +    if (qemu_mutex_iothread_locked()) {
> +        return qemu_get_aio_context();
> +    }
> +    return NULL;
>  }
>  static void *iothread_run(void *opaque)
> @@ -56,7 +68,7 @@ static void *iothread_run(void *opaque)
>       * in this new thread uses glib.
>       */
>      g_main_context_push_thread_default(iothread->worker_context);
> -    my_iothread = iothread;
> +    qemu_set_current_aio_context(iothread->ctx);
>      iothread->thread_id = qemu_get_thread_id();
>      qemu_sem_post(&iothread->init_done_sem);
> diff --git a/stubs/iothread.c b/stubs/iothread.c
> index 8cc9e28c55..25ff398894 100644
> --- a/stubs/iothread.c
> +++ b/stubs/iothread.c
> @@ -6,3 +6,7 @@ AioContext *qemu_get_current_aio_context(void)
>  {
>      return qemu_get_aio_context();
>  }
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +}
> diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
> index afde12b4ef..cab38b3da8 100644
> --- a/tests/unit/iothread.c
> +++ b/tests/unit/iothread.c
> @@ -30,13 +30,26 @@ struct IOThread {
>      bool stopping;
>  };
> -static __thread IOThread *my_iothread;
> +static __thread AioContext *my_aiocontext;
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +    assert(!my_aiocontext);
> +    my_aiocontext = ctx;
> +}
>  AioContext *qemu_get_current_aio_context(void)
>  {
> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
> +    if (my_aiocontext) {
> +        return my_aiocontext;
> +    }
> +    if (qemu_mutex_iothread_locked()) {
> +        return qemu_get_aio_context();
> +    }
> +    return NULL;
>  }
> +
>  static void iothread_init_gcontext(IOThread *iothread)
>  {
>      GSource *source;
> @@ -54,7 +67,7 @@ static void *iothread_run(void *opaque)
>      rcu_register_thread();
> -    my_iothread = iothread;
> +    qemu_set_current_aio_context(iothread->ctx);
>      qemu_mutex_lock(&iothread->init_done_lock);
>      iothread->ctx = aio_context_new(&error_abort);
> diff --git a/util/main-loop.c b/util/main-loop.c
> index d9c55df6f5..4ae5b23e99 100644
> --- a/util/main-loop.c
> +++ b/util/main-loop.c
> @@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
>      if (!qemu_aio_context) {
>          return -EMFILE;
>      }
> +    qemu_set_current_aio_context(qemu_aio_context);
>      qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
>      gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>      src = aio_get_g_source(qemu_aio_context);
> 
> If it works for you, I can post it as a formal patch.
> 
> Paolo
>
Paolo Bonzini May 14, 2021, 9:19 p.m. UTC | #7
On 14/05/21 19:27, Roman Kagan wrote:
> 
> AFAICS your patch has basically the same effect as Vladimir's
> patch "util/async: aio_co_enter(): do aio_co_schedule in general case"
> (https://lore.kernel.org/qemu-devel/20210408140827.332915-4-vsementsov@virtuozzo.com/).
> That one was found to break e.g. aio=threads cases.  I guessed it
> implicitly relied upon aio_co_enter() acquiring the aio_context but we
> didn't dig further to pinpoint the exact scenario.

That one is much more intrusive, because it goes through a bottom half 
unnecessarily in the case of aio_co_wake being called from an I/O 
callback (or from another bottom half).  I'll test my patch with 
aio=threads.

Paolo
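
The bottom-half detour mentioned here is the normal aio_co_schedule() path,
roughly (condensed from util/async.c, error reporting trimmed):

void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    /* Mark the coroutine as scheduled; scheduling it twice aborts. */
    const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL, __func__);
    if (scheduled) {
        abort();
    }

    /* Keep ctx alive until the bottom half has been scheduled. */
    aio_context_ref(ctx);
    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines, co,
                              co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);  /* co runs from ctx's event loop */
    aio_context_unref(ctx);
}

So when aio_co_wake() is already running in @co's own context (an I/O
callback or another bottom half), entering the coroutine directly avoids
that extra round trip.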
Vladimir Sementsov-Ogievskiy June 8, 2021, 6:45 p.m. UTC | #8
14.05.2021 00:04, Paolo Bonzini wrote:
> On 12/05/21 09:15, Vladimir Sementsov-Ogievskiy wrote:
>>>>
>>>
>>> I don't understand.  Why doesn't aio_co_enter go through the ctx != qemu_get_current_aio_context() branch and just do aio_co_schedule? That was at least the idea behind aio_co_wake and aio_co_enter.
>>>
>>
>> Because ctx is exactly qemu_get_current_aio_context(), as we are not in iothread but in nbd connection thread. So, qemu_get_current_aio_context() returns qemu_aio_context.
> 
> So the problem is that threads other than the main thread and
> the I/O thread should not return qemu_aio_context.  The vCPU thread
> may need to return it when running with BQL taken, though.
> 
> Something like this (untested):
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5f342267d5..10fcae1515 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
>    * Return the AioContext whose event loop runs in the current thread.
>    *
>    * If called from an IOThread this will be the IOThread's AioContext.  If
> - * called from another thread it will be the main loop AioContext.
> + * called from the main thread or with the "big QEMU lock" taken it
> + * will be the main loop AioContext.
>    */
>   AioContext *qemu_get_current_aio_context(void);
> 
> +void qemu_set_current_aio_context(AioContext *ctx);
> +
>   /**
>    * aio_context_setup:
>    * @ctx: the aio context
> diff --git a/iothread.c b/iothread.c
> index 7f086387be..22b967e77c 100644
> --- a/iothread.c
> +++ b/iothread.c
> @@ -39,11 +39,23 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
>   #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
>   #endif
> 
> -static __thread IOThread *my_iothread;
> +static __thread AioContext *my_aiocontext;
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +    assert(!my_aiocontext);
> +    my_aiocontext = ctx;
> +}
> 
>   AioContext *qemu_get_current_aio_context(void)
>   {
> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
> +    if (my_aiocontext) {
> +        return my_aiocontext;
> +    }
> +    if (qemu_mutex_iothread_locked()) {
> +        return qemu_get_aio_context();
> +    }
> +    return NULL;
>   }
> 
>   static void *iothread_run(void *opaque)
> @@ -56,7 +68,7 @@ static void *iothread_run(void *opaque)
>        * in this new thread uses glib.
>        */
>       g_main_context_push_thread_default(iothread->worker_context);
> -    my_iothread = iothread;
> +    qemu_set_current_aio_context(iothread->ctx);
>       iothread->thread_id = qemu_get_thread_id();
>       qemu_sem_post(&iothread->init_done_sem);
> 
> diff --git a/stubs/iothread.c b/stubs/iothread.c
> index 8cc9e28c55..25ff398894 100644
> --- a/stubs/iothread.c
> +++ b/stubs/iothread.c
> @@ -6,3 +6,7 @@ AioContext *qemu_get_current_aio_context(void)
>   {
>       return qemu_get_aio_context();
>   }
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +}
> diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
> index afde12b4ef..cab38b3da8 100644
> --- a/tests/unit/iothread.c
> +++ b/tests/unit/iothread.c
> @@ -30,13 +30,26 @@ struct IOThread {
>       bool stopping;
>   };
> 
> -static __thread IOThread *my_iothread;
> +static __thread AioContext *my_aiocontext;
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +    assert(!my_aiocontext);
> +    my_aiocontext = ctx;
> +}
> 
>   AioContext *qemu_get_current_aio_context(void)
>   {
> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
> +    if (my_aiocontext) {
> +        return my_aiocontext;
> +    }
> +    if (qemu_mutex_iothread_locked()) {
> +        return qemu_get_aio_context();
> +    }
> +    return NULL;
>   }
> 
> +
>   static void iothread_init_gcontext(IOThread *iothread)
>   {
>       GSource *source;
> @@ -54,7 +67,7 @@ static void *iothread_run(void *opaque)
> 
>       rcu_register_thread();
> 
> -    my_iothread = iothread;
> +    qemu_set_current_aio_context(iothread->ctx);
>       qemu_mutex_lock(&iothread->init_done_lock);
>       iothread->ctx = aio_context_new(&error_abort);
> 
> diff --git a/util/main-loop.c b/util/main-loop.c
> index d9c55df6f5..4ae5b23e99 100644
> --- a/util/main-loop.c
> +++ b/util/main-loop.c
> @@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
>       if (!qemu_aio_context) {
>           return -EMFILE;
>       }
> +    qemu_set_current_aio_context(qemu_aio_context);
>       qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
>       gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>       src = aio_get_g_source(qemu_aio_context);
> 
> If it works for you, I can post it as a formal patch.
> 

This doesn't work for iotests: qemu-io goes through the version in the stub. It works if I add:

diff --git a/stubs/iothread.c b/stubs/iothread.c
index 8cc9e28c55..967a01c4f0 100644
--- a/stubs/iothread.c
+++ b/stubs/iothread.c
@@ -2,7 +2,18 @@
  #include "block/aio.h"
  #include "qemu/main-loop.h"
  
+static __thread AioContext *my_aiocontext;
+
+void qemu_set_current_aio_context(AioContext *ctx)
+{
+    assert(!my_aiocontext);
+    my_aiocontext = ctx;
+}
+
  AioContext *qemu_get_current_aio_context(void)
  {
-    return qemu_get_aio_context();
+    if (my_aiocontext) {
+        return my_aiocontext;
+    }
+    return NULL;
  }
Paolo Bonzini June 9, 2021, 9:35 a.m. UTC | #9
On 08/06/21 20:45, Vladimir Sementsov-Ogievskiy wrote:
> 14.05.2021 00:04, Paolo Bonzini wrote:
>> On 12/05/21 09:15, Vladimir Sementsov-Ogievskiy wrote:
>>>>>
>>>>
>>>> I don't understand.  Why doesn't aio_co_enter go through the ctx != 
>>>> qemu_get_current_aio_context() branch and just do aio_co_schedule? 
>>>> That was at least the idea behind aio_co_wake and aio_co_enter.
>>>>
>>>
>>> Because ctx is exactly qemu_get_current_aio_context(), as we are not 
>>> in iothread but in nbd connection thread. So, 
>>> qemu_get_current_aio_context() returns qemu_aio_context.
>>
>> So the problem is that threads other than the main thread and
>> the I/O thread should not return qemu_aio_context.  The vCPU thread
>> may need to return it when running with BQL taken, though.
>>
>> Something like this (untested):
>>
>> diff --git a/include/block/aio.h b/include/block/aio.h
>> index 5f342267d5..10fcae1515 100644
>> --- a/include/block/aio.h
>> +++ b/include/block/aio.h
>> @@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct 
>> Coroutine *co);
>>    * Return the AioContext whose event loop runs in the current thread.
>>    *
>>    * If called from an IOThread this will be the IOThread's 
>> AioContext.  If
>> - * called from another thread it will be the main loop AioContext.
>> + * called from the main thread or with the "big QEMU lock" taken it
>> + * will be the main loop AioContext.
>>    */
>>   AioContext *qemu_get_current_aio_context(void);
>>
>> +void qemu_set_current_aio_context(AioContext *ctx);
>> +
>>   /**
>>    * aio_context_setup:
>>    * @ctx: the aio context
>> diff --git a/iothread.c b/iothread.c
>> index 7f086387be..22b967e77c 100644
>> --- a/iothread.c
>> +++ b/iothread.c
>> @@ -39,11 +39,23 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
>>   #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
>>   #endif
>>
>> -static __thread IOThread *my_iothread;
>> +static __thread AioContext *my_aiocontext;
>> +
>> +void qemu_set_current_aio_context(AioContext *ctx)
>> +{
>> +    assert(!my_aiocontext);
>> +    my_aiocontext = ctx;
>> +}
>>
>>   AioContext *qemu_get_current_aio_context(void)
>>   {
>> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
>> +    if (my_aiocontext) {
>> +        return my_aiocontext;
>> +    }
>> +    if (qemu_mutex_iothread_locked()) {
>> +        return qemu_get_aio_context();
>> +    }
>> +    return NULL;
>>   }
>>
>>   static void *iothread_run(void *opaque)
>> @@ -56,7 +68,7 @@ static void *iothread_run(void *opaque)
>>        * in this new thread uses glib.
>>        */
>>       g_main_context_push_thread_default(iothread->worker_context);
>> -    my_iothread = iothread;
>> +    qemu_set_current_aio_context(iothread->ctx);
>>       iothread->thread_id = qemu_get_thread_id();
>>       qemu_sem_post(&iothread->init_done_sem);
>>
>> diff --git a/stubs/iothread.c b/stubs/iothread.c
>> index 8cc9e28c55..25ff398894 100644
>> --- a/stubs/iothread.c
>> +++ b/stubs/iothread.c
>> @@ -6,3 +6,7 @@ AioContext *qemu_get_current_aio_context(void)
>>   {
>>       return qemu_get_aio_context();
>>   }
>> +
>> +void qemu_set_current_aio_context(AioContext *ctx)
>> +{
>> +}
>> diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
>> index afde12b4ef..cab38b3da8 100644
>> --- a/tests/unit/iothread.c
>> +++ b/tests/unit/iothread.c
>> @@ -30,13 +30,26 @@ struct IOThread {
>>       bool stopping;
>>   };
>>
>> -static __thread IOThread *my_iothread;
>> +static __thread AioContext *my_aiocontext;
>> +
>> +void qemu_set_current_aio_context(AioContext *ctx)
>> +{
>> +    assert(!my_aiocontext);
>> +    my_aiocontext = ctx;
>> +}
>>
>>   AioContext *qemu_get_current_aio_context(void)
>>   {
>> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
>> +    if (my_aiocontext) {
>> +        return my_aiocontext;
>> +    }
>> +    if (qemu_mutex_iothread_locked()) {
>> +        return qemu_get_aio_context();
>> +    }
>> +    return NULL;
>>   }
>>
>> +
>>   static void iothread_init_gcontext(IOThread *iothread)
>>   {
>>       GSource *source;
>> @@ -54,7 +67,7 @@ static void *iothread_run(void *opaque)
>>
>>       rcu_register_thread();
>>
>> -    my_iothread = iothread;
>> +    qemu_set_current_aio_context(iothread->ctx);
>>       qemu_mutex_lock(&iothread->init_done_lock);
>>       iothread->ctx = aio_context_new(&error_abort);
>>
>> diff --git a/util/main-loop.c b/util/main-loop.c
>> index d9c55df6f5..4ae5b23e99 100644
>> --- a/util/main-loop.c
>> +++ b/util/main-loop.c
>> @@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
>>       if (!qemu_aio_context) {
>>           return -EMFILE;
>>       }
>> +    qemu_set_current_aio_context(qemu_aio_context);
>>       qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
>>       gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>       src = aio_get_g_source(qemu_aio_context);
>>
>> If it works for you, I can post it as a formal patch.
>>
> 
> This doesn't work for iotests.. qemu-io goes through version in stub. It 
> works if I add:

Great, thanks.  I'll try the patch, also with the test that broke with 
the earlier version, and post if it works.

Paolo

> diff --git a/stubs/iothread.c b/stubs/iothread.c
> index 8cc9e28c55..967a01c4f0 100644
> --- a/stubs/iothread.c
> +++ b/stubs/iothread.c
> @@ -2,7 +2,18 @@
>   #include "block/aio.h"
>   #include "qemu/main-loop.h"
> 
> +static __thread AioContext *my_aiocontext;
> +
> +void qemu_set_current_aio_context(AioContext *ctx)
> +{
> +    assert(!my_aiocontext);
> +    my_aiocontext = ctx;
> +}
> +
>   AioContext *qemu_get_current_aio_context(void)
>   {
> -    return qemu_get_aio_context();
> +    if (my_aiocontext) {
> +        return my_aiocontext;
> +    }
> +    return NULL;
>   }
> 
> 
>
Vladimir Sementsov-Ogievskiy June 9, 2021, 10:24 a.m. UTC | #10
09.06.2021 12:35, Paolo Bonzini wrote:
> On 08/06/21 20:45, Vladimir Sementsov-Ogievskiy wrote:
>> 14.05.2021 00:04, Paolo Bonzini wrote:
>>> On 12/05/21 09:15, Vladimir Sementsov-Ogievskiy wrote:
>>>>>>
>>>>>
>>>>> I don't understand.  Why doesn't aio_co_enter go through the ctx != qemu_get_current_aio_context() branch and just do aio_co_schedule? That was at least the idea behind aio_co_wake and aio_co_enter.
>>>>>
>>>>
>>>> Because ctx is exactly qemu_get_current_aio_context(), as we are not in iothread but in nbd connection thread. So, qemu_get_current_aio_context() returns qemu_aio_context.
>>>
>>> So the problem is that threads other than the main thread and
>>> the I/O thread should not return qemu_aio_context.  The vCPU thread
>>> may need to return it when running with BQL taken, though.
>>>
>>> Something like this (untested):
>>>
>>> diff --git a/include/block/aio.h b/include/block/aio.h
>>> index 5f342267d5..10fcae1515 100644
>>> --- a/include/block/aio.h
>>> +++ b/include/block/aio.h
>>> @@ -691,10 +691,13 @@ void aio_co_enter(AioContext *ctx, struct Coroutine *co);
>>>    * Return the AioContext whose event loop runs in the current thread.
>>>    *
>>>    * If called from an IOThread this will be the IOThread's AioContext.  If
>>> - * called from another thread it will be the main loop AioContext.
>>> + * called from the main thread or with the "big QEMU lock" taken it
>>> + * will be the main loop AioContext.
>>>    */
>>>   AioContext *qemu_get_current_aio_context(void);
>>>
>>> +void qemu_set_current_aio_context(AioContext *ctx);
>>> +
>>>   /**
>>>    * aio_context_setup:
>>>    * @ctx: the aio context
>>> diff --git a/iothread.c b/iothread.c
>>> index 7f086387be..22b967e77c 100644
>>> --- a/iothread.c
>>> +++ b/iothread.c
>>> @@ -39,11 +39,23 @@ DECLARE_CLASS_CHECKERS(IOThreadClass, IOTHREAD,
>>>   #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
>>>   #endif
>>>
>>> -static __thread IOThread *my_iothread;
>>> +static __thread AioContext *my_aiocontext;
>>> +
>>> +void qemu_set_current_aio_context(AioContext *ctx)
>>> +{
>>> +    assert(!my_aiocontext);
>>> +    my_aiocontext = ctx;
>>> +}
>>>
>>>   AioContext *qemu_get_current_aio_context(void)
>>>   {
>>> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
>>> +    if (my_aiocontext) {
>>> +        return my_aiocontext;
>>> +    }
>>> +    if (qemu_mutex_iothread_locked()) {
>>> +        return qemu_get_aio_context();
>>> +    }
>>> +    return NULL;
>>>   }
>>>
>>>   static void *iothread_run(void *opaque)
>>> @@ -56,7 +68,7 @@ static void *iothread_run(void *opaque)
>>>        * in this new thread uses glib.
>>>        */
>>>       g_main_context_push_thread_default(iothread->worker_context);
>>> -    my_iothread = iothread;
>>> +    qemu_set_current_aio_context(iothread->ctx);
>>>       iothread->thread_id = qemu_get_thread_id();
>>>       qemu_sem_post(&iothread->init_done_sem);
>>>
>>> diff --git a/stubs/iothread.c b/stubs/iothread.c
>>> index 8cc9e28c55..25ff398894 100644
>>> --- a/stubs/iothread.c
>>> +++ b/stubs/iothread.c
>>> @@ -6,3 +6,7 @@ AioContext *qemu_get_current_aio_context(void)
>>>   {
>>>       return qemu_get_aio_context();
>>>   }
>>> +
>>> +void qemu_set_current_aio_context(AioContext *ctx)
>>> +{
>>> +}
>>> diff --git a/tests/unit/iothread.c b/tests/unit/iothread.c
>>> index afde12b4ef..cab38b3da8 100644
>>> --- a/tests/unit/iothread.c
>>> +++ b/tests/unit/iothread.c
>>> @@ -30,13 +30,26 @@ struct IOThread {
>>>       bool stopping;
>>>   };
>>>
>>> -static __thread IOThread *my_iothread;
>>> +static __thread AioContext *my_aiocontext;
>>> +
>>> +void qemu_set_current_aio_context(AioContext *ctx)
>>> +{
>>> +    assert(!my_aiocontext);
>>> +    my_aiocontext = ctx;
>>> +}
>>>
>>>   AioContext *qemu_get_current_aio_context(void)
>>>   {
>>> -    return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
>>> +    if (my_aiocontext) {
>>> +        return my_aiocontext;
>>> +    }
>>> +    if (qemu_mutex_iothread_locked()) {
>>> +        return qemu_get_aio_context();
>>> +    }
>>> +    return NULL;
>>>   }
>>>
>>> +
>>>   static void iothread_init_gcontext(IOThread *iothread)
>>>   {
>>>       GSource *source;
>>> @@ -54,7 +67,7 @@ static void *iothread_run(void *opaque)
>>>
>>>       rcu_register_thread();
>>>
>>> -    my_iothread = iothread;
>>> +    qemu_set_current_aio_context(iothread->ctx);
>>>       qemu_mutex_lock(&iothread->init_done_lock);
>>>       iothread->ctx = aio_context_new(&error_abort);
>>>
>>> diff --git a/util/main-loop.c b/util/main-loop.c
>>> index d9c55df6f5..4ae5b23e99 100644
>>> --- a/util/main-loop.c
>>> +++ b/util/main-loop.c
>>> @@ -170,6 +170,7 @@ int qemu_init_main_loop(Error **errp)
>>>       if (!qemu_aio_context) {
>>>           return -EMFILE;
>>>       }
>>> +    qemu_set_current_aio_context(qemu_aio_context);
>>>       qemu_notify_bh = qemu_bh_new(notify_event_cb, NULL);
>>>       gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
>>>       src = aio_get_g_source(qemu_aio_context);
>>>
>>> If it works for you, I can post it as a formal patch.
>>>
>>
>> This doesn't work for iotests.. qemu-io goes through version in stub. It works if I add:
> 
> Great, thanks.  I'll try the patch, also with the test that broke with the earlier version, and post if it works.
> 

Thanks, I'll base v4 of the nbd patches on it.

I have now run make check; test-aio-multithread crashes on an assertion:

(gdb) bt
#0  0x00007f4af8d839d5 in raise () from /lib64/libc.so.6
#1  0x00007f4af8d6c8a4 in abort () from /lib64/libc.so.6
#2  0x00007f4af8d6c789 in __assert_fail_base.cold () from /lib64/libc.so.6
#3  0x00007f4af8d7c026 in __assert_fail () from /lib64/libc.so.6
#4  0x000055daebfdab95 in aio_poll (ctx=0x7f4ae0000b60, blocking=true) at ../util/aio-posix.c:567
#5  0x000055daebea096c in iothread_run (opaque=0x55daed81bc90) at ../tests/unit/iothread.c:91
#6  0x000055daebfc6c4a in qemu_thread_start (args=0x55daed7d9940) at ../util/qemu-thread-posix.c:521
#7  0x00007f4af8f1a3f9 in start_thread () from /lib64/libpthread.so.0
#8  0x00007f4af8e47b53 in clone () from /lib64/libc.so.6
(gdb) fr 4
#4  0x000055daebfdab95 in aio_poll (ctx=0x7f4ae0000b60, blocking=true) at ../util/aio-posix.c:567
567         assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
(gdb) list
562          *
563          * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
564          * is special in that it runs in the main thread, but that thread's context
565          * is qemu_aio_context.
566          */
567         assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
568                                           qemu_get_aio_context() : ctx));
569
570         qemu_lockcnt_inc(&ctx->list_lock);
571
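
For reference, the assertion that fires checks in_aio_context_home_thread(),
which at the time looks roughly like this (paraphrased from
include/block/aio.h):

static inline bool in_aio_context_home_thread(AioContext *ctx)
{
    /* The thread that runs ctx's event loop may poll it... */
    if (ctx == qemu_get_current_aio_context()) {
        return true;
    }

    /* ...and the main-loop context may also be polled with the BQL held. */
    if (ctx == qemu_get_aio_context()) {
        return qemu_mutex_iothread_locked();
    }

    return false;
}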
Paolo Bonzini June 9, 2021, 12:17 p.m. UTC | #11
On 09/06/21 12:24, Vladimir Sementsov-Ogievskiy wrote:
> Thanks, I'll base v4 of nbd patches on it.
> 
> I now run make check. test-aio-multithread crashes on assertion:

With the patch I've sent it doesn't, so hopefully you can go ahead.

Paolo

> (gdb) bt
> #0  0x00007f4af8d839d5 in raise () from /lib64/libc.so.6
> #1  0x00007f4af8d6c8a4 in abort () from /lib64/libc.so.6
> #2  0x00007f4af8d6c789 in __assert_fail_base.cold () from /lib64/libc.so.6
> #3  0x00007f4af8d7c026 in __assert_fail () from /lib64/libc.so.6
> #4  0x000055daebfdab95 in aio_poll (ctx=0x7f4ae0000b60, blocking=true) 
> at ../util/aio-posix.c:567
> #5  0x000055daebea096c in iothread_run (opaque=0x55daed81bc90) at 
> ../tests/unit/iothread.c:91
> #6  0x000055daebfc6c4a in qemu_thread_start (args=0x55daed7d9940) at 
> ../util/qemu-thread-posix.c:521
> #7  0x00007f4af8f1a3f9 in start_thread () from /lib64/libpthread.so.0
> #8  0x00007f4af8e47b53 in clone () from /lib64/libc.so.6
> (gdb) fr 4
> #4  0x000055daebfdab95 in aio_poll (ctx=0x7f4ae0000b60, blocking=true) 
> at ../util/aio-posix.c:567
> 567         assert(in_aio_context_home_thread(ctx == 
> iohandler_get_aio_context() ?
> (gdb) list
> 562          *
> 563          * aio_poll() may only be called in the AioContext's thread. 
> iohandler_ctx
> 564          * is special in that it runs in the main thread, but that 
> thread's context
> 565          * is qemu_aio_context.
> 566          */
> 567         assert(in_aio_context_home_thread(ctx == 
> iohandler_get_aio_context() ?
> 568                                           qemu_get_aio_context() : 
> ctx));
> 569
> 570         qemu_lockcnt_inc(&ctx->list_lock);

Patch

diff --git a/include/block/aio.h b/include/block/aio.h
index 5f342267d5..744b695525 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -643,7 +643,7 @@  static inline bool aio_node_check(AioContext *ctx, bool is_external)
 
 /**
  * aio_co_schedule:
- * @ctx: the aio context
+ * @ctx: the aio context, if NULL, the current ctx of @co will be used.
  * @co: the coroutine
  *
  * Start a coroutine on a remote AioContext.
diff --git a/util/async.c b/util/async.c
index 674dbefb7c..750be555c6 100644
--- a/util/async.c
+++ b/util/async.c
@@ -545,6 +545,14 @@  fail:
 
 void aio_co_schedule(AioContext *ctx, Coroutine *co)
 {
+    if (!ctx) {
+        /*
+         * Read coroutine before co->ctx.  Matches smp_wmb in
+         * qemu_coroutine_enter.
+         */
+        smp_read_barrier_depends();
+        ctx = qatomic_read(&co->ctx);
+    }
     trace_aio_co_schedule(ctx, co);
     const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL,
                                            __func__);