[v3,3/4] qmp: Move dispatcher to a coroutine
diff mbox series

Message ID 20200115122326.26393-4-kwolf@redhat.com
State New
Headers show
Series
  • qmp: Optionally run handlers in coroutines
Related show

Commit Message

Kevin Wolf Jan. 15, 2020, 12:23 p.m. UTC
This moves the QMP dispatcher to a coroutine and runs all QMP command
handlers that declare 'coroutine': true in coroutine context so they
can avoid blocking the main loop while doing I/O or waiting for other
events.

For commands that are not declared safe to run in a coroutine, the
dispatcher drops out of coroutine context by calling the QMP command
handler from a bottom half.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Marc-André Lureau <marcandre.lureau@redhat.com>
---
 include/qapi/qmp/dispatch.h |  2 +
 monitor/monitor-internal.h  |  5 ++-
 monitor/monitor.c           | 24 +++++++----
 monitor/qmp.c               | 83 +++++++++++++++++++++++--------------
 qapi/qmp-dispatch.c         | 38 ++++++++++++++++-
 5 files changed, 111 insertions(+), 41 deletions(-)

Comments

Markus Armbruster Jan. 17, 2020, 12:20 p.m. UTC | #1
Kevin Wolf <kwolf@redhat.com> writes:

> This moves the QMP dispatcher to a coroutine and runs all QMP command
> handlers that declare 'coroutine': true in coroutine context so they
> can avoid blocking the main loop while doing I/O or waiting for other
> events.
>
> For commands that are not declared safe to run in a coroutine, the
> dispatcher drops out of coroutine context by calling the QMP command
> handler from a bottom half.
>
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> Reviewed-by: Marc-André Lureau <marcandre.lureau@redhat.com>
> ---
>  include/qapi/qmp/dispatch.h |  2 +
>  monitor/monitor-internal.h  |  5 ++-
>  monitor/monitor.c           | 24 +++++++----
>  monitor/qmp.c               | 83 +++++++++++++++++++++++--------------
>  qapi/qmp-dispatch.c         | 38 ++++++++++++++++-
>  5 files changed, 111 insertions(+), 41 deletions(-)
>
> diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
> index d6ce9efc8e..d6d5443391 100644
> --- a/include/qapi/qmp/dispatch.h
> +++ b/include/qapi/qmp/dispatch.h
> @@ -30,6 +30,8 @@ typedef enum QmpCommandOptions
>  typedef struct QmpCommand
>  {
>      const char *name;
> +    /* Runs in coroutine context if QCO_COROUTINE is set, except for OOB
> +     * commands */

Needs an update if we outlaw 'allow-oob': true, 'coroutine': true.

>      QmpCommandFunc *fn;
>      QmpCommandOptions options;
>      QTAILQ_ENTRY(QmpCommand) node;
> diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
> index d78f5ca190..7389b6a56c 100644
> --- a/monitor/monitor-internal.h
> +++ b/monitor/monitor-internal.h
> @@ -154,7 +154,8 @@ static inline bool monitor_is_qmp(const Monitor *mon)
>  
>  typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
>  extern IOThread *mon_iothread;
> -extern QEMUBH *qmp_dispatcher_bh;
> +extern Coroutine *qmp_dispatcher_co;
> +extern bool qmp_dispatcher_co_busy;
>  extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
>  extern QemuMutex monitor_lock;
>  extern MonitorList mon_list;
> @@ -172,7 +173,7 @@ void monitor_fdsets_cleanup(void);
>  
>  void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
>  void monitor_data_destroy_qmp(MonitorQMP *mon);
> -void monitor_qmp_bh_dispatcher(void *data);
> +void coroutine_fn monitor_qmp_dispatcher_co(void *data);
>  
>  int get_monitor_def(int64_t *pval, const char *name);
>  void help_cmd(Monitor *mon, const char *name);
> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index 12898b6448..c72763fa4e 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -53,8 +53,9 @@ typedef struct {
>  /* Shared monitor I/O thread */
>  IOThread *mon_iothread;
>  
> -/* Bottom half to dispatch the requests received from I/O thread */
> -QEMUBH *qmp_dispatcher_bh;
> +/* Coroutine to dispatch the requests received from I/O thread */
> +Coroutine *qmp_dispatcher_co;
> +bool qmp_dispatcher_co_busy;

Purpose of @qmp_dispatcher_co_busy is not obvious.  Could it use a
comment?

>  
>  /* Protects mon_list, monitor_qapi_event_state, monitor_destroyed.  */
>  QemuMutex monitor_lock;
> @@ -579,9 +580,16 @@ void monitor_cleanup(void)
>      }
>      qemu_mutex_unlock(&monitor_lock);
>  
> -    /* QEMUBHs needs to be deleted before destroying the I/O thread */
> -    qemu_bh_delete(qmp_dispatcher_bh);
> -    qmp_dispatcher_bh = NULL;
> +    /* The dispatcher needs to stop before destroying the I/O thread */
> +    if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
> +        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);

