diff mbox

[07/13] migration: Start of multiple fd work

Message ID 1461163481-11439-8-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Juan Quintela April 20, 2016, 2:44 p.m. UTC
We create new channels for each new thread created, still nothing send
through them.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  5 ++++
 migration/ram.c               | 15 ++++++++++
 migration/tcp.c               | 69 ++++++++++++++++++++++++++++++++++++-------
 3 files changed, 78 insertions(+), 11 deletions(-)
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 9f94c75..343cd90 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -189,6 +189,11 @@  void tcp_start_incoming_migration(const char *host_port, Error **errp);

 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);

+int tcp_send_channel_create(void);
+int tcp_send_channel_destroy(int s);
+int tcp_recv_channel_create(void);
+int tcp_recv_channel_destroy(int s);
+
 void unix_start_incoming_migration(const char *path, Error **errp);

 void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
diff --git a/migration/ram.c b/migration/ram.c
index 6139f7c..d321e6b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -397,6 +397,7 @@  struct MultiFDSendParams {
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    int s;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

@@ -441,6 +442,7 @@  void migrate_multifd_send_threads_join(void)
         qemu_thread_join(&multifd_send[i].thread);
         qemu_mutex_destroy(&multifd_send[i].mutex);
         qemu_cond_destroy(&multifd_send[i].cond);
+        tcp_recv_channel_destroy(multifd_send[i].s);
     }
     g_free(multifd_send);
     multifd_send = NULL;
@@ -459,6 +461,11 @@  void migrate_multifd_send_threads_create(void)
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
+        multifd_send[i].s = tcp_send_channel_create();
+        if(multifd_send[i].s < 0) {
+            printf("Error creating a send channel");
+            exit(0);
+        }
         qemu_thread_create(&multifd_send[i].thread, "multifd_send",
                            multifd_send_thread, &multifd_send[i],
                            QEMU_THREAD_JOINABLE);
@@ -470,6 +477,7 @@  struct MultiFDRecvParams {
     QemuCond cond;
     QemuMutex mutex;
     bool quit;
+    int s;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;

@@ -514,6 +522,7 @@  void migrate_multifd_recv_threads_join(void)
         qemu_thread_join(&multifd_recv[i].thread);
         qemu_mutex_destroy(&multifd_recv[i].mutex);
         qemu_cond_destroy(&multifd_recv[i].cond);
+        tcp_send_channel_destroy(multifd_recv[i].s);
     }
     g_free(multifd_recv);
     multifd_recv = NULL;
@@ -532,6 +541,12 @@  void migrate_multifd_recv_threads_create(void)
         qemu_mutex_init(&multifd_recv[i].mutex);
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
+        multifd_recv[i].s = tcp_recv_channel_create();
+
+        if(multifd_recv[i].s < 0) {
+            printf("Error creating a recv channel");
+            exit(0);
+        }
         qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
                            multifd_recv_thread, &multifd_recv[i],
                            QEMU_THREAD_JOINABLE);
diff --git a/migration/tcp.c b/migration/tcp.c
index 5d42c96..5e693a7 100644
--- a/migration/tcp.c
+++ b/migration/tcp.c
@@ -35,14 +35,15 @@ 

 struct OutgoingArgs {
     MigrationState *s;
-};
+    char *host_port;
+    Error **err;
+} out_args;

 static void tcp_wait_for_connect(int fd, Error *err, void *opaque)
 {
     struct OutgoingArgs *args = opaque;
     MigrationState *s = args->s;

-    g_free(args);
     if (fd < 0) {
         DPRINTF("migrate connect error: %s\n", error_get_pretty(err));
         s->to_dst_file = NULL;
@@ -54,17 +55,38 @@  static void tcp_wait_for_connect(int fd, Error *err, void *opaque)
     }
 }

+int tcp_send_channel_create(void)
+{
+    int s;
+
+    usleep(100000);
+    s = inet_connect(out_args.host_port, out_args.err);
+    if (s < 0) {
+        DPRINTF("migrate_connect multilpe fd error: %s\n",
+                error_get_pretty_(err));
+    }
+    return s;
+}
+
+int tcp_send_channel_destroy(int s)
+{
+    return closesocket(s);
+}
+
 void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp)
 {
-    struct OutgoingArgs *args = g_new0(struct OutgoingArgs, 1);

-    args->s = s;
-    inet_nonblocking_connect(host_port, tcp_wait_for_connect, args, errp);
+    out_args.s = s;
+    out_args.host_port = g_strdup(host_port);
+    out_args.err = errp;
+    inet_nonblocking_connect(host_port, tcp_wait_for_connect, &out_args, errp);
 }

 struct IncomingArgs {
     int s;
-};
+    char *host_port;
+    Error **errp;
+} in_args;

 static void tcp_accept_incoming_migration(void *opaque)
 {
@@ -75,7 +97,6 @@  static void tcp_accept_incoming_migration(void *opaque)
     QEMUFile *f;
     int c;

-    g_free(args);
     do {
         c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
     } while (c < 0 && errno == EINTR);
@@ -102,16 +123,42 @@  out:
     closesocket(c);
 }

+int tcp_recv_channel_create(void)
+{
+    int c;
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    int s = inet_listen(in_args.host_port, NULL, 256, SOCK_STREAM, 0,
+                        in_args.errp);
+    do {
+        c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
+    } while (c < 0 && errno == EINTR);
+    closesocket(s);
+
+    DPRINTF("accepted multiple fd migration\n");
+
+    if (c < 0) {
+        error_report("could not accept migration connection (%s)",
+                     strerror(errno));
+    }
+    return c;
+}
+
+int tcp_recv_channel_destroy(int s)
+{
+    return closesocket(s);
+}
+
 void tcp_start_incoming_migration(const char *host_port, Error **errp)
 {
-    struct IncomingArgs *args = g_new0(struct IncomingArgs, 1);
     int s;

     s = inet_listen(host_port, NULL, 256, SOCK_STREAM, 0, errp);
     if (s < 0) {
-        g_free(args);
         return;
     }
-    args->s = s;
-    qemu_set_fd_handler(s, tcp_accept_incoming_migration, NULL, args);
+    in_args.s = s;
+    in_args.host_port = g_strdup(host_port);
+    in_args.errp = errp;
+    qemu_set_fd_handler(s, tcp_accept_incoming_migration, NULL, &in_args);
 }