diff mbox

[06/13] migration: Start of multiple fd work

Message ID 1477078935-7182-7-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Juan Quintela Oct. 21, 2016, 7:42 p.m. UTC
We create new channels for each new thread created. We only send through
them a character to be sure that we are creating the channels in the
right order.

Note: Reference count/freeing of channels is not done

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  6 +++++
 migration/ram.c               | 45 +++++++++++++++++++++++++++++++++++-
 migration/socket.c            | 53 ++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 102 insertions(+), 2 deletions(-)
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 80ab8c0..0b455d6 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -21,6 +21,7 @@ 
 #include "migration/vmstate.h"
 #include "qapi-types.h"
 #include "exec/cpu-common.h"
+#include "io/channel.h"

 #define QEMU_VM_FILE_MAGIC           0x5145564d
 #define QEMU_VM_FILE_VERSION_COMPAT  0x00000002
@@ -211,6 +212,11 @@  void tcp_start_incoming_migration(const char *host_port, Error **errp);

 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);

+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+QIOChannel *socket_send_channel_create(void);
+int socket_send_channel_destroy(QIOChannel *send);
+
 void unix_start_incoming_migration(const char *path, Error **errp);

 void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index 78d400e..0ea40eb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -396,6 +396,8 @@  struct MultiFDSendParams {
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    bool started;
+    QIOChannel *c;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

@@ -404,6 +406,13 @@  static MultiFDSendParams *multifd_send;
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
+    char start = 's';
+
+    qio_channel_write(params->c, &start, 1, &error_abort);
+    qemu_mutex_lock(&params->mutex);
+    params->started = true;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);

     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
@@ -440,6 +449,7 @@  void migrate_multifd_send_threads_join(void)
         qemu_thread_join(&multifd_send[i].thread);
         qemu_mutex_destroy(&multifd_send[i].mutex);
         qemu_cond_destroy(&multifd_send[i].cond);
+        socket_send_channel_destroy(multifd_send[i].c);
     }
     g_free(multifd_send);
     multifd_send = NULL;
@@ -458,9 +468,20 @@  void migrate_multifd_send_threads_create(void)
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
+        multifd_send[i].started = false;
+        multifd_send[i].c = socket_send_channel_create();
+        if(!multifd_send[i].c) {
+            printf("Error creating a send channel");
+            exit(0);
+        }
         qemu_thread_create(&multifd_send[i].thread, "multifd_send",
                            multifd_send_thread, &multifd_send[i],
                            QEMU_THREAD_JOINABLE);
+        qemu_mutex_lock(&multifd_send[i].mutex);
+        while (!multifd_send[i].started) {
+            qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
+        }
+        qemu_mutex_unlock(&multifd_send[i].mutex);
     }
 }

@@ -469,6 +490,8 @@  struct MultiFDRecvParams {
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    bool started;
+    QIOChannel *c;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;

@@ -477,7 +500,14 @@  static MultiFDRecvParams *multifd_recv;
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
- 
+    char start;
+
+    qio_channel_read(params->c, &start, 1, &error_abort);
+    qemu_mutex_lock(&params->mutex);
+    params->started = true;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);
+
     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
         qemu_cond_wait(&params->cond, &params->mutex);
@@ -513,6 +543,7 @@  void migrate_multifd_recv_threads_join(void)
         qemu_thread_join(&multifd_recv[i].thread);
         qemu_mutex_destroy(&multifd_recv[i].mutex);
         qemu_cond_destroy(&multifd_recv[i].cond);
+        socket_send_channel_destroy(multifd_recv[i].c);
     }
     g_free(multifd_recv);
     multifd_recv = NULL;
@@ -531,9 +562,21 @@  void migrate_multifd_recv_threads_create(void)
         qemu_mutex_init(&multifd_recv[i].mutex);
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
+        multifd_recv[i].started = false;
+        multifd_recv[i].c = socket_recv_channel_create();
+
+        if(!multifd_recv[i].c) {
+            printf("Error creating a recv channel");
+            exit(0);
+        }
         qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
                            multifd_recv_thread, &multifd_recv[i],
                            QEMU_THREAD_JOINABLE);
+        qemu_mutex_lock(&multifd_recv[i].mutex);
+        while (!multifd_recv[i].started) {
+            qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
+        }
+        qemu_mutex_unlock(&multifd_recv[i].mutex);
     }
 }

diff --git a/migration/socket.c b/migration/socket.c
index a21c0c5..f001396 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -24,6 +24,48 @@ 
 #include "io/channel-socket.h"
 #include "trace.h"

+struct SocketArgs {
+    QIOChannelSocket *ioc;
+    SocketAddress *saddr;
+    Error **errp;
+} socket_args;
+
+QIOChannel *socket_recv_channel_create(void)
+{
+    QIOChannelSocket *sioc;
+    Error *err = NULL;
+
+    sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
+                                     &err);
+    if (!sioc) {
+        error_report("could not accept migration connection (%s)",
+                     error_get_pretty(err));
+        return NULL;
+    }
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+    // Remove channel here
+    return 0;
+}
+
+QIOChannel *socket_send_channel_create(void)
+{
+    QIOChannelSocket *sioc = qio_channel_socket_new();
+
+    qio_channel_socket_connect_sync(sioc, socket_args.saddr,
+                                    socket_args.errp);
+    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+    return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+    // Remove channel here
+    return 0;
+}

 static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
 {
@@ -96,6 +138,10 @@  static void socket_start_outgoing_migration(MigrationState *s,
     struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);

     data->s = s;
+
+    socket_args.saddr = saddr;
+    socket_args.errp = errp;
+
     if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
         data->hostname = g_strdup(saddr->u.inet.data->host);
     }
@@ -105,7 +151,11 @@  static void socket_start_outgoing_migration(MigrationState *s,
                                      socket_outgoing_migration,
                                      data,
                                      socket_connect_data_free);
+    /*
+      We are not freeing saddr yet, we need some kind of reference
+       counting
     qapi_free_SocketAddress(saddr);
+    */
 }

 void tcp_start_outgoing_migration(MigrationState *s,
@@ -152,7 +202,7 @@  static gboolean socket_accept_incoming_migration(QIOChannel *ioc,

 out:
     /* Close listening socket as its no longer needed */
-    qio_channel_close(ioc, NULL);
+//    qio_channel_close(ioc, NULL);
     return FALSE; /* unregister */
 }

@@ -161,6 +211,7 @@  static void socket_start_incoming_migration(SocketAddress *saddr,
                                             Error **errp)
 {
     QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+    socket_args.ioc = listen_ioc;

     if (qio_channel_socket_listen_sync(listen_ioc, saddr, errp) < 0) {
         object_unref(OBJECT(listen_ioc));