Looks rather low-level to my (untrained!) eyes.

> +        qmp_dispatcher_co = NULL;

As we'll see below, there's a subtle reason for zapping
qmp_dispatcher_co here.

> +    }
> +
> +    AIO_WAIT_WHILE(qemu_get_aio_context(),
> +                   (aio_bh_poll(iohandler_get_aio_context()),
> +                    atomic_mb_read(&qmp_dispatcher_co_busy)));
> +

Looks like it waits for @qmp_dispatcher_co_busy to become false.  The
details are greek to me.

aio_bh_poll()'s function comment says "These are internal functions used
by the QEMU main loop."  This seems to be the first call outside
util/aio-{posix,win32}.c.  Hmm.

How exactly this implies the coroutine terminated I can't quite tell.

>      if (mon_iothread) {
>          iothread_destroy(mon_iothread);
>          mon_iothread = NULL;
> @@ -604,9 +612,9 @@ void monitor_init_globals_core(void)
>       * have commands assuming that context.  It would be nice to get
>       * rid of those assumptions.
>       */
> -    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> -                                   monitor_qmp_bh_dispatcher,
> -                                   NULL);
> +    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
> +    atomic_mb_set(&qmp_dispatcher_co_busy, true);
> +    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);

Ignorant question: why not qemu_coroutine_enter()?

>  }
>  
>  QemuOptsList qemu_mon_opts = {
> diff --git a/monitor/qmp.c b/monitor/qmp.c
> index b67a8e7d1f..f29a8fe497 100644
> --- a/monitor/qmp.c
> +++ b/monitor/qmp.c
> @@ -133,6 +133,8 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
>      }
>  }
>  
> +/* Runs outside of coroutine context for OOB commands, but in coroutine context
> + * for everything else. */

Wing this comment, please.

Note: the precondition is asserted in do_qmp_dispatch() below.
Asserting here is impractical, because we don't know whether this is an
OOB command.

>  static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
>  {
>      Monitor *old_mon;
> @@ -211,43 +213,62 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
>      return req_obj;
>  }
>  
> -void monitor_qmp_bh_dispatcher(void *data)
> +void coroutine_fn monitor_qmp_dispatcher_co(void *data)
>  {
> -    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
> +    QMPRequest *req_obj = NULL;
>      QDict *rsp;
>      bool need_resume;
>      MonitorQMP *mon;
>  
> -    if (!req_obj) {
> -        return;
> -    }
> +    while (true) {
> +        assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
> +
> +        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
> +            /* Wait to be reentered from handle_qmp_command, or terminate if
> +             * qmp_dispatcher_co has been reset to NULL */
> +            atomic_mb_set(&qmp_dispatcher_co_busy, false);

Note for later: qmp_dispatcher_co_busy is false now.

> +            if (qmp_dispatcher_co) {
> +                qemu_coroutine_yield();
> +            }
> +            /* qmp_dispatcher_co may have changed if we yielded and were
> +             * reentered from monitor_cleanup() */
> +            if (!qmp_dispatcher_co) {
> +                return;
> +            }

!qmp_dispatcher_co asks the coroutine to terminate.

monitor_init_globals_core() sets it before scheduling the newly created
coroutine.  monitor_cleanup() clears it after scheduling the non-busy
coroutine.

When asked to terminate, we don't want to yield (first conditional), and
we don't want to loop (second conditional).

> +            assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);

How does qmp_dispatcher_co_busy become true again?  Is it the
atomic_xchg() in handle_qmp_command() below?

> +        }

Looks like the purpose of the loop above is "get request if we have
requests, else terminate if asked to, else yield".

The initial kick (in monitor_init_globals_core()) hits "else yield".

Subsequent kicks in handle_qmp_command() hit "get request".

The final kick in monitor_cleanup() hits "terminate".

Correct?

>  
> -    mon = req_obj->mon;
> -    /*  qmp_oob_enabled() might change after "qmp_capabilities" */
> -    need_resume = !qmp_oob_enabled(mon) ||
> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
> -    if (req_obj->req) {
> -        QDict *qdict = qobject_to(QDict, req_obj->req);
> -        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> -        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> -        monitor_qmp_dispatch(mon, req_obj->req);
> -    } else {
> -        assert(req_obj->err);
> -        rsp = qmp_error_response(req_obj->err);
> -        req_obj->err = NULL;
> -        monitor_qmp_respond(mon, rsp);
> -        qobject_unref(rsp);
> -    }

If we get here, we have a @req_obj.

> +        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
> +        qemu_coroutine_yield();

I'm confused... why do we yield here?

