diff mbox series

[v3,2/5] monitor: drain requests queue with 'channel closed' event

Message ID 1606484146-913540-3-git-send-email-andrey.shinkevich@virtuozzo.com (mailing list archive)
State New, archived
Headers show
Series Increase amount of data for monitor to read | expand

Commit Message

Andrey Shinkevich Nov. 27, 2020, 1:35 p.m. UTC
When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
unprocessed commands. This can happen with the QMP capability OOB enabled.
Let the dispatcher finish handling the requests remaining in the monitor
queue.

Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
---
 monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
 1 file changed, 21 insertions(+), 25 deletions(-)

Comments

Markus Armbruster March 2, 2021, 1:53 p.m. UTC | #1
Andrey Shinkevich via <qemu-devel@nongnu.org> writes:

> When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
> unprocessed commands. It can happen with QMP capability OOB enabled.
> Let the dispatcher complete handling requests rest in the monitor
> queue.
>
> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
> ---
>  monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
>  1 file changed, 21 insertions(+), 25 deletions(-)
>
> diff --git a/monitor/qmp.c b/monitor/qmp.c
> index 7169366..a86ed35 100644
> --- a/monitor/qmp.c
> +++ b/monitor/qmp.c
> @@ -75,36 +75,32 @@ static void monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
>      }
>  }
>  
> -static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
> +/*
> + * Let unprocessed QMP commands be handled.
> + */
> +static void monitor_qmp_drain_queue(MonitorQMP *mon)
>  {
> -    qemu_mutex_lock(&mon->qmp_queue_lock);
> +    bool q_is_empty = false;
>  
> -    /*
> -     * Same condition as in monitor_qmp_dispatcher_co(), but before
> -     * removing an element from the queue (hence no `- 1`).
> -     * Also, the queue should not be empty either, otherwise the
> -     * monitor hasn't been suspended yet (or was already resumed).
> -     */
> -    bool need_resume = (!qmp_oob_enabled(mon) ||
> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
> -        && !g_queue_is_empty(mon->qmp_requests);
> +    while (!q_is_empty) {
> +        qemu_mutex_lock(&mon->qmp_queue_lock);
> +        q_is_empty = g_queue_is_empty(mon->qmp_requests);
> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
>  
> -    monitor_qmp_cleanup_req_queue_locked(mon);
> +        if (!q_is_empty) {
> +            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
> +                /* Kick the dispatcher coroutine */
> +                aio_co_wake(qmp_dispatcher_co);
> +            } else {
> +                /* Let the dispatcher do its job for a while */
> +                g_usleep(40);
> +            }
> +        }
> +    }
>  
> -    if (need_resume) {
> -        /*
> -         * handle_qmp_command() suspended the monitor because the
> -         * request queue filled up, to be resumed when the queue has
> -         * space again.  We just emptied it; resume the monitor.
> -         *
> -         * Without this, the monitor would remain suspended forever
> -         * when we get here while the monitor is suspended.  An
> -         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
> -         */
> +    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
>          monitor_resume(&mon->common);
>      }
> -
> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
>  }
>  
>  void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
> @@ -418,7 +414,7 @@ static void monitor_qmp_event(void *opaque, QEMUChrEvent event)
>           * stdio, it's possible that stdout is still open when stdin
>           * is closed.
>           */
> -        monitor_qmp_cleanup_queue_and_resume(mon);
> +        monitor_qmp_drain_queue(mon);
>          json_message_parser_destroy(&mon->parser);
>          json_message_parser_init(&mon->parser, handle_qmp_command,
>                                   mon, NULL);

Before the patch: we call monitor_qmp_cleanup_queue_and_resume() to
throw away the contents of the request queue, and resume the monitor if
suspended.

