@@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
/* Multiple fd's */
struct MultiFDSendParams {
+ /* not changed */
QemuThread thread;
QIOChannel *c;
QemuSemaphore sem;
QemuSemaphore init;
QemuMutex mutex;
+ /* protected by param mutex */
bool quit;
+ uint8_t *address;
+ /* protected by multifd mutex */
+ bool done;
};
typedef struct MultiFDSendParams MultiFDSendParams;
static MultiFDSendParams *multifd_send;
+QemuMutex multifd_send_mutex;
+QemuSemaphore multifd_send_sem;
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *params = opaque;
@@ -410,6 +418,7 @@ static void *multifd_send_thread(void *opaque)
qio_channel_write(params->c, &start, 1, &error_abort);
qemu_sem_post(&params->init);
+ qemu_sem_post(&multifd_send_sem);
while (true) {
qemu_mutex_lock(&params->mutex);
@@ -417,6 +426,15 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_unlock(&params->mutex);
break;
}
+ if (params->address) {
+ params->address = 0;
+ qemu_mutex_unlock(&params->mutex);
+ qemu_mutex_lock(&multifd_send_mutex);
+ params->done = true;
+ qemu_mutex_unlock(&multifd_send_mutex);
+ qemu_sem_post(&multifd_send_sem);
+ continue;
+ }
qemu_mutex_unlock(&params->mutex);
qemu_sem_wait(&params->sem);
}
@@ -471,6 +489,8 @@ void migrate_multifd_send_threads_create(void)
}
thread_count = migrate_multifd_threads();
multifd_send = g_new0(MultiFDSendParams, thread_count);
+ qemu_mutex_init(&multifd_send_mutex);
+ qemu_sem_init(&multifd_send_sem, 0);
for (i = 0; i < thread_count; i++) {
char thread_name[15];
MultiFDSendParams *p = &multifd_send[i];
@@ -479,6 +499,8 @@ void migrate_multifd_send_threads_create(void)
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->init, 0);
p->quit = false;
+ p->done = true;
+ p->address = 0;
p->c = socket_send_channel_create();
if (!p->c) {
error_report("Error creating a send channel");
@@ -491,6 +513,28 @@ void migrate_multifd_send_threads_create(void)
}
}
+static int multifd_send_page(uint8_t *address)
+{
+ int i, thread_count;
+
+ thread_count = migrate_multifd_threads();
+ qemu_sem_wait(&multifd_send_sem);
+ qemu_mutex_lock(&multifd_send_mutex);
+ for (i = 0; i < thread_count; i++) {
+ if (multifd_send[i].done) {
+ multifd_send[i].done = false;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&multifd_send_mutex);
+ qemu_mutex_lock(&multifd_send[i].mutex);
+ multifd_send[i].address = address;
+ qemu_mutex_unlock(&multifd_send[i].mutex);
+ qemu_sem_post(&multifd_send[i].sem);
+
+ return 0;
+}
+
struct MultiFDRecvParams {
QemuThread thread;
QIOChannel *c;
@@ -1023,6 +1067,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
*bytes_transferred +=
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+ multifd_send_page(p);
*bytes_transferred += TARGET_PAGE_SIZE;
pages = 1;
acct_info.norm_pages++;
We make the locking and the transfer of information specific, even if we are still transmitting things through the main thread. Signed-off-by: Juan Quintela <quintela@redhat.com> -- Move synchronization to use semaphores, as Paolo's suggestion. --- migration/ram.c | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+)