@@ -21,6 +21,7 @@
#include "migration/vmstate.h"
#include "qapi-types.h"
#include "exec/cpu-common.h"
+#include "io/channel.h"
#define QEMU_VM_FILE_MAGIC 0x5145564d
#define QEMU_VM_FILE_VERSION_COMPAT 0x00000002
@@ -211,6 +212,11 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp);
void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp);
+QIOChannel *socket_recv_channel_create(void);
+int socket_recv_channel_destroy(QIOChannel *recv);
+QIOChannel *socket_send_channel_create(void);
+int socket_send_channel_destroy(QIOChannel *send);
+
void unix_start_incoming_migration(const char *path, Error **errp);
void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp);
@@ -396,6 +396,8 @@ struct MultiFDSendParams {
QemuCond cond;
QemuMutex mutex;
bool quit;
+ bool started;
+ QIOChannel *c;
};
typedef struct MultiFDSendParams MultiFDSendParams;
@@ -404,6 +406,13 @@ static MultiFDSendParams *multifd_send;
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *params = opaque;
+ char start = 's';
+
+ qio_channel_write(params->c, &start, 1, &error_abort);
+ qemu_mutex_lock(¶ms->mutex);
+ params->started = true;
+ qemu_cond_signal(¶ms->cond);
+ qemu_mutex_unlock(¶ms->mutex);
qemu_mutex_lock(¶ms->mutex);
while (!params->quit){
@@ -440,6 +449,7 @@ void migrate_multifd_send_threads_join(void)
qemu_thread_join(&multifd_send[i].thread);
qemu_mutex_destroy(&multifd_send[i].mutex);
qemu_cond_destroy(&multifd_send[i].cond);
+ socket_send_channel_destroy(multifd_send[i].c);
}
g_free(multifd_send);
multifd_send = NULL;
@@ -458,9 +468,20 @@ void migrate_multifd_send_threads_create(void)
qemu_mutex_init(&multifd_send[i].mutex);
qemu_cond_init(&multifd_send[i].cond);
multifd_send[i].quit = false;
+ multifd_send[i].started = false;
+ multifd_send[i].c = socket_send_channel_create();
+ if(!multifd_send[i].c) {
+ printf("Error creating a send channel");
+ exit(0);
+ }
qemu_thread_create(&multifd_send[i].thread, "multifd_send",
multifd_send_thread, &multifd_send[i],
QEMU_THREAD_JOINABLE);
+ qemu_mutex_lock(&multifd_send[i].mutex);
+ while (!multifd_send[i].started) {
+ qemu_cond_wait(&multifd_send[i].cond, &multifd_send[i].mutex);
+ }
+ qemu_mutex_unlock(&multifd_send[i].mutex);
}
}
@@ -469,6 +490,8 @@ struct MultiFDRecvParams {
QemuCond cond;
QemuMutex mutex;
bool quit;
+ bool started;
+ QIOChannel *c;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -477,7 +500,14 @@ static MultiFDRecvParams *multifd_recv;
static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *params = opaque;
-
+ char start;
+
+ qio_channel_read(params->c, &start, 1, &error_abort);
+ qemu_mutex_lock(¶ms->mutex);
+ params->started = true;
+ qemu_cond_signal(¶ms->cond);
+ qemu_mutex_unlock(¶ms->mutex);
+
qemu_mutex_lock(¶ms->mutex);
while (!params->quit){
qemu_cond_wait(¶ms->cond, ¶ms->mutex);
@@ -513,6 +543,7 @@ void migrate_multifd_recv_threads_join(void)
qemu_thread_join(&multifd_recv[i].thread);
qemu_mutex_destroy(&multifd_recv[i].mutex);
qemu_cond_destroy(&multifd_recv[i].cond);
+ socket_send_channel_destroy(multifd_recv[i].c);
}
g_free(multifd_recv);
multifd_recv = NULL;
@@ -531,9 +562,21 @@ void migrate_multifd_recv_threads_create(void)
qemu_mutex_init(&multifd_recv[i].mutex);
qemu_cond_init(&multifd_recv[i].cond);
multifd_recv[i].quit = false;
+ multifd_recv[i].started = false;
+ multifd_recv[i].c = socket_recv_channel_create();
+
+ if(!multifd_recv[i].c) {
+ printf("Error creating a recv channel");
+ exit(0);
+ }
qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
multifd_recv_thread, &multifd_recv[i],
QEMU_THREAD_JOINABLE);
+ qemu_mutex_lock(&multifd_recv[i].mutex);
+ while (!multifd_recv[i].started) {
+ qemu_cond_wait(&multifd_recv[i].cond, &multifd_recv[i].mutex);
+ }
+ qemu_mutex_unlock(&multifd_recv[i].mutex);
}
}
@@ -24,6 +24,48 @@
#include "io/channel-socket.h"
#include "trace.h"
+struct SocketArgs {
+ QIOChannelSocket *ioc;
+ SocketAddress *saddr;
+ Error **errp;
+} socket_args;
+
+QIOChannel *socket_recv_channel_create(void)
+{
+ QIOChannelSocket *sioc;
+ Error *err = NULL;
+
+ sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc),
+ &err);
+ if (!sioc) {
+ error_report("could not accept migration connection (%s)",
+ error_get_pretty(err));
+ return NULL;
+ }
+ return QIO_CHANNEL(sioc);
+}
+
+int socket_recv_channel_destroy(QIOChannel *recv)
+{
+ // Remove channel here
+ return 0;
+}
+
+QIOChannel *socket_send_channel_create(void)
+{
+ QIOChannelSocket *sioc = qio_channel_socket_new();
+
+ qio_channel_socket_connect_sync(sioc, socket_args.saddr,
+ socket_args.errp);
+ qio_channel_set_delay(QIO_CHANNEL(sioc), false);
+ return QIO_CHANNEL(sioc);
+}
+
+int socket_send_channel_destroy(QIOChannel *send)
+{
+ // Remove channel here
+ return 0;
+}
static SocketAddress *tcp_build_address(const char *host_port, Error **errp)
{
@@ -96,6 +138,10 @@ static void socket_start_outgoing_migration(MigrationState *s,
struct SocketConnectData *data = g_new0(struct SocketConnectData, 1);
data->s = s;
+
+ socket_args.saddr = saddr;
+ socket_args.errp = errp;
+
if (saddr->type == SOCKET_ADDRESS_KIND_INET) {
data->hostname = g_strdup(saddr->u.inet.data->host);
}
@@ -105,7 +151,11 @@ static void socket_start_outgoing_migration(MigrationState *s,
socket_outgoing_migration,
data,
socket_connect_data_free);
+ /*
+ We are not freeing saddr yet, we need some kind of reference
+ counting
qapi_free_SocketAddress(saddr);
+ */
}
void tcp_start_outgoing_migration(MigrationState *s,
@@ -152,7 +202,7 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
out:
/* Close listening socket as its no longer needed */
- qio_channel_close(ioc, NULL);
+// qio_channel_close(ioc, NULL);
return FALSE; /* unregister */
}
@@ -161,6 +211,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
Error **errp)
{
QIOChannelSocket *listen_ioc = qio_channel_socket_new();
+ socket_args.ioc = listen_ioc;
if (qio_channel_socket_listen_sync(listen_ioc, saddr, errp) < 0) {
object_unref(OBJECT(listen_ioc));
We create new channels for each new thread created. We only send through them a character to be sure that we are creating the channels in the right order. Note: Reference count/freeing of channels is not done Signed-off-by: Juan Quintela <quintela@redhat.com> --- include/migration/migration.h | 6 +++++ migration/ram.c | 45 +++++++++++++++++++++++++++++++++++- migration/socket.c | 53 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 102 insertions(+), 2 deletions(-)