Afterwards: we call monitor_qmp_drain_queue() to wait for the request
queue to drain.  I think.  Before we discuss the how, I have a question
the commit message should answer, but doesn't: why?
Vladimir Sementsov-Ogievskiy March 2, 2021, 3:25 p.m. UTC | #2
02.03.2021 16:53, Markus Armbruster wrote:
> Andrey Shinkevich via <qemu-devel@nongnu.org> writes:
> 
>> When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
>> unprocessed commands. It can happen with QMP capability OOB enabled.
>> Let the dispatcher complete handling requests rest in the monitor
>> queue.
>>
>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>> ---
>>   monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
>>   1 file changed, 21 insertions(+), 25 deletions(-)
>>
>> diff --git a/monitor/qmp.c b/monitor/qmp.c
>> index 7169366..a86ed35 100644
>> --- a/monitor/qmp.c
>> +++ b/monitor/qmp.c
>> @@ -75,36 +75,32 @@ static void monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
>>       }
>>   }
>>   
>> -static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
>> +/*
>> + * Let unprocessed QMP commands be handled.
>> + */
>> +static void monitor_qmp_drain_queue(MonitorQMP *mon)
>>   {
>> -    qemu_mutex_lock(&mon->qmp_queue_lock);
>> +    bool q_is_empty = false;
>>   
>> -    /*
>> -     * Same condition as in monitor_qmp_dispatcher_co(), but before
>> -     * removing an element from the queue (hence no `- 1`).
>> -     * Also, the queue should not be empty either, otherwise the
>> -     * monitor hasn't been suspended yet (or was already resumed).
>> -     */
>> -    bool need_resume = (!qmp_oob_enabled(mon) ||
>> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
>> -        && !g_queue_is_empty(mon->qmp_requests);
>> +    while (!q_is_empty) {
>> +        qemu_mutex_lock(&mon->qmp_queue_lock);
>> +        q_is_empty = g_queue_is_empty(mon->qmp_requests);
>> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
>>   
>> -    monitor_qmp_cleanup_req_queue_locked(mon);
>> +        if (!q_is_empty) {
>> +            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
>> +                /* Kick the dispatcher coroutine */
>> +                aio_co_wake(qmp_dispatcher_co);
>> +            } else {
>> +                /* Let the dispatcher do its job for a while */
>> +                g_usleep(40);
>> +            }
>> +        }
>> +    }
>>   
>> -    if (need_resume) {
>> -        /*
>> -         * handle_qmp_command() suspended the monitor because the
>> -         * request queue filled up, to be resumed when the queue has
>> -         * space again.  We just emptied it; resume the monitor.
>> -         *
>> -         * Without this, the monitor would remain suspended forever
>> -         * when we get here while the monitor is suspended.  An
>> -         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
>> -         */
>> +    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
>>           monitor_resume(&mon->common);
>>       }
>> -
>> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
>>   }
>>   
>>   void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
>> @@ -418,7 +414,7 @@ static void monitor_qmp_event(void *opaque, QEMUChrEvent event)
>>            * stdio, it's possible that stdout is still open when stdin
>>            * is closed.
>>            */
>> -        monitor_qmp_cleanup_queue_and_resume(mon);
>> +        monitor_qmp_drain_queue(mon);
>>           json_message_parser_destroy(&mon->parser);
>>           json_message_parser_init(&mon->parser, handle_qmp_command,
>>                                    mon, NULL);
> 
> Before the patch: we call monitor_qmp_cleanup_queue_and_resume() to
> throw away the contents of the request queue, and resume the monitor if
> suspended.
> 
> Afterwards: we call monitor_qmp_drain_queue() to wait for the request
> queue to drain.  I think.  Before we discuss the how, I have a question
> the commit message should answer, but doesn't: why?
> 

Hi!

Andrey is not at Virtuozzo now, and nobody is actually doing this work at the moment. Honestly, I don't believe that the feature should be so difficult.

Actually, we have the following patch in Virtuozzo 7 (Rhel7 based) for years, and it just works without any problems:

--- a/monitor.c
+++ b/monitor.c
@@ -4013,7 +4013,7 @@ static int monitor_can_read(void *opaque)
  {
      Monitor *mon = opaque;
  
-    return !atomic_mb_read(&mon->suspend_cnt);
+    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
  }


And in Vz8 (Rhel8 based), it looks like (to avoid assertion in handle_qmp_command()):

--- a/include/monitor/monitor.h
+++ b/include/monitor/monitor.h
@@ -9,7 +9,7 @@ extern __thread Monitor *cur_mon;
  typedef struct MonitorHMP MonitorHMP;
  typedef struct MonitorOptions MonitorOptions;
  
-#define QMP_REQ_QUEUE_LEN_MAX 8
+#define QMP_REQ_QUEUE_LEN_MAX 4096
  
  extern QemuOptsList qemu_mon_opts;
  
diff --git a/monitor/monitor.c b/monitor/monitor.c
index b385a3d569..a124d010f3 100644
--- a/monitor/monitor.c
+++ b/monitor/monitor.c
@@ -501,7 +501,7 @@ int monitor_can_read(void *opaque)
  {
      Monitor *mon = opaque;
  
-    return !atomic_mb_read(&mon->suspend_cnt);
+    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
  }


