Message ID | 20240202102857.110210-23-peterx@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | migration/multifd: Refactor ->send_prepare() and cleanups | expand |
peterx@redhat.com writes: > From: Peter Xu <peterx@redhat.com> > > As reported correctly by Fabiano [1], MultiFDSendParams.packet_num is > assigned and stored in a buggy way. Consider two consecutive operations: (1) > queue a job into multifd send thread X, then (2) queue another sync request > to the same send thread X. Then MultiFDSendParams.packet_num will be > assigned twice, and the first assignment can already be lost. > > To avoid that, we move the packet_num assignment from p->packet_num into > where the thread will fill in the packet. Use atomic operations to protect > the field, making sure there's no race. > > Note that atomic fetch_add() may not be good for scaling purposes, however > multifd should be fine as the number of threads should normally not go beyond > 16 threads. Let's leave that concern for later but fix the issue first. > > There's also a trick on how to make it always work even on 32 bit hosts for > uint64_t packet number. Switching to uintptr_t as of now to simplify the > case. It will cause the packet number to overflow more easily on 32 bit, but that > shouldn't be a major concern for now as 32 bit systems are not the major > audience for any performance concerns like what multifd wants to address. > > We also need to move the multifd_send_state definition up, so that > multifd_send_fill_packet() can reference it. > > [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de > > Reported-by: Fabiano Rosas <farosas@suse.de> > Signed-off-by: Peter Xu <peterx@redhat.com> Elena had reported this in October already. 
Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> Reviewed-by: Fabiano Rosas <farosas@suse.de> > --- > migration/multifd.h | 2 -- > migration/multifd.c | 56 +++++++++++++++++++++++++++------------------ > 2 files changed, 34 insertions(+), 24 deletions(-) > > diff --git a/migration/multifd.h b/migration/multifd.h > index 9b40a53cb6..98876ff94a 100644 > --- a/migration/multifd.h > +++ b/migration/multifd.h > @@ -97,8 +97,6 @@ typedef struct { > bool running; > /* multifd flags for each packet */ > uint32_t flags; > - /* global number of generated multifd packets */ > - uint64_t packet_num; > /* > * The sender thread has work to do if either of below boolean is set. > * > diff --git a/migration/multifd.c b/migration/multifd.c > index 130f86a1fb..b317d57d61 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -45,6 +45,35 @@ typedef struct { > uint64_t unused2[4]; /* Reserved for future use */ > } __attribute__((packed)) MultiFDInit_t; > > +struct { > + MultiFDSendParams *params; > + /* array of pages to sent */ > + MultiFDPages_t *pages; > + /* > + * Global number of generated multifd packets. > + * > + * Note that we used 'uintptr_t' because it'll naturally support atomic > + * operations on both 32bit / 64 bits hosts. It means on 32bit systems > + * multifd will overflow the packet_num easier, but that should be > + * fine. > + * > + * Another option is to use QEMU's Stat64 then it'll be 64 bits on all > + * hosts, however so far it does not support atomic fetch_add() yet. > + * Make it easy for now. > + */ > + uintptr_t packet_num; > + /* send channels ready */ > + QemuSemaphore channels_ready; > + /* > + * Have we already run terminate threads. There is a race when it > + * happens that we got one error while we are exiting. > + * We will use atomic operations. Only valid values are 0 and 1. 
> + */ > + int exiting; > + /* multifd ops */ > + MultiFDMethods *ops; > +} *multifd_send_state; > + > /* Multifd without compression */ > > /** > @@ -292,13 +321,16 @@ void multifd_send_fill_packet(MultiFDSendParams *p) > { > MultiFDPacket_t *packet = p->packet; > MultiFDPages_t *pages = p->pages; > + uint64_t packet_num; > int i; > > packet->flags = cpu_to_be32(p->flags); > packet->pages_alloc = cpu_to_be32(p->pages->allocated); > packet->normal_pages = cpu_to_be32(pages->num); > packet->next_packet_size = cpu_to_be32(p->next_packet_size); > - packet->packet_num = cpu_to_be64(p->packet_num); > + > + packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); > + packet->packet_num = cpu_to_be64(packet_num); > > if (pages->block) { > strncpy(packet->ramblock, pages->block->idstr, 256); > @@ -314,7 +346,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p) > p->packets_sent++; > p->total_normal_pages += pages->num; > > - trace_multifd_send(p->id, p->packet_num, pages->num, p->flags, > + trace_multifd_send(p->id, packet_num, pages->num, p->flags, > p->next_packet_size); > } > > @@ -398,24 +430,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) > return 0; > } > > -struct { > - MultiFDSendParams *params; > - /* array of pages to sent */ > - MultiFDPages_t *pages; > - /* global number of generated multifd packets */ > - uint64_t packet_num; > - /* send channels ready */ > - QemuSemaphore channels_ready; > - /* > - * Have we already run terminate threads. There is a race when it > - * happens that we got one error while we are exiting. > - * We will use atomic operations. Only valid values are 0 and 1. 
> - */ > - int exiting; > - /* multifd ops */ > - MultiFDMethods *ops; > -} *multifd_send_state; > - > static bool multifd_send_should_exit(void) > { > return qatomic_read(&multifd_send_state->exiting); > @@ -497,7 +511,6 @@ static bool multifd_send_pages(void) > */ > assert(qatomic_read(&p->pending_job) == false); > qatomic_set(&p->pending_job, true); > - p->packet_num = multifd_send_state->packet_num++; > multifd_send_state->pages = p->pages; > p->pages = pages; > qemu_mutex_unlock(&p->mutex); > @@ -730,7 +743,6 @@ int multifd_send_sync_main(void) > trace_multifd_send_sync_main_signal(p->id); > > qemu_mutex_lock(&p->mutex); > - p->packet_num = multifd_send_state->packet_num++; > /* > * We should be the only user so far, so not possible to be set by > * others concurrently.
On Fri, Feb 02, 2024 at 06:08:22PM -0300, Fabiano Rosas wrote: > peterx@redhat.com writes: > > > From: Peter Xu <peterx@redhat.com> > > > > As reported correctly by Fabiano [1], MultiFDSendParams.packet_num is > > assigned and stored in a buggy way. Consider two consecutive operations: (1) > > queue a job into multifd send thread X, then (2) queue another sync request > > to the same send thread X. Then MultiFDSendParams.packet_num will be > > assigned twice, and the first assignment can already be lost. > > > > To avoid that, we move the packet_num assignment from p->packet_num into > > where the thread will fill in the packet. Use atomic operations to protect > > the field, making sure there's no race. > > > > Note that atomic fetch_add() may not be good for scaling purposes, however > > multifd should be fine as the number of threads should normally not go beyond > > 16 threads. Let's leave that concern for later but fix the issue first. > > > > There's also a trick on how to make it always work even on 32 bit hosts for > > uint64_t packet number. Switching to uintptr_t as of now to simplify the > > case. It will cause the packet number to overflow more easily on 32 bit, but that > > shouldn't be a major concern for now as 32 bit systems are not the major > > audience for any performance concerns like what multifd wants to address. > > > > We also need to move the multifd_send_state definition up, so that > > multifd_send_fill_packet() can reference it. > > > > [1] https://lore.kernel.org/r/87o7d1jlu5.fsf@suse.de > > > > Reported-by: Fabiano Rosas <farosas@suse.de> > > Signed-off-by: Peter Xu <peterx@redhat.com> > > Elena had reported this in October already. > > Reported-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> Ah, I'll do the replacement. > Reviewed-by: Fabiano Rosas <farosas@suse.de> Thanks,
diff --git a/migration/multifd.h b/migration/multifd.h index 9b40a53cb6..98876ff94a 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -97,8 +97,6 @@ typedef struct { bool running; /* multifd flags for each packet */ uint32_t flags; - /* global number of generated multifd packets */ - uint64_t packet_num; /* * The sender thread has work to do if either of below boolean is set. * diff --git a/migration/multifd.c b/migration/multifd.c index 130f86a1fb..b317d57d61 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -45,6 +45,35 @@ typedef struct { uint64_t unused2[4]; /* Reserved for future use */ } __attribute__((packed)) MultiFDInit_t; +struct { + MultiFDSendParams *params; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* + * Global number of generated multifd packets. + * + * Note that we used 'uintptr_t' because it'll naturally support atomic + * operations on both 32bit / 64 bits hosts. It means on 32bit systems + * multifd will overflow the packet_num easier, but that should be + * fine. + * + * Another option is to use QEMU's Stat64 then it'll be 64 bits on all + * hosts, however so far it does not support atomic fetch_add() yet. + * Make it easy for now. + */ + uintptr_t packet_num; + /* send channels ready */ + QemuSemaphore channels_ready; + /* + * Have we already run terminate threads. There is a race when it + * happens that we got one error while we are exiting. + * We will use atomic operations. Only valid values are 0 and 1. 
+ */ + int exiting; + /* multifd ops */ + MultiFDMethods *ops; +} *multifd_send_state; + /* Multifd without compression */ /** @@ -292,13 +321,16 @@ void multifd_send_fill_packet(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; MultiFDPages_t *pages = p->pages; + uint64_t packet_num; int i; packet->flags = cpu_to_be32(p->flags); packet->pages_alloc = cpu_to_be32(p->pages->allocated); packet->normal_pages = cpu_to_be32(pages->num); packet->next_packet_size = cpu_to_be32(p->next_packet_size); - packet->packet_num = cpu_to_be64(p->packet_num); + + packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); + packet->packet_num = cpu_to_be64(packet_num); if (pages->block) { strncpy(packet->ramblock, pages->block->idstr, 256); @@ -314,7 +346,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p) p->packets_sent++; p->total_normal_pages += pages->num; - trace_multifd_send(p->id, p->packet_num, pages->num, p->flags, + trace_multifd_send(p->id, packet_num, pages->num, p->flags, p->next_packet_size); } @@ -398,24 +430,6 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) return 0; } -struct { - MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* send channels ready */ - QemuSemaphore channels_ready; - /* - * Have we already run terminate threads. There is a race when it - * happens that we got one error while we are exiting. - * We will use atomic operations. Only valid values are 0 and 1. 
- */ - int exiting; - /* multifd ops */ - MultiFDMethods *ops; -} *multifd_send_state; - static bool multifd_send_should_exit(void) { return qatomic_read(&multifd_send_state->exiting); @@ -497,7 +511,6 @@ static bool multifd_send_pages(void) */ assert(qatomic_read(&p->pending_job) == false); qatomic_set(&p->pending_job, true); - p->packet_num = multifd_send_state->packet_num++; multifd_send_state->pages = p->pages; p->pages = pages; qemu_mutex_unlock(&p->mutex); @@ -730,7 +743,6 @@ int multifd_send_sync_main(void) trace_multifd_send_sync_main_signal(p->id); qemu_mutex_lock(&p->mutex); - p->packet_num = multifd_send_state->packet_num++; /* * We should be the only user so far, so not possible to be set by * others concurrently.