> +
> +        mon = req_obj->mon;
> +        /*  qmp_oob_enabled() might change after "qmp_capabilities" */
> +        need_resume = !qmp_oob_enabled(mon) ||
> +            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
> +        if (req_obj->req) {
> +            QDict *qdict = qobject_to(QDict, req_obj->req);
> +            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> +            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> +            monitor_qmp_dispatch(mon, req_obj->req);
> +        } else {
> +            assert(req_obj->err);
> +            rsp = qmp_error_response(req_obj->err);
> +            req_obj->err = NULL;
> +            monitor_qmp_respond(mon, rsp);
> +            qobject_unref(rsp);
> +        }
>  
> -    if (need_resume) {
> -        /* Pairs with the monitor_suspend() in handle_qmp_command() */
> -        monitor_resume(&mon->common);
> -    }
> -    qmp_request_free(req_obj);
> +        if (need_resume) {
> +            /* Pairs with the monitor_suspend() in handle_qmp_command() */
> +            monitor_resume(&mon->common);
> +        }
> +        qmp_request_free(req_obj);

Unchanged apart from indentation.

>  
> -    /* Reschedule instead of looping so the main loop stays responsive */
> -    qemu_bh_schedule(qmp_dispatcher_bh);
> +        /* Reschedule instead of looping so the main loop stays responsive */
> +        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> +        qemu_coroutine_yield();

Does the comment need tweaking?  We actually loop now, just not
immediately...

> +    }
>  }
>  
>  static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> @@ -308,7 +329,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
>      qemu_mutex_unlock(&mon->qmp_queue_lock);
>  
>      /* Kick the dispatcher routine */
> -    qemu_bh_schedule(qmp_dispatcher_bh);
> +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +        aio_co_wake(qmp_dispatcher_co);
> +    }
>  }
>  
>  static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
> diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
> index bc264b3c9b..6ccf19f2a2 100644
> --- a/qapi/qmp-dispatch.c
> +++ b/qapi/qmp-dispatch.c
> @@ -12,6 +12,8 @@
>   */
>  
>  #include "qemu/osdep.h"
> +
> +#include "monitor/monitor-internal.h"

Ugh.

>  #include "qapi/error.h"
>  #include "qapi/qmp/dispatch.h"
>  #include "qapi/qmp/qdict.h"
> @@ -75,6 +77,23 @@ static QDict *qmp_dispatch_check_obj(const QObject *request, bool allow_oob,
>      return dict;
>  }
>  
> +typedef struct QmpDispatchBH {
> +    QmpCommand *cmd;
> +    QDict *args;
> +    QObject **ret;
> +    Error **errp;
> +    Coroutine *co;
> +} QmpDispatchBH;
> +
> +static void do_qmp_dispatch_bh(void *opaque)
> +{
> +    QmpDispatchBH *data = opaque;
> +    data->cmd->fn(data->args, data->ret, data->errp);
> +    aio_co_wake(data->co);
> +}
> +
> +/* Runs outside of coroutine context for OOB commands, but in coroutine context
> + * for everything else. */
>  static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
>                                  bool allow_oob, Error **errp)
>  {
> @@ -129,7 +148,22 @@ static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
>          qobject_ref(args);
>      }
>  
> -    cmd->fn(args, &ret, &local_err);
> +    assert(!(oob && qemu_in_coroutine()));
> +    if ((cmd->options & QCO_COROUTINE) || !qemu_in_coroutine()) {
> +        cmd->fn(args, &ret, &local_err);
> +    } else {
> +        /* Must drop out of coroutine context for this one */
> +        QmpDispatchBH data = {
> +            .cmd    = cmd,
> +            .args   = args,
> +            .ret    = &ret,
> +            .errp   = &local_err,
> +            .co     = qemu_coroutine_self(),
> +        };
> +        aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
> +                                &data);
> +        qemu_coroutine_yield();
> +    }
>      if (local_err) {
>          error_propagate(errp, local_err);
>      } else if (cmd->options & QCO_NO_SUCCESS_RESP) {
> @@ -164,6 +198,8 @@ bool qmp_is_oob(const QDict *dict)
>          && !qdict_haskey(dict, "execute");
>  }
>  
> +/* Runs outside of coroutine context for OOB commands, but in coroutine context
> + * for everything else. */

Wing this comment, please.

Note: the precondition is asserted in do_qmp_dispatch() above.  We don't
want to assert here, because we don't want to duplicate
do_qmp_dispatch()'s computation of "execute OOB".