There are some theoretical risks of overflowing... But it just works. Still, this is probably not good for upstream. And I'm not sure how it would work with OOB.
Denis V. Lunev March 2, 2021, 4:32 p.m. UTC | #3
On 3/2/21 6:25 PM, Vladimir Sementsov-Ogievskiy wrote:
> 02.03.2021 16:53, Markus Armbruster wrote:
>> Andrey Shinkevich via <qemu-devel@nongnu.org> writes:
>>
>>> When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
>>> unprocessed commands. It can happen with QMP capability OOB enabled.
>>> Let the dispatcher complete handling requests rest in the monitor
>>> queue.
>>>
>>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>>> ---
>>>   monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
>>>   1 file changed, 21 insertions(+), 25 deletions(-)
>>>
>>> diff --git a/monitor/qmp.c b/monitor/qmp.c
>>> index 7169366..a86ed35 100644
>>> --- a/monitor/qmp.c
>>> +++ b/monitor/qmp.c
>>> @@ -75,36 +75,32 @@ static void
>>> monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
>>>       }
>>>   }
>>>   -static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
>>> +/*
>>> + * Let unprocessed QMP commands be handled.
>>> + */
>>> +static void monitor_qmp_drain_queue(MonitorQMP *mon)
>>>   {
>>> -    qemu_mutex_lock(&mon->qmp_queue_lock);
>>> +    bool q_is_empty = false;
>>>   -    /*
>>> -     * Same condition as in monitor_qmp_dispatcher_co(), but before
>>> -     * removing an element from the queue (hence no `- 1`).
>>> -     * Also, the queue should not be empty either, otherwise the
>>> -     * monitor hasn't been suspended yet (or was already resumed).
>>> -     */
>>> -    bool need_resume = (!qmp_oob_enabled(mon) ||
>>> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
>>> -        && !g_queue_is_empty(mon->qmp_requests);
>>> +    while (!q_is_empty) {
>>> +        qemu_mutex_lock(&mon->qmp_queue_lock);
>>> +        q_is_empty = g_queue_is_empty(mon->qmp_requests);
>>> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>   -    monitor_qmp_cleanup_req_queue_locked(mon);
>>> +        if (!q_is_empty) {
>>> +            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
>>> +                /* Kick the dispatcher coroutine */
>>> +                aio_co_wake(qmp_dispatcher_co);
>>> +            } else {
>>> +                /* Let the dispatcher do its job for a while */
>>> +                g_usleep(40);
>>> +            }
>>> +        }
>>> +    }
>>>   -    if (need_resume) {
>>> -        /*
>>> -         * handle_qmp_command() suspended the monitor because the
>>> -         * request queue filled up, to be resumed when the queue has
>>> -         * space again.  We just emptied it; resume the monitor.
>>> -         *
>>> -         * Without this, the monitor would remain suspended forever
>>> -         * when we get here while the monitor is suspended.  An
>>> -         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
>>> -         */
>>> +    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
>>>           monitor_resume(&mon->common);
>>>       }
>>> -
>>> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>   }
>>>     void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
>>> @@ -418,7 +414,7 @@ static void monitor_qmp_event(void *opaque,
>>> QEMUChrEvent event)
>>>            * stdio, it's possible that stdout is still open when stdin
>>>            * is closed.
>>>            */
>>> -        monitor_qmp_cleanup_queue_and_resume(mon);
>>> +        monitor_qmp_drain_queue(mon);
>>>           json_message_parser_destroy(&mon->parser);
>>>           json_message_parser_init(&mon->parser, handle_qmp_command,
>>>                                    mon, NULL);
>>
>> Before the patch: we call monitor_qmp_cleanup_queue_and_resume() to
>> throw away the contents of the request queue, and resume the monitor if
>> suspended.
>>
>> Afterwards: we call monitor_qmp_drain_queue() to wait for the request
>> queue to drain.  I think.  Before we discuss the how, I have a question
>> the commit message should answer, but doesn't: why?
>>
>
> Hi!
>
> Andrey is not in Virtuozzo now, and nobody doing this work actually..
> Honestly, I don't believe that the feature should be so difficult.
>
> Actually, we have the following patch in Virtuozzo 7 (Rhel7 based) for
> years, and it just works without any problems:
>
> --- a/monitor.c
> +++ b/monitor.c
> @@ -4013,7 +4013,7 @@ static int monitor_can_read(void *opaque)
>  {
>      Monitor *mon = opaque;
>  
> -    return !atomic_mb_read(&mon->suspend_cnt);
> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>  }
>
>
> And in Vz8 (Rhel8 based), it looks like (to avoid assertion in
> handle_qmp_command()):
>
> --- a/include/monitor/monitor.h
> +++ b/include/monitor/monitor.h
> @@ -9,7 +9,7 @@ extern __thread Monitor *cur_mon;
>  typedef struct MonitorHMP MonitorHMP;
>  typedef struct MonitorOptions MonitorOptions;
>  
> -#define QMP_REQ_QUEUE_LEN_MAX 8
> +#define QMP_REQ_QUEUE_LEN_MAX 4096
>  
>  extern QemuOptsList qemu_mon_opts;
>  
> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index b385a3d569..a124d010f3 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -501,7 +501,7 @@ int monitor_can_read(void *opaque)
>  {
>      Monitor *mon = opaque;
>  
> -    return !atomic_mb_read(&mon->suspend_cnt);
> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>  }
>
>
> There are some theoretical risks of overflowing... But it just works.
> Still this probably not good for upstream. And I'm not sure how would
> it work with OOB..
>
>
I believe that this piece has been done to pass unit tests.
I am unsure at the moment which one would fail with
the queue length increased.

