@@ -2,6 +2,7 @@
#include "io/channel-file.h"
#include "file.h"
#include "qemu/error-report.h"
+#include "migration.h"
static struct FileOutgoingArgs {
char *fname;
@@ -77,17 +78,44 @@ void file_start_outgoing_migration(MigrationState *s, const char *fname, Error *
object_unref(OBJECT(ioc));
}
+static void file_process_migration_incoming(QIOTask *task, gpointer opaque)
+{
+ QIOChannelFile *ioc = opaque;
+
+ migration_channel_process_incoming(QIO_CHANNEL(ioc));
+ object_unref(OBJECT(ioc));
+}
+
void file_start_incoming_migration(const char *fname, Error **errp)
{
QIOChannelFile *ioc;
+ QIOTask *task;
+ int channels = 1;
+ int i = 0, fd;
ioc = qio_channel_file_new_path(fname, O_RDONLY, 0, errp);
if (!ioc) {
- error_report("Error creating a channel");
+ goto out;
+ }
+
+ if (migrate_use_multifd()) {
+ channels += migrate_multifd_channels();
+ }
+
+ fd = ioc->fd;
+
+ do {
+ qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
+ task = qio_task_new(OBJECT(ioc), file_process_migration_incoming,
+ (gpointer)ioc, NULL);
+
+ qio_task_run_in_thread(task, qio_channel_file_connect_worker,
+ (gpointer)ioc, NULL, NULL);
+ } while (++i < channels && (ioc = qio_channel_file_new_fd(fd)));
+
+out:
+ if (!ioc) {
+ error_report("Error creating migration incoming channel");
return;
}
-
- qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
- migration_channel_process_incoming(QIO_CHANNEL(ioc));
- object_unref(OBJECT(ioc));
}
@@ -794,6 +794,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
}
default_channel = (channel_magic == cpu_to_be32(QEMU_VM_FILE_MAGIC));
+ } else if (migrate_use_multifd() && migrate_fixed_ram()) {
+ default_channel = multifd_recv_first_channel();
} else {
default_channel = !mis->from_src_file;
}
@@ -1254,6 +1254,11 @@ int multifd_load_setup(Error **errp)
return 0;
}
+bool multifd_recv_first_channel(void)
+{
+ return !multifd_recv_state;
+}
+
bool multifd_recv_all_channels_created(void)
{
int thread_count = migrate_multifd_channels();
@@ -1296,7 +1301,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
/* initial packet */
num_packets = 1;
} else {
- id = 0;
+ id = qatomic_read(&multifd_recv_state->count);
}
p = &multifd_recv_state->params[id];
@@ -18,6 +18,7 @@ void multifd_save_cleanup(void);
int multifd_load_setup(Error **errp);
void multifd_load_cleanup(void);
void multifd_load_shutdown(void);
+bool multifd_recv_first_channel(void);
bool multifd_recv_all_channels_created(void);
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
On the receiving side we don't need to differentiate between main channel and threads, so whichever channel is defined first gets to be the main one. And since there are no packets, use the atomic channel count to index into the params array. Signed-off-by: Fabiano Rosas <farosas@suse.de> --- migration/file.c | 38 +++++++++++++++++++++++++++++++++----- migration/migration.c | 2 ++ migration/multifd.c | 7 ++++++- migration/multifd.h | 1 + 4 files changed, 42 insertions(+), 6 deletions(-)