@@ -399,17 +399,25 @@ void migrate_compress_threads_create(void)
/* Multiple fd's */
struct MultiFDSendParams {
+ /* not changed */
QemuThread thread;
+ QIOChannel *c;
QemuCond cond;
QemuMutex mutex;
+ /* protected by param mutex */
bool quit;
bool started;
- QIOChannel *c;
+ uint8_t *address;
+ /* protected by multifd mutex */
+ bool done;
};
typedef struct MultiFDSendParams MultiFDSendParams;
static MultiFDSendParams *multifd_send;
+QemuMutex multifd_send_mutex;
+QemuCond multifd_send_cond;
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *params = opaque;
@@ -423,7 +431,17 @@ static void *multifd_send_thread(void *opaque)
qemu_mutex_lock(¶ms->mutex);
while (!params->quit){
- qemu_cond_wait(¶ms->cond, ¶ms->mutex);
+ if (params->address) {
+ params->address = 0;
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_mutex_lock(&multifd_send_mutex);
+ params->done = true;
+ qemu_cond_signal(&multifd_send_cond);
+ qemu_mutex_unlock(&multifd_send_mutex);
+ qemu_mutex_lock(¶ms->mutex);
+ } else {
+ qemu_cond_wait(¶ms->cond, ¶ms->mutex);
+ }
}
qemu_mutex_unlock(¶ms->mutex);
@@ -471,11 +489,15 @@ void migrate_multifd_send_threads_create(void)
}
thread_count = migrate_multifd_threads();
multifd_send = g_new0(MultiFDSendParams, thread_count);
+ qemu_mutex_init(&multifd_send_mutex);
+ qemu_cond_init(&multifd_send_cond);
for (i = 0; i < thread_count; i++) {
qemu_mutex_init(&multifd_send[i].mutex);
qemu_cond_init(&multifd_send[i].cond);
multifd_send[i].quit = false;
multifd_send[i].started = false;
+ multifd_send[i].done = true;
+ multifd_send[i].address = 0;
multifd_send[i].c = socket_send_channel_create();
if(!multifd_send[i].c) {
printf("Error creating a send channel");
@@ -492,6 +514,34 @@ void migrate_multifd_send_threads_create(void)
}
}
+static int multifd_send_page(uint8_t *address)
+{
+ int i, thread_count;
+ bool found = false;
+
+ thread_count = migrate_multifd_threads();
+ qemu_mutex_lock(&multifd_send_mutex);
+ while (!found) {
+ for (i = 0; i < thread_count; i++) {
+ if (multifd_send[i].done) {
+ multifd_send[i].done = false;
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+ }
+ }
+ qemu_mutex_unlock(&multifd_send_mutex);
+ qemu_mutex_lock(&multifd_send[i].mutex);
+ multifd_send[i].address = address;
+ qemu_cond_signal(&multifd_send[i].cond);
+ qemu_mutex_unlock(&multifd_send[i].mutex);
+
+ return 0;
+}
+
struct MultiFDRecvParams {
QemuThread thread;
QemuCond cond;
@@ -1020,6 +1070,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
*bytes_transferred +=
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+ multifd_send_page(p);
*bytes_transferred += TARGET_PAGE_SIZE;
pages = 1;
acct_info.norm_pages++;
We make the locking and the transfer of information specific, even if we are still transmiting things through the main thread. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-)