At least this is my gut feeling.

Den
Vladimir Sementsov-Ogievskiy March 2, 2021, 5:02 p.m. UTC | #4
02.03.2021 19:32, Denis V. Lunev wrote:
> On 3/2/21 6:25 PM, Vladimir Sementsov-Ogievskiy wrote:
>> 02.03.2021 16:53, Markus Armbruster wrote:
>>> Andrey Shinkevich via <qemu-devel@nongnu.org> writes:
>>>
>>>> When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
>>>> unprocessed commands. It can happen with QMP capability OOB enabled.
>>>> Let the dispatcher complete handling requests rest in the monitor
>>>> queue.
>>>>
>>>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>>>> ---
>>>>    monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
>>>>    1 file changed, 21 insertions(+), 25 deletions(-)
>>>>
>>>> diff --git a/monitor/qmp.c b/monitor/qmp.c
>>>> index 7169366..a86ed35 100644
>>>> --- a/monitor/qmp.c
>>>> +++ b/monitor/qmp.c
>>>> @@ -75,36 +75,32 @@ static void
>>>> monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
>>>>        }
>>>>    }
>>>>    -static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
>>>> +/*
>>>> + * Let unprocessed QMP commands be handled.
>>>> + */
>>>> +static void monitor_qmp_drain_queue(MonitorQMP *mon)
>>>>    {
>>>> -    qemu_mutex_lock(&mon->qmp_queue_lock);
>>>> +    bool q_is_empty = false;
>>>>    -    /*
>>>> -     * Same condition as in monitor_qmp_dispatcher_co(), but before
>>>> -     * removing an element from the queue (hence no `- 1`).
>>>> -     * Also, the queue should not be empty either, otherwise the
>>>> -     * monitor hasn't been suspended yet (or was already resumed).
>>>> -     */
>>>> -    bool need_resume = (!qmp_oob_enabled(mon) ||
>>>> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
>>>> -        && !g_queue_is_empty(mon->qmp_requests);
>>>> +    while (!q_is_empty) {
>>>> +        qemu_mutex_lock(&mon->qmp_queue_lock);
>>>> +        q_is_empty = g_queue_is_empty(mon->qmp_requests);
>>>> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>>    -    monitor_qmp_cleanup_req_queue_locked(mon);
>>>> +        if (!q_is_empty) {
>>>> +            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
>>>> +                /* Kick the dispatcher coroutine */
>>>> +                aio_co_wake(qmp_dispatcher_co);
>>>> +            } else {
>>>> +                /* Let the dispatcher do its job for a while */
>>>> +                g_usleep(40);
>>>> +            }
>>>> +        }
>>>> +    }
>>>>    -    if (need_resume) {
>>>> -        /*
>>>> -         * handle_qmp_command() suspended the monitor because the
>>>> -         * request queue filled up, to be resumed when the queue has
>>>> -         * space again.  We just emptied it; resume the monitor.
>>>> -         *
>>>> -         * Without this, the monitor would remain suspended forever
>>>> -         * when we get here while the monitor is suspended.  An
>>>> -         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
>>>> -         */
>>>> +    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
>>>>            monitor_resume(&mon->common);
>>>>        }
>>>> -
>>>> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>>    }
>>>>      void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
>>>> @@ -418,7 +414,7 @@ static void monitor_qmp_event(void *opaque,
>>>> QEMUChrEvent event)
>>>>             * stdio, it's possible that stdout is still open when stdin
>>>>             * is closed.
>>>>             */
>>>> -        monitor_qmp_cleanup_queue_and_resume(mon);
>>>> +        monitor_qmp_drain_queue(mon);
>>>>            json_message_parser_destroy(&mon->parser);
>>>>            json_message_parser_init(&mon->parser, handle_qmp_command,
>>>>                                     mon, NULL);
>>>
>>> Before the patch: we call monitor_qmp_cleanup_queue_and_resume() to
>>> throw away the contents of the request queue, and resume the monitor if
>>> suspended.
>>>
>>> Afterwards: we call monitor_qmp_drain_queue() to wait for the request
>>> queue to drain.  I think.  Before we discuss the how, I have a question
>>> the commit message should answer, but doesn't: why?
>>>
>>
>> Hi!
>>
>> Andrey is not in Virtuozzo now, and nobody doing this work actually..
>> Honestly, I don't believe that the feature should be so difficult.
>>
>> Actually, we have the following patch in Virtuozzo 7 (Rhel7 based) for
>> years, and it just works without any problems:
>>
>> --- a/monitor.c
>> +++ b/monitor.c
>> @@ -4013,7 +4013,7 @@ static int monitor_can_read(void *opaque)
>>   {
>>       Monitor *mon = opaque;
>>   
>> -    return !atomic_mb_read(&mon->suspend_cnt);
>> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>>   }
>>
>>
>> And in Vz8 (Rhel8 based), it looks like (to avoid assertion in
>> handle_qmp_command()):
>>
>> --- a/include/monitor/monitor.h
>> +++ b/include/monitor/monitor.h
>> @@ -9,7 +9,7 @@ extern __thread Monitor *cur_mon;
>>   typedef struct MonitorHMP MonitorHMP;
>>   typedef struct MonitorOptions MonitorOptions;
>>   
>> -#define QMP_REQ_QUEUE_LEN_MAX 8
>> +#define QMP_REQ_QUEUE_LEN_MAX 4096
>>   
>>   extern QemuOptsList qemu_mon_opts;
>>   
>> diff --git a/monitor/monitor.c b/monitor/monitor.c
>> index b385a3d569..a124d010f3 100644
>> --- a/monitor/monitor.c
>> +++ b/monitor/monitor.c
>> @@ -501,7 +501,7 @@ int monitor_can_read(void *opaque)
>>   {
>>       Monitor *mon = opaque;
>>   
>> -    return !atomic_mb_read(&mon->suspend_cnt);
>> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>>   }
>>
>>
>> There are some theoretical risks of overflowing... But it just works.
>> Still this probably not good for upstream. And I'm not sure how would
>> it work with OOB..
>>
>>
> I believe that this piece has been done to pass unit tests.
> I am unsure at the moment which one will failed with
> the queue length increase.
> 
> At least this is my gut feeling.
> 


