Message ID | 20240131103111.306523-7-peterx@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | migration/multifd: Refactor ->send_prepare() and cleanups | expand |
peterx@redhat.com writes: > From: Peter Xu <peterx@redhat.com> > > Multifd provide a threaded model for processing jobs. On sender side, > there can be two kinds of job: (1) a list of pages to send, or (2) a sync > request. > > The sync request is a very special kind of job. It never contains a page > array, but only a multifd packet telling the dest side to synchronize with > sent pages. > > Before this patch, both requests use the pending_job field, no matter what > the request is, it will boost pending_job, while multifd sender thread will > decrement it after it finishes one job. > > However this should be racy, because SYNC is special in that it needs to > set p->flags with MULTIFD_FLAG_SYNC, showing that this is a sync request. > Consider a sequence of operations where: > > - migration thread enqueue a job to send some pages, pending_job++ (0->1) > > - [...before the selected multifd sender thread wakes up...] > > - migration thread enqueue another job to sync, pending_job++ (1->2), > setup p->flags=MULTIFD_FLAG_SYNC > > - multifd sender thread wakes up, found pending_job==2 > - send the 1st packet with MULTIFD_FLAG_SYNC and list of pages > - send the 2nd packet with flags==0 and no pages > > This is not expected, because MULTIFD_FLAG_SYNC should hopefully be done > after all the pages are received. Meanwhile, the 2nd packet will be > completely useless, which contains zero information. > > I didn't verify above, but I think this issue is still benign in that at > least on the recv side we always receive pages before handling > MULTIFD_FLAG_SYNC. However that's not always guaranteed and just tricky. > > One other reason I want to separate it is using p->flags to communicate > between the two threads is also not clearly defined, it's very hard to read > and understand why accessing p->flags is always safe; see the current impl > of multifd_send_thread() where we tried to cache only p->flags. It doesn't > need to be that complicated. > > This patch introduces pending_sync, a separate flag just to show that the > requester needs a sync. Alongside, we remove the tricky caching of > p->flags now because after this patch p->flags should only be used by > multifd sender thread now, which will be crystal clear. So it is always > thread safe to access p->flags. > > With that, we can also safely convert the pending_job into a boolean, > because we don't support >1 pending jobs anyway. > > Signed-off-by: Peter Xu <peterx@redhat.com> Reviewed-by: Fabiano Rosas <farosas@suse.de>
diff --git a/migration/multifd.h b/migration/multifd.h index 3920bdbcf1..08f26ef3fe 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -99,8 +99,17 @@ typedef struct { uint32_t flags; /* global number of generated multifd packets */ uint64_t packet_num; - /* thread has work to do */ - int pending_job; + /* + * The sender thread has work to do if either of below boolean is set. + * + * @pending_job: a job is pending + * @pending_sync: a sync request is pending + * + * For both of these fields, they're only set by the requesters, and + * cleared by the multifd sender threads. + */ + bool pending_job; + bool pending_sync; /* array of pages to sent. * The owner of 'pages' depends of 'pending_job' value: * pending_job == 0 -> migration_thread can use it. diff --git a/migration/multifd.c b/migration/multifd.c index 8bb1fd95cf..6a4863edd2 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -443,7 +443,7 @@ static int multifd_send_pages(void) p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); if (!p->pending_job) { - p->pending_job++; + p->pending_job = true; next_channel = (i + 1) % migrate_multifd_channels(); break; } @@ -631,8 +631,7 @@ int multifd_send_sync_main(void) qemu_mutex_lock(&p->mutex); p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; + p->pending_sync = true; qemu_mutex_unlock(&p->mutex); qemu_sem_post(&p->sem); } @@ -688,7 +687,6 @@ static void *multifd_send_thread(void *opaque) if (p->pending_job) { uint64_t packet_num = p->packet_num; MultiFDPages_t *pages = p->pages; - uint32_t flags; if (use_zero_copy_send) { p->iovs_num = 0; @@ -704,13 +702,11 @@ static void *multifd_send_thread(void *opaque) } } multifd_send_fill_packet(p); - flags = p->flags; - p->flags = 0; p->num_packets++; p->total_normal_pages += pages->num; qemu_mutex_unlock(&p->mutex); - trace_multifd_send(p->id, packet_num, pages->num, flags, + trace_multifd_send(p->id, packet_num, pages->num, p->flags, p->next_packet_size); if (use_zero_copy_send) { @@ -738,12 +734,23 @@ static void *multifd_send_thread(void *opaque) multifd_pages_reset(p->pages); p->next_packet_size = 0; qemu_mutex_lock(&p->mutex); - p->pending_job--; + p->pending_job = false; qemu_mutex_unlock(&p->mutex); - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&p->sem_sync); + } else if (p->pending_sync) { + p->flags = MULTIFD_FLAG_SYNC; + multifd_send_fill_packet(p); + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + qemu_mutex_unlock(&p->mutex); + break; } + /* p->next_packet_size will always be zero for a SYNC packet */ + stat64_add(&mig_stats.multifd_bytes, p->packet_len); + p->flags = 0; + p->pending_sync = false; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem_sync); } else { qemu_mutex_unlock(&p->mutex); /* sometimes there are spurious wakeups */