diff mbox

[1/1] linux-aio: prevent submitting more than MAX_EVENTS

Message ID 1468345887-22077-1-git-send-email-roman.penyaev@profitbricks.com (mailing list archive)
State New, archived
Headers show

Commit Message

Roman Pen July 12, 2016, 5:51 p.m. UTC
Invoking io_setup(MAX_EVENTS) we ask kernel to create ring buffer for us
with specified number of events.  But kernel ring buffer allocation logic
is a bit tricky (ring buffer is page size aligned + some percpu allocation
are required) so eventually more than requested events number is allocated.

From a userspace side we have to following the convention and should not
try to io_submit() more or logic, which consumes completed events, should
be changed accordingly.  The pitfall is in the following sequence:

    MAX_EVENTS = 128
    io_setup(MAX_EVENTS)

    io_submit(MAX_EVENTS)
    io_submit(MAX_EVENTS)

    /* now 256 events are in-flight */

    io_getevents(MAX_EVENTS) = 128

    /* we can handle only 128 events at once, to be sure
     * that nothing is pended the io_getevents(MAX_EVENTS)
     * call must be invoked once more or hang will happen. */

To prevent the hang or reiteration of io_getevents() call this patch
restricts the number of in-flights, which is limited to MAX_EVENTS.

Signed-off-by: Roman Pen <roman.penyaev@profitbricks.com>
Cc: Stefan Hajnoczi <stefanha@redhat.com>
Cc: Paolo Bonzini <pbonzini@redhat.com>
Cc: qemu-devel@nongnu.org
---
 block/linux-aio.c | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

Comments

Fam Zheng July 13, 2016, 2:23 a.m. UTC | #1
On Tue, 07/12 19:51, Roman Pen wrote:
> Invoking io_setup(MAX_EVENTS) we ask kernel to create ring buffer for us
> with specified number of events.  But kernel ring buffer allocation logic
> is a bit tricky (ring buffer is page size aligned + some percpu allocation
> are required) so eventually more than requested events number is allocated.
> 
> From a userspace side we have to following the convention and should not

s/following/follow/

> try to io_submit() more or logic, which consumes completed events, should
> be changed accordingly.  The pitfall is in the following sequence:
> 
>     MAX_EVENTS = 128
>     io_setup(MAX_EVENTS)
> 
>     io_submit(MAX_EVENTS)
>     io_submit(MAX_EVENTS)
> 
>     /* now 256 events are in-flight */
> 
>     io_getevents(MAX_EVENTS) = 128
> 
>     /* we can handle only 128 events at once, to be sure
>      * that nothing is pended the io_getevents(MAX_EVENTS)
>      * call must be invoked once more or hang will happen. */
> 
> To prevent the hang or reiteration of io_getevents() call this patch
> restricts the number of in-flights, which is limited to MAX_EVENTS.
> 
> Signed-off-by: Roman Pen <roman.penyaev@profitbricks.com>
> Cc: Stefan Hajnoczi <stefanha@redhat.com>
> Cc: Paolo Bonzini <pbonzini@redhat.com>
> Cc: qemu-devel@nongnu.org
> ---
>  block/linux-aio.c | 25 +++++++++++++++----------
>  1 file changed, 15 insertions(+), 10 deletions(-)
> 
> diff --git a/block/linux-aio.c b/block/linux-aio.c
> index e468960..4a430a1 100644
> --- a/block/linux-aio.c
> +++ b/block/linux-aio.c
> @@ -28,8 +28,6 @@
>   */
>  #define MAX_EVENTS 128
>  
> -#define MAX_QUEUED_IO  128
> -
>  struct qemu_laiocb {
>      BlockAIOCB common;
>      Coroutine *co;
> @@ -44,7 +42,8 @@ struct qemu_laiocb {
>  
>  typedef struct {
>      int plugged;
> -    unsigned int n;
> +    unsigned int in_queue;
> +    unsigned int in_flight;
>      bool blocked;
>      QSIMPLEQ_HEAD(, qemu_laiocb) pending;
>  } LaioQueue;
> @@ -129,6 +128,7 @@ static void qemu_laio_completion_bh(void *opaque)
>              s->event_max = 0;
>              return; /* no more events */
>          }
> +        s->io_q.in_flight -= s->event_max;
>      }
>  
>      /* Reschedule so nested event loops see currently pending completions */
> @@ -190,7 +190,8 @@ static void ioq_init(LaioQueue *io_q)
>  {
>      QSIMPLEQ_INIT(&io_q->pending);
>      io_q->plugged = 0;
> -    io_q->n = 0;
> +    io_q->in_queue = 0;
> +    io_q->in_flight = 0;
>      io_q->blocked = false;
>  }
>  
> @@ -198,14 +199,16 @@ static void ioq_submit(LinuxAioState *s)
>  {
>      int ret, len;
>      struct qemu_laiocb *aiocb;
> -    struct iocb *iocbs[MAX_QUEUED_IO];
> +    struct iocb *iocbs[MAX_EVENTS];
>      QSIMPLEQ_HEAD(, qemu_laiocb) completed;
>  
>      do {
>          len = 0;
> +        if (s->io_q.in_flight >= MAX_EVENTS)
> +            break;

QEMU coding style forces braces for single line code blocks. Please check with
./scripts/checkpatch.pl.

>          QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
>              iocbs[len++] = &aiocb->iocb;
> -            if (len == MAX_QUEUED_IO) {
> +            if (s->io_q.in_flight + len >= MAX_EVENTS) {
>                  break;
>              }
>          }
> @@ -218,11 +221,12 @@ static void ioq_submit(LinuxAioState *s)
>              abort();
>          }
>  
> -        s->io_q.n -= ret;
> +        s->io_q.in_flight += ret;
> +        s->io_q.in_queue  -= ret;
>          aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
>          QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
>      } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
> -    s->io_q.blocked = (s->io_q.n > 0);
> +    s->io_q.blocked = (s->io_q.in_queue > 0);
>  }
>  
>  void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
> @@ -263,9 +267,10 @@ static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
>      io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
>  
>      QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
> -    s->io_q.n++;
> +    s->io_q.in_queue++;
>      if (!s->io_q.blocked &&
> -        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
> +        (!s->io_q.plugged ||
> +         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
>          ioq_submit(s);
>      }
>  
> -- 
> 2.8.2
> 
> 

Apart from above two minor comments,

Reviewed-by: Fam Zheng <famz@redhat.com>
Paolo Bonzini July 13, 2016, 7:43 a.m. UTC | #2
On 12/07/2016 19:51, Roman Pen wrote:
> +        if (s->io_q.in_flight >= MAX_EVENTS)
> +            break;
>          QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
>              iocbs[len++] = &aiocb->iocb;
> -            if (len == MAX_QUEUED_IO) {
> +            if (s->io_q.in_flight + len >= MAX_EVENTS) {
>                  break;
>              }

More easily written like this:

        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                break;
            }
            iocbs[len++] = &aiocb->iocb;
        }