Tests are passing. Actually, the most relevant thread is:

   https://patchew.org/QEMU/20190610105906.28524-1-dplotnikov@virtuozzo.com/

I'll ping it
Markus Armbruster March 5, 2021, 1:41 p.m. UTC | #5
Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> writes:

> 02.03.2021 16:53, Markus Armbruster wrote:
>> Andrey Shinkevich via <qemu-devel@nongnu.org> writes:
>> 
>>> When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
>>> unprocessed commands. It can happen with QMP capability OOB enabled.
>>> Let the dispatcher complete handling requests rest in the monitor
>>> queue.
>>>
>>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>>> ---
>>>   monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
>>>   1 file changed, 21 insertions(+), 25 deletions(-)
>>>
>>> diff --git a/monitor/qmp.c b/monitor/qmp.c
>>> index 7169366..a86ed35 100644
>>> --- a/monitor/qmp.c
>>> +++ b/monitor/qmp.c
>>> @@ -75,36 +75,32 @@ static void monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
>>>       }
>>>   }
>>>   
>>> -static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
>>> +/*
>>> + * Let unprocessed QMP commands be handled.
>>> + */
>>> +static void monitor_qmp_drain_queue(MonitorQMP *mon)
>>>   {
>>> -    qemu_mutex_lock(&mon->qmp_queue_lock);
>>> +    bool q_is_empty = false;
>>>   
>>> -    /*
>>> -     * Same condition as in monitor_qmp_dispatcher_co(), but before
>>> -     * removing an element from the queue (hence no `- 1`).
>>> -     * Also, the queue should not be empty either, otherwise the
>>> -     * monitor hasn't been suspended yet (or was already resumed).
>>> -     */
>>> -    bool need_resume = (!qmp_oob_enabled(mon) ||
>>> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
>>> -        && !g_queue_is_empty(mon->qmp_requests);
>>> +    while (!q_is_empty) {
>>> +        qemu_mutex_lock(&mon->qmp_queue_lock);
>>> +        q_is_empty = g_queue_is_empty(mon->qmp_requests);
>>> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>   
>>> -    monitor_qmp_cleanup_req_queue_locked(mon);
>>> +        if (!q_is_empty) {
>>> +            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
>>> +                /* Kick the dispatcher coroutine */
>>> +                aio_co_wake(qmp_dispatcher_co);
>>> +            } else {
>>> +                /* Let the dispatcher do its job for a while */
>>> +                g_usleep(40);
>>> +            }
>>> +        }
>>> +    }
>>>   
>>> -    if (need_resume) {
>>> -        /*
>>> -         * handle_qmp_command() suspended the monitor because the
>>> -         * request queue filled up, to be resumed when the queue has
>>> -         * space again.  We just emptied it; resume the monitor.
>>> -         *
>>> -         * Without this, the monitor would remain suspended forever
>>> -         * when we get here while the monitor is suspended.  An
>>> -         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
>>> -         */
>>> +    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
>>>           monitor_resume(&mon->common);
>>>       }
>>> -
>>> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>   }
>>>   
>>>   void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
>>> @@ -418,7 +414,7 @@ static void monitor_qmp_event(void *opaque, QEMUChrEvent event)
>>>            * stdio, it's possible that stdout is still open when stdin
>>>            * is closed.
>>>            */
>>> -        monitor_qmp_cleanup_queue_and_resume(mon);
>>> +        monitor_qmp_drain_queue(mon);
>>>           json_message_parser_destroy(&mon->parser);
>>>           json_message_parser_init(&mon->parser, handle_qmp_command,
>>>                                    mon, NULL);
>> 
>> Before the patch: we call monitor_qmp_cleanup_queue_and_resume() to
>> throw away the contents of the request queue, and resume the monitor if
>> suspended.
>> 
>> Afterwards: we call monitor_qmp_drain_queue() to wait for the request
>> queue to drain.  I think.  Before we discuss the how, I have a question
>> the commit message should answer, but doesn't: why?
>> 
>
> Hi!
>
> Andrey is not in Virtuozzo now, and nobody doing this work actually.. Honestly, I don't believe that the feature should be so difficult.
>
> Actually, we have the following patch in Virtuozzo 7 (Rhel7 based) for years, and it just works without any problems:

