@@ -92,3 +92,48 @@ void migration_channel_connect(MigrationState *s,
migrate_fd_connect(s, error);
error_free(error);
}
+
+
+/**
+ * @migration_channel_read_peek - Peek at migration channel, without
+ *    actually removing the peeked data from the channel buffer.
+ *
+ * @ioc: the channel object
+ * @buf: the memory region to read data into
+ * @buflen: the number of bytes to read in @buf
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Returns 0 once @buflen bytes were peeked, -1 with @errp set on failure.
+ */
+int migration_channel_read_peek(QIOChannel *ioc,
+                                const char *buf,
+                                const size_t buflen,
+                                Error **errp)
+{
+    ssize_t len = 0;
+    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
+
+    while (true) {
+        len = qio_channel_readv_full(ioc, &iov, 1, NULL, NULL,
+                                     QIO_CHANNEL_READ_FLAG_MSG_PEEK, errp);
+        if (len == 0) {
+            /* 0 bytes means end of stream; no error was set yet, so set one. */
+            error_setg(errp, "Failed to peek at channel");
+            return -1;
+        } else if (len < 0 && len != QIO_CHANNEL_ERR_BLOCK) {
+            return -1; /* @errp was set by the read; don't set it twice */
+        }
+        if (len == buflen) {
+            break;
+        }
+
+        /* 1ms sleep. */
+        if (qemu_in_coroutine()) {
+            qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000);
+        } else {
+            g_usleep(1000);
+        }
+    }
+
+    return 0;
+}
@@ -24,4 +24,9 @@ void migration_channel_connect(MigrationState *s,
QIOChannel *ioc,
const char *hostname,
Error *error_in);
+
+int migration_channel_read_peek(QIOChannel *ioc,
+ const char *buf,
+ const size_t buflen,
+ Error **errp);
#endif
@@ -31,6 +31,7 @@
#include "migration.h"
#include "savevm.h"
#include "qemu-file.h"
+#include "channel.h"
#include "migration/vmstate.h"
#include "block/block.h"
#include "qapi/error.h"
@@ -663,10 +664,6 @@ static bool migration_incoming_setup(QEMUFile *f, Error **errp)
{
MigrationIncomingState *mis = migration_incoming_get_current();
- if (multifd_load_setup(errp) != 0) {
- return false;
- }
-
if (!mis->from_src_file) {
mis->from_src_file = f;
}
@@ -733,31 +730,56 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
{
MigrationIncomingState *mis = migration_incoming_get_current();
Error *local_err = NULL;
- bool start_migration;
QEMUFile *f;
+ bool default_channel = true;
+ uint32_t channel_magic = 0;
+ int ret = 0;
- if (!mis->from_src_file) {
- /* The first connection (multifd may have multiple) */
+ if (migrate_use_multifd() && !migrate_postcopy_ram() &&
+ qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
+ /*
+ * With multiple channels, it is possible that we receive channels
+ * out of order on destination side, causing incorrect mapping of
+ * source channels on destination side. Check channel MAGIC to
+ * decide type of channel. Please note this is best effort, postcopy
+ * preempt channel does not send any magic number so avoid it for
+ * postcopy live migration. Also tls live migration already does
+ * tls handshake while initializing main channel so with tls this
+ * issue is not possible.
+ */
+ ret = migration_channel_read_peek(ioc, (void *)&channel_magic,
+ sizeof(channel_magic), &local_err);
+
+ if (ret != 0) {
+ error_propagate(errp, local_err);
+ return;
+ }
+
+ default_channel = (channel_magic == cpu_to_be32(QEMU_VM_FILE_MAGIC));
+ } else {
+ default_channel = !mis->from_src_file;
+ }
+
+    if (multifd_load_setup(errp) != 0) {
+        /* @errp was set by multifd_load_setup(); setting it again aborts */
+        return;
+    }
+
+ if (default_channel) {
f = qemu_file_new_input(ioc);
if (!migration_incoming_setup(f, errp)) {
return;
}
-
- /*
- * Common migration only needs one channel, so we can start
- * right now. Some features need more than one channel, we wait.
- */
- start_migration = !migration_needs_multiple_sockets();
} else {
/* Multiple connections */
assert(migration_needs_multiple_sockets());
if (migrate_use_multifd()) {
- start_migration = multifd_recv_new_channel(ioc, &local_err);
+ multifd_recv_new_channel(ioc, &local_err);
} else {
assert(migrate_postcopy_preempt());
f = qemu_file_new_input(ioc);
- start_migration = postcopy_preempt_new_channel(mis, f);
+ postcopy_preempt_new_channel(mis, f);
}
if (local_err) {
error_propagate(errp, local_err);
@@ -765,7 +787,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
}
}
- if (start_migration) {
+ if (migration_has_all_channels()) {
/* If it's a recovery, we're done */
if (postcopy_try_recover()) {
return;
@@ -1164,9 +1164,14 @@ int multifd_load_setup(Error **errp)
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
uint8_t i;
- if (!migrate_use_multifd()) {
+ /*
+ * Return successfully if multiFD recv state is already initialised
+ * or multiFD is not enabled.
+ */
+ if (multifd_recv_state || !migrate_use_multifd()) {
return 0;
}
+
if (!migrate_multi_channels_is_allowed()) {
error_setg(errp, "multifd is not supported by current protocol");
return -1;
@@ -1227,11 +1232,9 @@ bool multifd_recv_all_channels_created(void)
/*
* Try to receive all multifd channels to get ready for the migration.
- * - Return true and do not set @errp when correctly receiving all channels;
- * - Return false and do not set @errp when correctly receiving the current one;
- * - Return false and set @errp when failing to receive the current channel.
+ * Sets @errp when failing to receive the current channel.
*/
-bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
+void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
MultiFDRecvParams *p;
Error *local_err = NULL;
@@ -1244,7 +1247,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
"failed to receive packet"
" via multifd channel %d: ",
qatomic_read(&multifd_recv_state->count));
- return false;
+ return;
}
trace_multifd_recv_new_channel(id);
@@ -1254,7 +1257,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
id);
multifd_recv_terminate_threads(local_err);
error_propagate(errp, local_err);
- return false;
+ return;
}
p->c = ioc;
object_ref(OBJECT(ioc));
@@ -1265,6 +1268,4 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
QEMU_THREAD_JOINABLE);
qatomic_inc(&multifd_recv_state->count);
- return qatomic_read(&multifd_recv_state->count) ==
- migrate_multifd_channels();
}
@@ -18,7 +18,7 @@ void multifd_save_cleanup(void);
int multifd_load_setup(Error **errp);
int multifd_load_cleanup(Error **errp);
bool multifd_recv_all_channels_created(void);
-bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
+void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
void multifd_recv_sync_main(void);
int multifd_send_sync_main(QEMUFile *f);
int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
@@ -1539,7 +1539,7 @@ void postcopy_unregister_shared_ufd(struct PostCopyFD *pcfd)
}
}
-bool postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file)
+void postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file)
{
/*
* The new loading channel has its own threads, so it needs to be
@@ -1548,9 +1548,6 @@ bool postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file)
qemu_file_set_blocking(file, true);
mis->postcopy_qemufile_dst = file;
trace_postcopy_preempt_new_channel();
-
- /* Start the migration immediately */
- return true;
}
/*
@@ -190,7 +190,7 @@ enum PostcopyChannels {
RAM_CHANNEL_MAX,
};
-bool postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file);
+void postcopy_preempt_new_channel(MigrationIncomingState *mis, QEMUFile *file);
int postcopy_preempt_setup(MigrationState *s, Error **errp);
int postcopy_preempt_wait_channel(MigrationState *s);