@@ -99,8 +99,17 @@ typedef struct {
uint32_t flags;
/* global number of generated multifd packets */
uint64_t packet_num;
- /* thread has work to do */
- int pending_job;
+ /*
+ * The sender thread has work to do if either of the booleans below is set.
+ *
+ * @pending_job: a job is pending
+ * @pending_sync: a sync request is pending
+ *
+ * Both fields are set only by the requesters, and cleared only by the
+ * multifd sender thread.
+ */
+ bool pending_job;
+ bool pending_sync;
/* array of pages to sent.
* The owner of 'pages' depends of 'pending_job' value:
* pending_job == 0 -> migration_thread can use it.
@@ -442,8 +442,8 @@ static int multifd_send_pages(void)
}
p = &multifd_send_state->params[i];
qemu_mutex_lock(&p->mutex);
- if (!p->pending_job) {
- p->pending_job++;
+ if (qatomic_read(&p->pending_job) == false) {
+ qatomic_set(&p->pending_job, true);
next_channel = (i + 1) % migrate_multifd_channels();
break;
}
@@ -631,8 +631,12 @@ int multifd_send_sync_main(void)
qemu_mutex_lock(&p->mutex);
p->packet_num = multifd_send_state->packet_num++;
- p->flags |= MULTIFD_FLAG_SYNC;
- p->pending_job++;
+ /*
+ * We should be the only user of pending_sync so far, so it cannot
+ * be set concurrently by anyone else.
+ */
+ assert(qatomic_read(&p->pending_sync) == false);
+ qatomic_set(&p->pending_sync, true);
qemu_mutex_unlock(&p->mutex);
qemu_sem_post(&p->sem);
}
@@ -685,10 +689,9 @@ static void *multifd_send_thread(void *opaque)
}
qemu_mutex_lock(&p->mutex);
- if (p->pending_job) {
+ if (qatomic_read(&p->pending_job)) {
uint64_t packet_num = p->packet_num;
MultiFDPages_t *pages = p->pages;
- uint32_t flags;
if (use_zero_copy_send) {
p->iovs_num = 0;
@@ -704,13 +707,11 @@ static void *multifd_send_thread(void *opaque)
}
}
multifd_send_fill_packet(p);
- flags = p->flags;
- p->flags = 0;
p->num_packets++;
p->total_normal_pages += pages->num;
qemu_mutex_unlock(&p->mutex);
- trace_multifd_send(p->id, packet_num, pages->num, flags,
+ trace_multifd_send(p->id, packet_num, pages->num, p->flags,
p->next_packet_size);
if (use_zero_copy_send) {
@@ -738,12 +739,23 @@ static void *multifd_send_thread(void *opaque)
multifd_pages_reset(p->pages);
p->next_packet_size = 0;
qemu_mutex_lock(&p->mutex);
- p->pending_job--;
+ qatomic_set(&p->pending_job, false);
qemu_mutex_unlock(&p->mutex);
-
- if (flags & MULTIFD_FLAG_SYNC) {
- qemu_sem_post(&p->sem_sync);
+ } else if (qatomic_read(&p->pending_sync)) {
+ p->flags = MULTIFD_FLAG_SYNC;
+ multifd_send_fill_packet(p);
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret != 0) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
}
+ /* p->next_packet_size will always be zero for a SYNC packet */
+ stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+ p->flags = 0;
+ qatomic_set(&p->pending_sync, false);
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_post(&p->sem_sync);
} else {
qemu_mutex_unlock(&p->mutex);
/* sometimes there are spurious wakeups */
@@ -907,7 +919,6 @@ int multifd_save_setup(Error **errp)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->sem_sync, 0);
- p->pending_job = 0;
p->id = i;
p->pages = multifd_pages_init(page_count);
p->packet_len = sizeof(MultiFDPacket_t)