I appreciate your repeated efforts to get your downstream patch
upstream.

> --- a/monitor.c
> +++ b/monitor.c
> @@ -4013,7 +4013,7 @@ static int monitor_can_read(void *opaque)
>   {
>       Monitor *mon = opaque;
>   
> -    return !atomic_mb_read(&mon->suspend_cnt);
> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>   }
>
>
> And in Vz8 (Rhel8 based), it looks like (to avoid assertion in handle_qmp_command()):
>
> --- a/include/monitor/monitor.h
> +++ b/include/monitor/monitor.h
> @@ -9,7 +9,7 @@ extern __thread Monitor *cur_mon;
>   typedef struct MonitorHMP MonitorHMP;
>   typedef struct MonitorOptions MonitorOptions;
>   
> -#define QMP_REQ_QUEUE_LEN_MAX 8
> +#define QMP_REQ_QUEUE_LEN_MAX 4096
>   
>   extern QemuOptsList qemu_mon_opts;
>   
>
> diff --git a/monitor/monitor.c b/monitor/monitor.c
> index b385a3d569..a124d010f3 100644
> --- a/monitor/monitor.c
> +++ b/monitor/monitor.c
> @@ -501,7 +501,7 @@ int monitor_can_read(void *opaque)
>   {
>       Monitor *mon = opaque;
>   
> -    return !atomic_mb_read(&mon->suspend_cnt);
> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>   }
>
>
> There are some theoretical risks of overflowing... But it just works. Still this probably not good for upstream. And I'm not sure how would it work with OOB..

This is exactly what makes the feature difficult: we need to think
through the ramifications taking OOB and coroutines into account.

So far, the feature has been important enough to post patches, but not
important enough to accompany them with a "think through".