>  QDict *qmp_dispatch(QmpCommandList *cmds, QObject *request,
>                      bool allow_oob)
>  {

Puh, I made it!

My problem with this patch isn't that I don't trust it to work.  It's
that I don't trust my ability to maintain such subtle code going
forward.

Me learning more about low-level coroutine stuff should help.

Us making the code less subtle will certainly help.

Here's one idea.  The way we make the coroutine terminate was faitly
hard to grasp for me.  Can we use the existing communication pipe,
namely mon->qmp_requests?  It's a queue of QMPRequest.  A QMPRequest is
either a request object (req && !err), or an Error to be reported (!req
&& err).  We could use !req && !err to mean "terminate".
Kevin Wolf Jan. 17, 2020, 2:03 p.m. UTC | #2
Am 17.01.2020 um 13:20 hat Markus Armbruster geschrieben:
> > @@ -579,9 +580,16 @@ void monitor_cleanup(void)
> >      }
> >      qemu_mutex_unlock(&monitor_lock);
> >  
> > -    /* QEMUBHs needs to be deleted before destroying the I/O thread */
> > -    qemu_bh_delete(qmp_dispatcher_bh);
> > -    qmp_dispatcher_bh = NULL;
> > +    /* The dispatcher needs to stop before destroying the I/O thread */
> > +    if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
> > +        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> 
> Looks rather low-level to my (untrained!) eyes.
> 
> > +        qmp_dispatcher_co = NULL;
> 
> As we'll see below, there's a subtle reason for zapping
> qmp_dispatcher_co here.

As you complain below that the method for signalling a termination
request to the dispatcher coroutine, I think I could introduce an
additional boolean. The aio_co_schedule() can probably be simplified,
too, so that we would get something like:

    if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
        qmp_dispatcher_shutdown = true;
        aio_co_wake(qmp_dispatcher_co);
    }

Does this look better?

> > +    }
> > +
> > +    AIO_WAIT_WHILE(qemu_get_aio_context(),
> > +                   (aio_bh_poll(iohandler_get_aio_context()),
> > +                    atomic_mb_read(&qmp_dispatcher_co_busy)));
> > +
> 
> Looks like it waits for @qmp_dispatcher_co_busy to become false.  The
> details are greek to me.
> 
> aio_bh_poll()'s function comment says "These are internal functions used
> by the QEMU main loop."  This seems to be the first call outside
> util/aio-{posix,win32}.c.  Hmm.

Much of this complication comes from the fact that the monitor runs in
iohandler_ctx, which is not the main AioContext of the main loop thread
(or any thread). This makes waiting for something in this AioContext
rather complicated because nothing wil poll that AioContext if I don't
do it in the loop condition.

