
[v2,22/23] migration/multifd: Fix MultiFDSendParams.packet_num race

Message ID 20240202102857.110210-23-peterx@redhat.com (mailing list archive)
State New, archived
Series migration/multifd: Refactor ->send_prepare() and cleanups

Commit Message

Peter Xu Feb. 2, 2024, 10:28 a.m. UTC
From: Peter Xu <peterx@redhat.com>

As Fabiano correctly reported [1], the way MultiFDSendParams.packet_num is
assigned and stored is racy.  Consider two consecutive operations: (1)
queue a job onto multifd send thread X, then (2) queue a sync request onto
the same send thread X.  MultiFDSendParams.packet_num is then assigned
twice, and the first assignment can be lost before the thread reads it.
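
For illustration, these are the two pre-existing assignments that can
collide (both run on the migration thread; see the hunks removed below):

    /* multifd_send_pages() queues a job for send thread X ... */
    p->packet_num = multifd_send_state->packet_num++;

    /* ... then multifd_send_sync_main() queues a sync to the same thread */
    p->packet_num = multifd_send_state->packet_num++;

If thread X has not consumed the first value yet, it fills both packets
with the second number and the first number never reaches the wire.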

To avoid that, drop p->packet_num and assign the packet number at the
point where the thread fills in the packet.  Use an atomic operation on
the global counter to make sure there's no race.
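
Concretely, multifd_send_fill_packet() now draws the number straight from
the global counter (as in the hunk below):

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);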

Note that an atomic fetch_add() may not scale well, but multifd should be
fine since the number of threads normally does not go beyond 16.  Let's
leave that concern for later and fix the issue first.

There's also a trick needed to make the uint64_t packet number work
atomically even on 32 bit hosts.  Switch to uintptr_t for now to keep
things simple.  The packet number will overflow more easily on 32 bit, but
that shouldn't be a major concern, as 32 bit systems are not the main
audience for the kind of performance work multifd targets.

We also need to move the multifd_send_state definition earlier in the
file, so that multifd_send_fill_packet() can reference it.

[1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de

Reported-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
 migration/multifd.h |  2 --
 migration/multifd.c | 56 +++++++++++++++++++++++++++------------------
 2 files changed, 34 insertions(+), 24 deletions(-)

Comments

Fabiano Rosas Feb. 2, 2024, 9:08 p.m. UTC | #1
peterx@redhat.com writes:

> From: Peter Xu <peterx@redhat.com>
>
> As Fabiano correctly reported [1], the way MultiFDSendParams.packet_num is
> assigned and stored is racy.  Consider two consecutive operations: (1)
> queue a job onto multifd send thread X, then (2) queue a sync request onto
> the same send thread X.  MultiFDSendParams.packet_num is then assigned
> twice, and the first assignment can be lost before the thread reads it.
>
> To avoid that, drop p->packet_num and assign the packet number at the
> point where the thread fills in the packet.  Use an atomic operation on
> the global counter to make sure there's no race.
>
> Note that an atomic fetch_add() may not scale well, but multifd should be
> fine since the number of threads normally does not go beyond 16.  Let's
> leave that concern for later and fix the issue first.
>
> There's also a trick needed to make the uint64_t packet number work
> atomically even on 32 bit hosts.  Switch to uintptr_t for now to keep
> things simple.  The packet number will overflow more easily on 32 bit, but
> that shouldn't be a major concern, as 32 bit systems are not the main
> audience for the kind of performance work multifd targets.
>
> We also need to move the multifd_send_state definition earlier in the
> file, so that multifd_send_fill_packet() can reference it.
>
> [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
>
> Reported-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>

Elena had reported this in October already.

Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Reviewed-by: Fabiano Rosas <farosas@suse.de>

> ---
>  migration/multifd.h |  2 --
>  migration/multifd.c | 56 +++++++++++++++++++++++++++------------------
>  2 files changed, 34 insertions(+), 24 deletions(-)
>
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 9b40a53cb6..98876ff94a 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -97,8 +97,6 @@ typedef struct {
>      bool running;
>      /* multifd flags for each packet */
>      uint32_t flags;
> -    /* global number of generated multifd packets */
> -    uint64_t packet_num;
>      /*
>       * The sender thread has work to do if either of below boolean is set.
>       *
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 130f86a1fb..b317d57d61 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -45,6 +45,35 @@ typedef struct {
>      uint64_t unused2[4];    /* Reserved for future use */
>  } __attribute__((packed)) MultiFDInit_t;
>  
> +struct {
> +    MultiFDSendParams *params;
> +    /* array of pages to sent */
> +    MultiFDPages_t *pages;
> +    /*
> +     * Global number of generated multifd packets.
> +     *
> +     * Note that we used 'uintptr_t' because it'll naturally support atomic
> +     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
> +     * multifd will overflow the packet_num easier, but that should be
> +     * fine.
> +     *
> +     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
> +     * hosts, however so far it does not support atomic fetch_add() yet.
> +     * Make it easy for now.
> +     */
> +    uintptr_t packet_num;
> +    /* send channels ready */
> +    QemuSemaphore channels_ready;
> +    /*
> +     * Have we already run terminate threads.  There is a race when it
> +     * happens that we got one error while we are exiting.
> +     * We will use atomic operations.  Only valid values are 0 and 1.
> +     */
> +    int exiting;
> +    /* multifd ops */
> +    MultiFDMethods *ops;
> +} *multifd_send_state;
> +
>  /* Multifd without compression */
>  
>  /**
> @@ -292,13 +321,16 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>  {
>      MultiFDPacket_t *packet = p->packet;
>      MultiFDPages_t *pages = p->pages;
> +    uint64_t packet_num;
>      int i;
>  
>      packet->flags = cpu_to_be32(p->flags);
>      packet->pages_alloc = cpu_to_be32(p->pages->allocated);
>      packet->normal_pages = cpu_to_be32(pages->num);
>      packet->next_packet_size = cpu_to_be32(p->next_packet_size);
> -    packet->packet_num = cpu_to_be64(p->packet_num);
> +
> +    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
> +    packet->packet_num = cpu_to_be64(packet_num);
>  
>      if (pages->block) {
>          strncpy(packet->ramblock, pages->block->idstr, 256);
> @@ -314,7 +346,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>      p->packets_sent++;
>      p->total_normal_pages += pages->num;
>  
> -    trace_multifd_send(p->id, p->packet_num, pages->num, p->flags,
> +    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
>                         p->next_packet_size);
>  }
>  
> @@ -398,24 +430,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>      return 0;
>  }
>  
> -struct {
> -    MultiFDSendParams *params;
> -    /* array of pages to sent */
> -    MultiFDPages_t *pages;
> -    /* global number of generated multifd packets */
> -    uint64_t packet_num;
> -    /* send channels ready */
> -    QemuSemaphore channels_ready;
> -    /*
> -     * Have we already run terminate threads.  There is a race when it
> -     * happens that we got one error while we are exiting.
> -     * We will use atomic operations.  Only valid values are 0 and 1.
> -     */
> -    int exiting;
> -    /* multifd ops */
> -    MultiFDMethods *ops;
> -} *multifd_send_state;
> -
>  static bool multifd_send_should_exit(void)
>  {
>      return qatomic_read(&multifd_send_state->exiting);
> @@ -497,7 +511,6 @@ static bool multifd_send_pages(void)
>       */
>      assert(qatomic_read(&p->pending_job) == false);
>      qatomic_set(&p->pending_job, true);
> -    p->packet_num = multifd_send_state->packet_num++;
>      multifd_send_state->pages = p->pages;
>      p->pages = pages;
>      qemu_mutex_unlock(&p->mutex);
> @@ -730,7 +743,6 @@ int multifd_send_sync_main(void)
>          trace_multifd_send_sync_main_signal(p->id);
>  
>          qemu_mutex_lock(&p->mutex);
> -        p->packet_num = multifd_send_state->packet_num++;
>          /*
>           * We should be the only user so far, so not possible to be set by
>           * others concurrently.
Peter Xu Feb. 5, 2024, 4:05 a.m. UTC | #2
On Fri, Feb 02, 2024 at 06:08:22PM -0300, Fabiano Rosas wrote:
> peterx@redhat.com writes:
> 
> > From: Peter Xu <peterx@redhat.com>
> >
> > As Fabiano correctly reported [1], the way MultiFDSendParams.packet_num is
> > assigned and stored is racy.  Consider two consecutive operations: (1)
> > queue a job onto multifd send thread X, then (2) queue a sync request onto
> > the same send thread X.  MultiFDSendParams.packet_num is then assigned
> > twice, and the first assignment can be lost before the thread reads it.
> >
> > To avoid that, drop p->packet_num and assign the packet number at the
> > point where the thread fills in the packet.  Use an atomic operation on
> > the global counter to make sure there's no race.
> >
> > Note that an atomic fetch_add() may not scale well, but multifd should be
> > fine since the number of threads normally does not go beyond 16.  Let's
> > leave that concern for later and fix the issue first.
> >
> > There's also a trick needed to make the uint64_t packet number work
> > atomically even on 32 bit hosts.  Switch to uintptr_t for now to keep
> > things simple.  The packet number will overflow more easily on 32 bit, but
> > that shouldn't be a major concern, as 32 bit systems are not the main
> > audience for the kind of performance work multifd targets.
> >
> > We also need to move the multifd_send_state definition earlier in the
> > file, so that multifd_send_fill_packet() can reference it.
> >
> > [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de
> >
> > Reported-by: Fabiano Rosas <farosas@suse.de>
> > Signed-off-by: Peter Xu <peterx@redhat.com>
> 
> Elena had reported this in October already.
> 
> Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>

Ah, I'll do the replacement.

> Reviewed-by: Fabiano Rosas <farosas@suse.de>

Thanks,

Patch

diff --git a/migration/multifd.h b/migration/multifd.h
index 9b40a53cb6..98876ff94a 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -97,8 +97,6 @@  typedef struct {
     bool running;
     /* multifd flags for each packet */
     uint32_t flags;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
     /*
      * The sender thread has work to do if either of below boolean is set.
      *
diff --git a/migration/multifd.c b/migration/multifd.c
index 130f86a1fb..b317d57d61 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -45,6 +45,35 @@  typedef struct {
     uint64_t unused2[4];    /* Reserved for future use */
 } __attribute__((packed)) MultiFDInit_t;
 
+struct {
+    MultiFDSendParams *params;
+    /* array of pages to sent */
+    MultiFDPages_t *pages;
+    /*
+     * Global number of generated multifd packets.
+     *
+     * Note that we used 'uintptr_t' because it'll naturally support atomic
+     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
+     * multifd will overflow the packet_num easier, but that should be
+     * fine.
+     *
+     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
+     * hosts, however so far it does not support atomic fetch_add() yet.
+     * Make it easy for now.
+     */
+    uintptr_t packet_num;
+    /* send channels ready */
+    QemuSemaphore channels_ready;
+    /*
+     * Have we already run terminate threads.  There is a race when it
+     * happens that we got one error while we are exiting.
+     * We will use atomic operations.  Only valid values are 0 and 1.
+     */
+    int exiting;
+    /* multifd ops */
+    MultiFDMethods *ops;
+} *multifd_send_state;
+
 /* Multifd without compression */
 
 /**
@@ -292,13 +321,16 @@  void multifd_send_fill_packet(MultiFDSendParams *p)
 {
     MultiFDPacket_t *packet = p->packet;
     MultiFDPages_t *pages = p->pages;
+    uint64_t packet_num;
     int i;
 
     packet->flags = cpu_to_be32(p->flags);
     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
     packet->normal_pages = cpu_to_be32(pages->num);
     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
-    packet->packet_num = cpu_to_be64(p->packet_num);
+
+    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
+    packet->packet_num = cpu_to_be64(packet_num);
 
     if (pages->block) {
         strncpy(packet->ramblock, pages->block->idstr, 256);
@@ -314,7 +346,7 @@  void multifd_send_fill_packet(MultiFDSendParams *p)
     p->packets_sent++;
     p->total_normal_pages += pages->num;
 
-    trace_multifd_send(p->id, p->packet_num, pages->num, p->flags,
+    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                        p->next_packet_size);
 }
 
@@ -398,24 +430,6 @@  static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     return 0;
 }
 
-struct {
-    MultiFDSendParams *params;
-    /* array of pages to sent */
-    MultiFDPages_t *pages;
-    /* global number of generated multifd packets */
-    uint64_t packet_num;
-    /* send channels ready */
-    QemuSemaphore channels_ready;
-    /*
-     * Have we already run terminate threads.  There is a race when it
-     * happens that we got one error while we are exiting.
-     * We will use atomic operations.  Only valid values are 0 and 1.
-     */
-    int exiting;
-    /* multifd ops */
-    MultiFDMethods *ops;
-} *multifd_send_state;
-
 static bool multifd_send_should_exit(void)
 {
     return qatomic_read(&multifd_send_state->exiting);
@@ -497,7 +511,6 @@  static bool multifd_send_pages(void)
      */
     assert(qatomic_read(&p->pending_job) == false);
     qatomic_set(&p->pending_job, true);
-    p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
     qemu_mutex_unlock(&p->mutex);
@@ -730,7 +743,6 @@  int multifd_send_sync_main(void)
         trace_multifd_send_sync_main_signal(p->id);
 
         qemu_mutex_lock(&p->mutex);
-        p->packet_num = multifd_send_state->packet_num++;
         /*
          * We should be the only user so far, so not possible to be set by
          * others concurrently.