Sometimes, maintainers are willing and able to do some of the patch
submitter's work for them.  I haven't been able to do that for this
feature.  I'll need more help, I'm afraid.
Vladimir Sementsov-Ogievskiy March 5, 2021, 2:01 p.m. UTC | #6
05.03.2021 16:41, Markus Armbruster wrote:
> Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> writes:
> 
>> 02.03.2021 16:53, Markus Armbruster wrote:
>>> Andrey Shinkevich via <qemu-devel@nongnu.org> writes:
>>>
>>>> When CHR_EVENT_CLOSED comes, the QMP requests queue may still contain
>>>> unprocessed commands. It can happen with QMP capability OOB enabled.
>>>> Let the dispatcher complete handling requests rest in the monitor
>>>> queue.
>>>>
>>>> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
>>>> ---
>>>>    monitor/qmp.c | 46 +++++++++++++++++++++-------------------------
>>>>    1 file changed, 21 insertions(+), 25 deletions(-)
>>>>
>>>> diff --git a/monitor/qmp.c b/monitor/qmp.c
>>>> index 7169366..a86ed35 100644
>>>> --- a/monitor/qmp.c
>>>> +++ b/monitor/qmp.c
>>>> @@ -75,36 +75,32 @@ static void monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
>>>>        }
>>>>    }
>>>>    
>>>> -static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
>>>> +/*
>>>> + * Let unprocessed QMP commands be handled.
>>>> + */
>>>> +static void monitor_qmp_drain_queue(MonitorQMP *mon)
>>>>    {
>>>> -    qemu_mutex_lock(&mon->qmp_queue_lock);
>>>> +    bool q_is_empty = false;
>>>>    
>>>> -    /*
>>>> -     * Same condition as in monitor_qmp_dispatcher_co(), but before
>>>> -     * removing an element from the queue (hence no `- 1`).
>>>> -     * Also, the queue should not be empty either, otherwise the
>>>> -     * monitor hasn't been suspended yet (or was already resumed).
>>>> -     */
>>>> -    bool need_resume = (!qmp_oob_enabled(mon) ||
>>>> -        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
>>>> -        && !g_queue_is_empty(mon->qmp_requests);
>>>> +    while (!q_is_empty) {
>>>> +        qemu_mutex_lock(&mon->qmp_queue_lock);
>>>> +        q_is_empty = g_queue_is_empty(mon->qmp_requests);
>>>> +        qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>>    
>>>> -    monitor_qmp_cleanup_req_queue_locked(mon);
>>>> +        if (!q_is_empty) {
>>>> +            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
>>>> +                /* Kick the dispatcher coroutine */
>>>> +                aio_co_wake(qmp_dispatcher_co);
>>>> +            } else {
>>>> +                /* Let the dispatcher do its job for a while */
>>>> +                g_usleep(40);
>>>> +            }
>>>> +        }
>>>> +    }
>>>>    
>>>> -    if (need_resume) {
>>>> -        /*
>>>> -         * handle_qmp_command() suspended the monitor because the
>>>> -         * request queue filled up, to be resumed when the queue has
>>>> -         * space again.  We just emptied it; resume the monitor.
>>>> -         *
>>>> -         * Without this, the monitor would remain suspended forever
>>>> -         * when we get here while the monitor is suspended.  An
>>>> -         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
>>>> -         */
>>>> +    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
>>>>            monitor_resume(&mon->common);
>>>>        }
>>>> -
>>>> -    qemu_mutex_unlock(&mon->qmp_queue_lock);
>>>>    }
>>>>    
>>>>    void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
>>>> @@ -418,7 +414,7 @@ static void monitor_qmp_event(void *opaque, QEMUChrEvent event)
>>>>             * stdio, it's possible that stdout is still open when stdin
>>>>             * is closed.
>>>>             */
>>>> -        monitor_qmp_cleanup_queue_and_resume(mon);
>>>> +        monitor_qmp_drain_queue(mon);
>>>>            json_message_parser_destroy(&mon->parser);
>>>>            json_message_parser_init(&mon->parser, handle_qmp_command,
>>>>                                     mon, NULL);
>>>
>>> Before the patch: we call monitor_qmp_cleanup_queue_and_resume() to
>>> throw away the contents of the request queue, and resume the monitor if
>>> suspended.
>>>
>>> Afterwards: we call monitor_qmp_drain_queue() to wait for the request
>>> queue to drain.  I think.  Before we discuss the how, I have a question
>>> the commit message should answer, but doesn't: why?
>>>
>>
>> Hi!
>>
>> Andrey is not in Virtuozzo now, and nobody doing this work actually.. Honestly, I don't believe that the feature should be so difficult.
>>
>> Actually, we have the following patch in Virtuozzo 7 (Rhel7 based) for years, and it just works without any problems:
> 
> I appreciate your repeated efforts to get your downstream patch
> upstream.
> 
>> --- a/monitor.c
>> +++ b/monitor.c
>> @@ -4013,7 +4013,7 @@ static int monitor_can_read(void *opaque)
>>    {
>>        Monitor *mon = opaque;
>>    
>> -    return !atomic_mb_read(&mon->suspend_cnt);
>> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>>    }
>>
>>
>> And in Vz8 (Rhel8 based), it looks like (to avoid assertion in handle_qmp_command()):
>>
>> --- a/include/monitor/monitor.h
>> +++ b/include/monitor/monitor.h
>> @@ -9,7 +9,7 @@ extern __thread Monitor *cur_mon;
>>    typedef struct MonitorHMP MonitorHMP;
>>    typedef struct MonitorOptions MonitorOptions;
>>    
>> -#define QMP_REQ_QUEUE_LEN_MAX 8
>> +#define QMP_REQ_QUEUE_LEN_MAX 4096
>>    
>>    extern QemuOptsList qemu_mon_opts;
>>    
>>
>> diff --git a/monitor/monitor.c b/monitor/monitor.c
>> index b385a3d569..a124d010f3 100644
>> --- a/monitor/monitor.c
>> +++ b/monitor/monitor.c
>> @@ -501,7 +501,7 @@ int monitor_can_read(void *opaque)
>>    {
>>        Monitor *mon = opaque;
>>    
>> -    return !atomic_mb_read(&mon->suspend_cnt);
>> +    return !atomic_mb_read(&mon->suspend_cnt) ? 4096 : 0;
>>    }
>>
>>
>> There are some theoretical risks of overflowing... But it just works. Still this probably not good for upstream. And I'm not sure how would it work with OOB..
> 
> This is exactly what makes the feature difficult: we need to think
> through the ramifications taking OOB and coroutines into account.
> 
> So far, the feature has been important enough to post patches, but not
> important enough to accompany them with a "think through".
> 
> Sometimes, maintainers are willing and able to do some of the patch
> submitter's work for them.  I haven't been able to do that for this
> feature.  I'll need more help, I'm afraid.
> 