I would have called aio_poll(), but it's forbidden for iohandler_ctx:

    util/aio-posix.c:619: aio_poll: Assertion `in_aio_context_home_thread(ctx)' failed.

Maybe if we want to use aio_poll() instead of aio_bh_poll(), we could
special case iohandler_ctx in in_aio_context_home_thread().

> How exactly this implies the coroutine terminated I can't quite tell.

There is only one place which sets qmp_dispatcher_co_busy to false,
which is immediately before terminating for qmp_dispatcher_co == NULL.

What's probably missing is setting it to true above before waking up the
dispatcher coroutine. The atomic_mb_read() needs to be atomic_xchg()
like in handle_qmp_command().

> >      if (mon_iothread) {
> >          iothread_destroy(mon_iothread);
> >          mon_iothread = NULL;
> > @@ -604,9 +612,9 @@ void monitor_init_globals_core(void)
> >       * have commands assuming that context.  It would be nice to get
> >       * rid of those assumptions.
> >       */
> > -    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
> > -                                   monitor_qmp_bh_dispatcher,
> > -                                   NULL);
> > +    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
> > +    atomic_mb_set(&qmp_dispatcher_co_busy, true);
> > +    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> 
> Ignorant question: why not qemu_coroutine_enter()?

Because the old code didn't run the BH right away. Should it? We're
pretty early in the initialisation of QEMU, but it should work as long
as we're allowed to call monitor_qmp_requests_pop_any_with_lock()
already.

> >  }
> >  
> >  QemuOptsList qemu_mon_opts = {
> > diff --git a/monitor/qmp.c b/monitor/qmp.c
> > index b67a8e7d1f..f29a8fe497 100644
> > --- a/monitor/qmp.c
> > +++ b/monitor/qmp.c
> > @@ -133,6 +133,8 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
> >      }
> >  }
> >  
> > +/* Runs outside of coroutine context for OOB commands, but in coroutine context
> > + * for everything else. */
> 
> Wing this comment, please.
> 
> Note: the precondition is asserted in do_qmp_dispatch() below.
> Asserting here is impractical, because we don't know whether this is an
> OOB command.
> 
> >  static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
> >  {
> >      Monitor *old_mon;
> > @@ -211,43 +213,62 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
> >      return req_obj;
> >  }
> >  
> > -void monitor_qmp_bh_dispatcher(void *data)
> > +void coroutine_fn monitor_qmp_dispatcher_co(void *data)
> >  {
> > -    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
> > +    QMPRequest *req_obj = NULL;
> >      QDict *rsp;
> >      bool need_resume;
> >      MonitorQMP *mon;
> >  
> > -    if (!req_obj) {
> > -        return;
> > -    }
> > +    while (true) {
> > +        assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
> > +
> > +        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
> > +            /* Wait to be reentered from handle_qmp_command, or terminate if
> > +             * qmp_dispatcher_co has been reset to NULL */
> > +            atomic_mb_set(&qmp_dispatcher_co_busy, false);
> 
> Note for later: qmp_dispatcher_co_busy is false now.
> 
> > +            if (qmp_dispatcher_co) {
> > +                qemu_coroutine_yield();
> > +            }
> > +            /* qmp_dispatcher_co may have changed if we yielded and were
> > +             * reentered from monitor_cleanup() */
> > +            if (!qmp_dispatcher_co) {
> > +                return;
> > +            }
> 
> !qmp_dispatcher_co asks the coroutine to terminate.
> 
> monitor_init_globals_core() sets it before scheduling the newly created
> coroutine.  monitor_cleanup() clears it after scheduling the non-busy
> coroutine.
> 
> When asked to terminate, we don't want to yield (first conditional), and
> we don't want to loop (second conditional).
> 
> > +            assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
> 
> How does qmp_dispatcher_co_busy become true again?  Is it the
> atomic_xchg() in handle_qmp_command() below?
> 
> > +        }
> 
> Looks like the purpose of the loop above is "get request if we have
> requests, else terminate if asked to, else yield".
> 
> The initial kick (in monitor_init_globals_core()) hits "else yield".
> 
> Subsequent kicks in handle_qmp_command() hit "get request".
> 
> The final kick in monitor_cleanup() hits "terminate".
> 
> Correct?

Yes.

> >  
> > -    mon = req_obj->mon;
> > -    /*  qmp_oob_enabled() might change after "qmp_capabilities" */
> > -    need_resume = !qmp_oob_enabled(mon) ||
> > -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> > -    qemu_mutex_unlock(&mon->qmp_queue_lock);
> > -    if (req_obj->req) {
> > -        QDict *qdict = qobject_to(QDict, req_obj->req);
> > -        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> > -        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> > -        monitor_qmp_dispatch(mon, req_obj->req);
> > -    } else {
> > -        assert(req_obj->err);
> > -        rsp = qmp_error_response(req_obj->err);
> > -        req_obj->err = NULL;
> > -        monitor_qmp_respond(mon, rsp);
> > -        qobject_unref(rsp);
> > -    }
> 
> If we get here, we have a @req_obj.
> 
> > +        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
> > +        qemu_coroutine_yield();
> 
> I'm confused... why do we yield here?

We're waiting for new requests in iohandler_ctx because that's where the
monitor has always been running and I don't want to make changes that I
don't fully understand myself (I think it means that AIO_WAIT_WHILE()
doesn't allow new monitor requests to run).

But the QMP command handler coroutine needs to execute in
qemu_aio_context so that the handlers can make progress during
AIO_WAIT_WHILE(), otherwise you get hangs.

So the QMP dispatcher moves back and forth between both contexts. One
for getting new requests, the other for executing handlers.

This clearly needs a comment.

> > +
> > +        mon = req_obj->mon;
> > +        /*  qmp_oob_enabled() might change after "qmp_capabilities" */
> > +        need_resume = !qmp_oob_enabled(mon) ||
> > +            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
> > +        qemu_mutex_unlock(&mon->qmp_queue_lock);
> > +        if (req_obj->req) {
> > +            QDict *qdict = qobject_to(QDict, req_obj->req);
> > +            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
> > +            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
> > +            monitor_qmp_dispatch(mon, req_obj->req);
> > +        } else {
> > +            assert(req_obj->err);
> > +            rsp = qmp_error_response(req_obj->err);
> > +            req_obj->err = NULL;
> > +            monitor_qmp_respond(mon, rsp);
> > +            qobject_unref(rsp);
> > +        }
> >  
> > -    if (need_resume) {
> > -        /* Pairs with the monitor_suspend() in handle_qmp_command() */
> > -        monitor_resume(&mon->common);
> > -    }
> > -    qmp_request_free(req_obj);
> > +        if (need_resume) {
> > +            /* Pairs with the monitor_suspend() in handle_qmp_command() */
> > +            monitor_resume(&mon->common);
> > +        }
> > +        qmp_request_free(req_obj);
> 
> Unchanged apart from indentation.
> 
> >  
> > -    /* Reschedule instead of looping so the main loop stays responsive */
> > -    qemu_bh_schedule(qmp_dispatcher_bh);
> > +        /* Reschedule instead of looping so the main loop stays responsive */
> > +        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
> > +        qemu_coroutine_yield();
> 
> Does the comment need tweaking?  We actually loop now, just not
> immediately...

Hm, it's a loop now, but we're still rescheduling instead of directly
looping for the same reason. Suggestions how to phrase it?

> > +    }
> >  }
> >  
> >  static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> > @@ -308,7 +329,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
> >      qemu_mutex_unlock(&mon->qmp_queue_lock);
> >  
> >      /* Kick the dispatcher routine */
> > -    qemu_bh_schedule(qmp_dispatcher_bh);
> > +    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
> > +        aio_co_wake(qmp_dispatcher_co);
> > +    }
> >  }
> >  
> >  static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
> > diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
> > index bc264b3c9b..6ccf19f2a2 100644
> > --- a/qapi/qmp-dispatch.c
> > +++ b/qapi/qmp-dispatch.c
> > @@ -12,6 +12,8 @@
> >   */
> >  
> >  #include "qemu/osdep.h"
> > +
> > +#include "monitor/monitor-internal.h"
> 
> Ugh.

Actually unused. I think I removed the offender because I thought the
same, but forgot to remove the #include.

> >  #include "qapi/error.h"
> >  #include "qapi/qmp/dispatch.h"
> >  #include "qapi/qmp/qdict.h"
> > @@ -75,6 +77,23 @@ static QDict *qmp_dispatch_check_obj(const QObject *request, bool allow_oob,
> >      return dict;
> >  }
> >  
> > +typedef struct QmpDispatchBH {
> > +    QmpCommand *cmd;
> > +    QDict *args;
> > +    QObject **ret;
> > +    Error **errp;
> > +    Coroutine *co;
> > +} QmpDispatchBH;
> > +
> > +static void do_qmp_dispatch_bh(void *opaque)
> > +{
> > +    QmpDispatchBH *data = opaque;
> > +    data->cmd->fn(data->args, data->ret, data->errp);
> > +    aio_co_wake(data->co);
> > +}
> > +
> > +/* Runs outside of coroutine context for OOB commands, but in coroutine context
> > + * for everything else. */
> >  static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
> >                                  bool allow_oob, Error **errp)
> >  {
> > @@ -129,7 +148,22 @@ static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
> >          qobject_ref(args);
> >      }
> >  
> > -    cmd->fn(args, &ret, &local_err);
> > +    assert(!(oob && qemu_in_coroutine()));
> > +    if ((cmd->options & QCO_COROUTINE) || !qemu_in_coroutine()) {
> > +        cmd->fn(args, &ret, &local_err);
> > +    } else {
> > +        /* Must drop out of coroutine context for this one */
> > +        QmpDispatchBH data = {
> > +            .cmd    = cmd,
> > +            .args   = args,
> > +            .ret    = &ret,
> > +            .errp   = &local_err,
> > +            .co     = qemu_coroutine_self(),
> > +        };
> > +        aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
> > +                                &data);
> > +        qemu_coroutine_yield();
> > +    }
> >      if (local_err) {
> >          error_propagate(errp, local_err);
> >      } else if (cmd->options & QCO_NO_SUCCESS_RESP) {
> > @@ -164,6 +198,8 @@ bool qmp_is_oob(const QDict *dict)
> >          && !qdict_haskey(dict, "execute");
> >  }
> >  
> > +/* Runs outside of coroutine context for OOB commands, but in coroutine context
> > + * for everything else. */
> 
> Wing this comment, please.
> 
> Note: the precondition is asserted in do_qmp_dispatch() above.  We don't
> want to assert here, because we don't want to duplicate
> do_qmp_dispatch()'s computation of "execute OOB".
> 
> >  QDict *qmp_dispatch(QmpCommandList *cmds, QObject *request,
> >                      bool allow_oob)
> >  {
> 
> Puh, I made it!
> 
> My problem with this patch isn't that I don't trust it to work.  It's
> that I don't trust my ability to maintain such subtle code going
> forward.
> 
> Me learning more about low-level coroutine stuff should help.
> 
> Us making the code less subtle will certainly help.
> 
> Here's one idea.  The way we make the coroutine terminate was faitly
> hard to grasp for me.  Can we use the existing communication pipe,
> namely mon->qmp_requests?  It's a queue of QMPRequest.  A QMPRequest is
> either a request object (req && !err), or an Error to be reported (!req
> && err).  We could use !req && !err to mean "terminate".

I don't think this would make things any easier. All of the
synchronisation between the monitor thread and dispatcher stays, and all
of the iohandler_ctx vs. qemu_aio_context stays, too.

The only way I see to make termination a bit more obvious would be the
additional bool to request termination that I mentioned above.

Kevin

Patch
diff mbox series

diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
index d6ce9efc8e..d6d5443391 100644
--- a/include/qapi/qmp/dispatch.h
+++ b/include/qapi/qmp/dispatch.h
@@ -30,6 +30,8 @@  typedef enum QmpCommandOptions
 typedef struct QmpCommand
 {
     const char *name;
+    /* Runs in coroutine context if QCO_COROUTINE is set, except for OOB
+     * commands */
     QmpCommandFunc *fn;
     QmpCommandOptions options;
     QTAILQ_ENTRY(QmpCommand) node;
diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
index d78f5ca190..7389b6a56c 100644
--- a/monitor/monitor-internal.h
+++ b/monitor/monitor-internal.h
@@ -154,7 +154,8 @@  static inline bool monitor_is_qmp(const Monitor *mon)
 
 typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
 extern IOThread *mon_iothread;
-extern QEMUBH *qmp_dispatcher_bh;
+extern Coroutine *qmp_dispatcher_co;
+extern bool qmp_dispatcher_co_busy;
 extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
 extern QemuMutex monitor_lock;
 extern MonitorList mon_list;
@@ -172,7 +173,7 @@  void monitor_fdsets_cleanup(void);
 
 void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
 void monitor_data_destroy_qmp(MonitorQMP *mon);
-void monitor_qmp_bh_dispatcher(void *data);
+void coroutine_fn monitor_qmp_dispatcher_co(void *data);
 
 int get_monitor_def(int64_t *pval, const char *name);
 void help_cmd(Monitor *mon, const char *name);
diff --git a/monitor/monitor.c b/monitor/monitor.c
index 12898b6448..c72763fa4e 100644
--- a/monitor/monitor.c
+++ b/monitor/monitor.c
@@ -53,8 +53,9 @@  typedef struct {
 /* Shared monitor I/O thread */
 IOThread *mon_iothread;
 
-/* Bottom half to dispatch the requests received from I/O thread */
-QEMUBH *qmp_dispatcher_bh;
+/* Coroutine to dispatch the requests received from I/O thread */
+Coroutine *qmp_dispatcher_co;
+bool qmp_dispatcher_co_busy;
 
 /* Protects mon_list, monitor_qapi_event_state, monitor_destroyed.  */
 QemuMutex monitor_lock;
@@ -579,9 +580,16 @@  void monitor_cleanup(void)
     }
     qemu_mutex_unlock(&monitor_lock);
 
-    /* QEMUBHs needs to be deleted before destroying the I/O thread */
-    qemu_bh_delete(qmp_dispatcher_bh);
-    qmp_dispatcher_bh = NULL;
+    /* The dispatcher needs to stop before destroying the I/O thread */
+    if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
+        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+        qmp_dispatcher_co = NULL;
+    }
+
+    AIO_WAIT_WHILE(qemu_get_aio_context(),
+                   (aio_bh_poll(iohandler_get_aio_context()),
+                    atomic_mb_read(&qmp_dispatcher_co_busy)));
+
     if (mon_iothread) {
         iothread_destroy(mon_iothread);
         mon_iothread = NULL;
@@ -604,9 +612,9 @@  void monitor_init_globals_core(void)
      * have commands assuming that context.  It would be nice to get
      * rid of those assumptions.
      */
-    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
-                                   monitor_qmp_bh_dispatcher,
-                                   NULL);
+    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
+    atomic_mb_set(&qmp_dispatcher_co_busy, true);
+    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
 }
 
 QemuOptsList qemu_mon_opts = {
diff --git a/monitor/qmp.c b/monitor/qmp.c
index b67a8e7d1f..f29a8fe497 100644
--- a/monitor/qmp.c
+++ b/monitor/qmp.c
@@ -133,6 +133,8 @@  static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
     }
 }
 
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
 static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
 {
     Monitor *old_mon;
@@ -211,43 +213,62 @@  static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
     return req_obj;
 }
 
-void monitor_qmp_bh_dispatcher(void *data)
+void coroutine_fn monitor_qmp_dispatcher_co(void *data)
 {
-    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
+    QMPRequest *req_obj = NULL;
     QDict *rsp;
     bool need_resume;
     MonitorQMP *mon;
 
-    if (!req_obj) {
-        return;
-    }
+    while (true) {
+        assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
+
+        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
+            /* Wait to be reentered from handle_qmp_command, or terminate if
+             * qmp_dispatcher_co has been reset to NULL */
+            atomic_mb_set(&qmp_dispatcher_co_busy, false);
+            if (qmp_dispatcher_co) {
+                qemu_coroutine_yield();
+            }
+            /* qmp_dispatcher_co may have changed if we yielded and were
+             * reentered from monitor_cleanup() */
+            if (!qmp_dispatcher_co) {
+                return;
+            }
+            assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
+        }
 
-    mon = req_obj->mon;
-    /*  qmp_oob_enabled() might change after "qmp_capabilities" */
-    need_resume = !qmp_oob_enabled(mon) ||
-        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
-    qemu_mutex_unlock(&mon->qmp_queue_lock);
-    if (req_obj->req) {
-        QDict *qdict = qobject_to(QDict, req_obj->req);
-        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
-        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
-        monitor_qmp_dispatch(mon, req_obj->req);
-    } else {
-        assert(req_obj->err);
-        rsp = qmp_error_response(req_obj->err);
-        req_obj->err = NULL;
-        monitor_qmp_respond(mon, rsp);
-        qobject_unref(rsp);
-    }
+        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
+        qemu_coroutine_yield();
+
+        mon = req_obj->mon;
+        /*  qmp_oob_enabled() might change after "qmp_capabilities" */
+        need_resume = !qmp_oob_enabled(mon) ||
+            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
+        qemu_mutex_unlock(&mon->qmp_queue_lock);
+        if (req_obj->req) {
+            QDict *qdict = qobject_to(QDict, req_obj->req);
+            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
+            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
+            monitor_qmp_dispatch(mon, req_obj->req);
+        } else {
+            assert(req_obj->err);
+            rsp = qmp_error_response(req_obj->err);
+            req_obj->err = NULL;
+            monitor_qmp_respond(mon, rsp);
+            qobject_unref(rsp);
+        }
 
-    if (need_resume) {
-        /* Pairs with the monitor_suspend() in handle_qmp_command() */
-        monitor_resume(&mon->common);
-    }
-    qmp_request_free(req_obj);
+        if (need_resume) {
+            /* Pairs with the monitor_suspend() in handle_qmp_command() */
+            monitor_resume(&mon->common);
+        }
+        qmp_request_free(req_obj);
 
-    /* Reschedule instead of looping so the main loop stays responsive */
-    qemu_bh_schedule(qmp_dispatcher_bh);
+        /* Reschedule instead of looping so the main loop stays responsive */
+        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+        qemu_coroutine_yield();
+    }
 }
 
 static void handle_qmp_command(void *opaque, QObject *req, Error *err)
@@ -308,7 +329,9 @@  static void handle_qmp_command(void *opaque, QObject *req, Error *err)
     qemu_mutex_unlock(&mon->qmp_queue_lock);
 
     /* Kick the dispatcher routine */
-    qemu_bh_schedule(qmp_dispatcher_bh);
+    if (!atomic_xchg(&qmp_dispatcher_co_busy, true)) {
+        aio_co_wake(qmp_dispatcher_co);
+    }
 }
 
 static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
index bc264b3c9b..6ccf19f2a2 100644
--- a/qapi/qmp-dispatch.c
+++ b/qapi/qmp-dispatch.c
@@ -12,6 +12,8 @@ 
  */
 
 #include "qemu/osdep.h"
+
+#include "monitor/monitor-internal.h"
 #include "qapi/error.h"
 #include "qapi/qmp/dispatch.h"
 #include "qapi/qmp/qdict.h"
@@ -75,6 +77,23 @@  static QDict *qmp_dispatch_check_obj(const QObject *request, bool allow_oob,
     return dict;
 }
 
+typedef struct QmpDispatchBH {
+    QmpCommand *cmd;
+    QDict *args;
+    QObject **ret;
+    Error **errp;
+    Coroutine *co;
+} QmpDispatchBH;
+
+static void do_qmp_dispatch_bh(void *opaque)
+{
+    QmpDispatchBH *data = opaque;
+    data->cmd->fn(data->args, data->ret, data->errp);
+    aio_co_wake(data->co);
+}
+
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
 static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
                                 bool allow_oob, Error **errp)
 {
@@ -129,7 +148,22 @@  static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
         qobject_ref(args);
     }
 
-    cmd->fn(args, &ret, &local_err);
+    assert(!(oob && qemu_in_coroutine()));
+    if ((cmd->options & QCO_COROUTINE) || !qemu_in_coroutine()) {
+        cmd->fn(args, &ret, &local_err);
+    } else {
+        /* Must drop out of coroutine context for this one */
+        QmpDispatchBH data = {
+            .cmd    = cmd,
+            .args   = args,
+            .ret    = &ret,
+            .errp   = &local_err,
+            .co     = qemu_coroutine_self(),
+        };
+        aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh,
+                                &data);
+        qemu_coroutine_yield();
+    }
     if (local_err) {
         error_propagate(errp, local_err);
     } else if (cmd->options & QCO_NO_SUCCESS_RESP) {
@@ -164,6 +198,8 @@  bool qmp_is_oob(const QDict *dict)
         && !qdict_haskey(dict, "execute");
 }
 
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
 QDict *qmp_dispatch(QmpCommandList *cmds, QObject *request,
                     bool allow_oob)
 {