@@ -189,6 +189,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
+int tcp_send_channel_create(void);
+int tcp_send_channel_destroy(int s);
+int tcp_recv_channel_create(void);
+int tcp_recv_channel_destroy(int s);
+
void unix_start_incoming_migration(const char *path, Error **errp);
void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
@@ -397,6 +397,7 @@ struct MultiFDSendParams {
QemuCond cond;
QemuMutex mutex;
bool quit;
+ int s;
};
typedef struct MultiFDSendParams MultiFDSendParams;
@@ -441,6 +442,7 @@ void migrate_multifd_send_threads_join(void)
qemu_thread_join(&multifd_send[i].thread);
qemu_mutex_destroy(&multifd_send[i].mutex);
qemu_cond_destroy(&multifd_send[i].cond);
+ tcp_recv_channel_destroy(multifd_send[i].s);
}
g_free(multifd_send);
multifd_send = NULL;
@@ -459,6 +461,11 @@ void migrate_multifd_send_threads_create(void)
qemu_mutex_init(&multifd_send[i].mutex);
qemu_cond_init(&multifd_send[i].cond);
multifd_send[i].quit = false;
+ multifd_send[i].s = tcp_send_channel_create();
+ if(multifd_send[i].s < 0) {
+ printf("Error creating a send channel");
+ exit(0);
+ }
qemu_thread_create(&multifd_send[i].thread, "multifd_send",
multifd_send_thread, &multifd_send[i],
QEMU_THREAD_JOINABLE);
@@ -470,6 +477,7 @@ struct MultiFDRecvParams {
QemuCond cond;
QemuMutex mutex;
bool quit;
+ int s;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -514,6 +522,7 @@ void migrate_multifd_recv_threads_join(void)
qemu_thread_join(&multifd_recv[i].thread);
qemu_mutex_destroy(&multifd_recv[i].mutex);
qemu_cond_destroy(&multifd_recv[i].cond);
+ tcp_send_channel_destroy(multifd_recv[i].s);
}
g_free(multifd_recv);
multifd_recv = NULL;
@@ -532,6 +541,12 @@ void migrate_multifd_recv_threads_create(void)
qemu_mutex_init(&multifd_recv[i].mutex);
qemu_cond_init(&multifd_recv[i].cond);
multifd_recv[i].quit = false;
+ multifd_recv[i].s = tcp_recv_channel_create();
+
+ if(multifd_recv[i].s < 0) {
+ printf("Error creating a recv channel");
+ exit(0);
+ }
qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
multifd_recv_thread, &multifd_recv[i],
QEMU_THREAD_JOINABLE);
@@ -35,14 +35,15 @@
struct OutgoingArgs {
MigrationState *s;
-};
+ char *host_port;
+ Error **err;
+} out_args;
static void tcp_wait_for_connect(int fd, Error *err, void *opaque)
{
struct OutgoingArgs *args = opaque;
MigrationState *s = args->s;
- g_free(args);
if (fd < 0) {
DPRINTF("migrate connect error: %s\n", error_get_pretty(err));
s->to_dst_file = NULL;
@@ -54,17 +55,38 @@ static void tcp_wait_for_connect(int fd, Error *err, void *opaque)
}
}
+int tcp_send_channel_create(void)
+{
+ int s;
+
+ usleep(100000);
+ s = inet_connect(out_args.host_port, out_args.err);
+ if (s < 0) {
+ DPRINTF("migrate_connect multilpe fd error: %s\n",
+ error_get_pretty_(err));
+ }
+ return s;
+}
+
+int tcp_send_channel_destroy(int s)
+{
+ return closesocket(s);
+}
+
void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp)
{
- struct OutgoingArgs *args = g_new0(struct OutgoingArgs, 1);
- args->s = s;
- inet_nonblocking_connect(host_port, tcp_wait_for_connect, args, errp);
+ out_args.s = s;
+ out_args.host_port = g_strdup(host_port);
+ out_args.err = errp;
+ inet_nonblocking_connect(host_port, tcp_wait_for_connect, &out_args, errp);
}
struct IncomingArgs {
int s;
-};
+ char *host_port;
+ Error **errp;
+} in_args;
static void tcp_accept_incoming_migration(void *opaque)
{
@@ -75,7 +97,6 @@ static void tcp_accept_incoming_migration(void *opaque)
QEMUFile *f;
int c;
- g_free(args);
do {
c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
} while (c < 0 && errno == EINTR);
@@ -102,16 +123,42 @@ out:
closesocket(c);
}
+int tcp_recv_channel_create(void)
+{
+ int c;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+ int s = inet_listen(in_args.host_port, NULL, 256, SOCK_STREAM, 0,
+ in_args.errp);
+ do {
+ c = qemu_accept(s, (struct sockaddr *)&addr, &addrlen);
+ } while (c < 0 && errno == EINTR);
+ closesocket(s);
+
+ DPRINTF("accepted multiple fd migration\n");
+
+ if (c < 0) {
+ error_report("could not accept migration connection (%s)",
+ strerror(errno));
+ }
+ return c;
+}
+
+int tcp_recv_channel_destroy(int s)
+{
+ return closesocket(s);
+}
+
void tcp_start_incoming_migration(const char *host_port, Error **errp)
{
- struct IncomingArgs *args = g_new0(struct IncomingArgs, 1);
int s;
s = inet_listen(host_port, NULL, 256, SOCK_STREAM, 0, errp);
if (s < 0) {
- g_free(args);
return;
}
- args->s = s;
- qemu_set_fd_handler(s, tcp_accept_incoming_migration, NULL, args);
+ in_args.s = s;
+ in_args.host_port = g_strdup(host_port);
+ in_args.errp = errp;
+ qemu_set_fd_handler(s, tcp_accept_incoming_migration, NULL, &in_args);
}
We create new channels for each new thread created, still nothing send through them. Signed-off-by: Juan Quintela <quintela@redhat.com> --- include/migration/migration.h | 5 ++++ migration/ram.c | 15 ++++++++++ migration/tcp.c | 69 ++++++++++++++++++++++++++++++++++++------- 3 files changed, 78 insertions(+), 11 deletions(-)