OK, I understand. So, when we are ready, we'll come back with a fresh series and a good explanation.
diff mbox series

Patch

diff --git a/monitor/qmp.c b/monitor/qmp.c
index 7169366..a86ed35 100644
--- a/monitor/qmp.c
+++ b/monitor/qmp.c
@@ -75,36 +75,32 @@  static void monitor_qmp_cleanup_req_queue_locked(MonitorQMP *mon)
     }
 }
 
-static void monitor_qmp_cleanup_queue_and_resume(MonitorQMP *mon)
+/*
+ * Let unprocessed QMP commands be handled.
+ */
+static void monitor_qmp_drain_queue(MonitorQMP *mon)
 {
-    qemu_mutex_lock(&mon->qmp_queue_lock);
+    bool q_is_empty = false;
 
-    /*
-     * Same condition as in monitor_qmp_dispatcher_co(), but before
-     * removing an element from the queue (hence no `- 1`).
-     * Also, the queue should not be empty either, otherwise the
-     * monitor hasn't been suspended yet (or was already resumed).
-     */
-    bool need_resume = (!qmp_oob_enabled(mon) ||
-        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX)
-        && !g_queue_is_empty(mon->qmp_requests);
+    while (!q_is_empty) {
+        qemu_mutex_lock(&mon->qmp_queue_lock);
+        q_is_empty = g_queue_is_empty(mon->qmp_requests);
+        qemu_mutex_unlock(&mon->qmp_queue_lock);
 
-    monitor_qmp_cleanup_req_queue_locked(mon);
+        if (!q_is_empty) {
+            if (!qatomic_xchg(&qmp_dispatcher_co_busy, true)) {
+                /* Kick the dispatcher coroutine */
+                aio_co_wake(qmp_dispatcher_co);
+            } else {
+                /* Let the dispatcher do its job for a while */
+                g_usleep(40);
+            }
+        }
+    }
 
-    if (need_resume) {
-        /*
-         * handle_qmp_command() suspended the monitor because the
-         * request queue filled up, to be resumed when the queue has
-         * space again.  We just emptied it; resume the monitor.
-         *
-         * Without this, the monitor would remain suspended forever
-         * when we get here while the monitor is suspended.  An
-         * unfortunately timed CHR_EVENT_CLOSED can do the trick.
-         */
+    if (qatomic_mb_read(&mon->common.suspend_cnt)) {
         monitor_resume(&mon->common);
     }
-
-    qemu_mutex_unlock(&mon->qmp_queue_lock);
 }
 
 void qmp_send_response(MonitorQMP *mon, const QDict *rsp)
@@ -418,7 +414,7 @@  static void monitor_qmp_event(void *opaque, QEMUChrEvent event)
          * stdio, it's possible that stdout is still open when stdin
          * is closed.
          */
-        monitor_qmp_cleanup_queue_and_resume(mon);
+        monitor_qmp_drain_queue(mon);
         json_message_parser_destroy(&mon->parser);
         json_message_parser_init(&mon->parser, handle_qmp_command,
                                  mon, NULL);