diff mbox

[08/13] migration: Create thread infrastructure for multifd send side

Message ID 1477078935-7182-9-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Juan Quintela Oct. 21, 2016, 7:42 p.m. UTC
We make the locking and the transfer of information specific, even if we
are still transmiting things through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 53 insertions(+), 2 deletions(-)
diff mbox

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 44b9380..0098d33 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -399,17 +399,25 @@  void migrate_compress_threads_create(void)
 /* Multiple fd's */

 struct MultiFDSendParams {
+    /* not changed */
     QemuThread thread;
+    QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
-    QIOChannel *c;
+    uint8_t *address;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

 static MultiFDSendParams *multifd_send;

+QemuMutex multifd_send_mutex;
+QemuCond multifd_send_cond;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
@@ -423,7 +431,17 @@  static void *multifd_send_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->address) {
+            params->address = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_send_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_send_cond);
+            qemu_mutex_unlock(&multifd_send_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);

@@ -471,11 +489,15 @@  void migrate_multifd_send_threads_create(void)
     }
     thread_count = migrate_multifd_threads();
     multifd_send = g_new0(MultiFDSendParams, thread_count);
+    qemu_mutex_init(&multifd_send_mutex);
+    qemu_cond_init(&multifd_send_cond);
     for (i = 0; i < thread_count; i++) {
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
+        multifd_send[i].done = true;
+        multifd_send[i].address = 0;
         multifd_send[i].c = socket_send_channel_create();
         if(!multifd_send[i].c) {
             printf("Error creating a send channel");
@@ -492,6 +514,34 @@  void migrate_multifd_send_threads_create(void)
     }
 }

+static int multifd_send_page(uint8_t *address)
+{
+    int i, thread_count;
+    bool found = false;
+
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_send_mutex);
+    while (!found) {
+        for (i = 0; i < thread_count; i++) {
+            if (multifd_send[i].done) {
+                multifd_send[i].done = false;
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_mutex);
+    qemu_mutex_lock(&multifd_send[i].mutex);
+    multifd_send[i].address = address;
+    qemu_cond_signal(&multifd_send[i].cond);
+    qemu_mutex_unlock(&multifd_send[i].mutex);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     QemuThread thread;
     QemuCond cond;
@@ -1020,6 +1070,7 @@  static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;