so that the early "if" is not necessary.  Also because you forgot the
braces around it. :)

Paolo
Roman Pen July 13, 2016, 7:50 a.m. UTC | #3
On Wed, Jul 13, 2016 at 9:43 AM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>
>
> On 12/07/2016 19:51, Roman Pen wrote:
>> +        if (s->io_q.in_flight >= MAX_EVENTS)
>> +            break;
>>          QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
>>              iocbs[len++] = &aiocb->iocb;
>> -            if (len == MAX_QUEUED_IO) {
>> +            if (s->io_q.in_flight + len >= MAX_EVENTS) {
>>                  break;
>>              }
>
> More easily written like this:
>
>         QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
>             if (s->io_q.in_flight + len >= MAX_EVENTS) {
>                 break;
>             }
>             iocbs[len++] = &aiocb->iocb;
>         }
>
> so that the early "if" is not necessary.  Also because you forgot the
> braces around it. :)

No, we will jump out of the QSIMPLEQ_FOREACH loop and will then
invoke io_submit() with len == 0.

--
Roman
diff mbox

Patch

diff --git a/block/linux-aio.c b/block/linux-aio.c
index e468960..4a430a1 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -28,8 +28,6 @@ 
  */
 #define MAX_EVENTS 128
 
-#define MAX_QUEUED_IO  128
-
 struct qemu_laiocb {
     BlockAIOCB common;
     Coroutine *co;
@@ -44,7 +42,8 @@  struct qemu_laiocb {
 
 typedef struct {
     int plugged;
-    unsigned int n;
+    unsigned int in_queue;
+    unsigned int in_flight;
     bool blocked;
     QSIMPLEQ_HEAD(, qemu_laiocb) pending;
 } LaioQueue;
@@ -129,6 +128,7 @@  static void qemu_laio_completion_bh(void *opaque)
             s->event_max = 0;
             return; /* no more events */
         }
+        s->io_q.in_flight -= s->event_max;
     }
 
     /* Reschedule so nested event loops see currently pending completions */
@@ -190,7 +190,8 @@  static void ioq_init(LaioQueue *io_q)
 {
     QSIMPLEQ_INIT(&io_q->pending);
     io_q->plugged = 0;
-    io_q->n = 0;
+    io_q->in_queue = 0;
+    io_q->in_flight = 0;
     io_q->blocked = false;
 }
 
@@ -198,14 +199,16 @@  static void ioq_submit(LinuxAioState *s)
 {
     int ret, len;
     struct qemu_laiocb *aiocb;
-    struct iocb *iocbs[MAX_QUEUED_IO];
+    struct iocb *iocbs[MAX_EVENTS];
     QSIMPLEQ_HEAD(, qemu_laiocb) completed;
 
     do {
         len = 0;
+        if (s->io_q.in_flight >= MAX_EVENTS)
+            break;
         QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
             iocbs[len++] = &aiocb->iocb;
-            if (len == MAX_QUEUED_IO) {
+            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                 break;
             }
         }
@@ -218,11 +221,12 @@  static void ioq_submit(LinuxAioState *s)
             abort();
         }
 
-        s->io_q.n -= ret;
+        s->io_q.in_flight += ret;
+        s->io_q.in_queue  -= ret;
         aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
         QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
     } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
-    s->io_q.blocked = (s->io_q.n > 0);
+    s->io_q.blocked = (s->io_q.in_queue > 0);
 }
 
 void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
@@ -263,9 +267,10 @@  static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
     io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));
 
     QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
-    s->io_q.n++;
+    s->io_q.in_queue++;
     if (!s->io_q.blocked &&
-        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
+        (!s->io_q.plugged ||
+         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
         ioq_submit(s);
     }