@@ -215,9 +215,11 @@ void migration_object_init(void)
current_incoming->postcopy_remote_fds =
g_array_new(FALSE, TRUE, sizeof(struct PostCopyFD));
qemu_mutex_init(¤t_incoming->rp_mutex);
+ qemu_mutex_init(¤t_incoming->postcopy_prio_thread_mutex);
qemu_event_init(¤t_incoming->main_thread_load_event, false);
qemu_sem_init(¤t_incoming->postcopy_pause_sem_dst, 0);
qemu_sem_init(¤t_incoming->postcopy_pause_sem_fault, 0);
+ qemu_sem_init(¤t_incoming->postcopy_pause_sem_fast_load, 0);
qemu_mutex_init(¤t_incoming->page_request_mutex);
current_incoming->page_requested = g_tree_new(page_request_addr_cmp);
@@ -697,9 +699,9 @@ static bool postcopy_try_recover(void)
/*
* Here, we only wake up the main loading thread (while the
- * fault thread will still be waiting), so that we can receive
+ * rest threads will still be waiting), so that we can receive
* commands from source now, and answer it if needed. The
- * fault thread will be woken up afterwards until we are sure
+ * rest threads will be woken up afterwards until we are sure
* that source is ready to reply to page requests.
*/
qemu_sem_post(&mis->postcopy_pause_sem_dst);
@@ -3513,6 +3515,18 @@ static MigThrError postcopy_pause(MigrationState *s)
qemu_file_shutdown(file);
qemu_fclose(file);
+ /*
+ * Do the same to postcopy fast path socket too if there is. No
+ * locking needed because no racer as long as we do this before setting
+ * status to paused.
+ */
+ if (s->postcopy_qemufile_src) {
+ migration_ioc_unregister_yank_from_file(s->postcopy_qemufile_src);
+ qemu_file_shutdown(s->postcopy_qemufile_src);
+ qemu_fclose(s->postcopy_qemufile_src);
+ s->postcopy_qemufile_src = NULL;
+ }
+
migrate_set_state(&s->state, s->state,
MIGRATION_STATUS_POSTCOPY_PAUSED);
@@ -3568,8 +3582,13 @@ static MigThrError migration_detect_error(MigrationState *s)
return MIG_THR_ERR_FATAL;
}
- /* Try to detect any file errors */
- ret = qemu_file_get_error_obj(s->to_dst_file, &local_error);
+ /*
+ * Try to detect any file errors. Note that postcopy_qemufile_src will
+ * be NULL when postcopy preempt is not enabled.
+ */
+ ret = qemu_file_get_error_obj_any(s->to_dst_file,
+ s->postcopy_qemufile_src,
+ &local_error);
if (!ret) {
/* Everything is fine */
assert(!local_error);
@@ -118,6 +118,18 @@ struct MigrationIncomingState {
/* Postcopy priority thread is used to receive postcopy requested pages */
QemuThread postcopy_prio_thread;
bool postcopy_prio_thread_created;
+ /*
+ * Used to sync between the ram load main thread and the fast ram load
+ * thread. It protects postcopy_qemufile_dst, which is the postcopy
+ * fast channel.
+ *
+ * The ram fast load thread will take it mostly for the whole lifecycle
+ * because it needs to continuously read data from the channel, and
+ * it'll only release this mutex if postcopy is interrupted, so that
+ * the ram load main thread will take this mutex over and properly
+ * release the broken channel.
+ */
+ QemuMutex postcopy_prio_thread_mutex;
/*
* An array of temp host huge pages to be used, one for each postcopy
* channel.
@@ -147,6 +159,13 @@ struct MigrationIncomingState {
/* notify PAUSED postcopy incoming migrations to try to continue */
QemuSemaphore postcopy_pause_sem_dst;
QemuSemaphore postcopy_pause_sem_fault;
+ /*
+ * This semaphore is used to allow the ram fast load thread (only when
+ * postcopy preempt is enabled) fall into sleep when there's network
+ * interruption detected. When the recovery is done, the main load
+ * thread will kick the fast ram load thread using this semaphore.
+ */
+ QemuSemaphore postcopy_pause_sem_fast_load;
/* List of listening socket addresses */
SocketAddressList *socket_address_list;
@@ -1580,6 +1580,15 @@ int postcopy_preempt_setup(MigrationState *s, Error **errp)
return 0;
}
+static void postcopy_pause_ram_fast_load(MigrationIncomingState *mis)
+{
+ trace_postcopy_pause_fast_load();
+ qemu_mutex_unlock(&mis->postcopy_prio_thread_mutex);
+ qemu_sem_wait(&mis->postcopy_pause_sem_fast_load);
+ qemu_mutex_lock(&mis->postcopy_prio_thread_mutex);
+ trace_postcopy_pause_fast_load_continued();
+}
+
void *postcopy_preempt_thread(void *opaque)
{
MigrationIncomingState *mis = opaque;
@@ -1592,11 +1601,23 @@ void *postcopy_preempt_thread(void *opaque)
qemu_sem_post(&mis->thread_sync_sem);
/* Sending RAM_SAVE_FLAG_EOS to terminate this thread */
- ret = ram_load_postcopy(mis->postcopy_qemufile_dst, RAM_CHANNEL_POSTCOPY);
+ qemu_mutex_lock(&mis->postcopy_prio_thread_mutex);
+ while (1) {
+ ret = ram_load_postcopy(mis->postcopy_qemufile_dst,
+ RAM_CHANNEL_POSTCOPY);
+ /* If error happened, go into recovery routine */
+ if (ret) {
+ postcopy_pause_ram_fast_load(mis);
+ } else {
+ /* We're done */
+ break;
+ }
+ }
+ qemu_mutex_unlock(&mis->postcopy_prio_thread_mutex);
rcu_unregister_thread();
trace_postcopy_preempt_thread_exit();
- return ret == 0 ? NULL : (void *)-1;
+ return NULL;
}
@@ -139,6 +139,33 @@ int qemu_file_get_error_obj(QEMUFile *f, Error **errp)
return f->last_error;
}
+/*
+ * Get last error for either stream f1 or f2 with optional Error*.
+ * The error returned (non-zero) can be either from f1 or f2.
+ *
+ * If any of the qemufile* is NULL, then skip the check on that file.
+ *
+ * When there is no error on both qemufile, zero is returned.
+ */
+int qemu_file_get_error_obj_any(QEMUFile *f1, QEMUFile *f2, Error **errp)
+{
+ int ret = 0;
+
+ if (f1) {
+ ret = qemu_file_get_error_obj(f1, errp);
+ /* If there's already error detected, return */
+ if (ret) {
+ return ret;
+ }
+ }
+
+ if (f2) {
+ ret = qemu_file_get_error_obj(f2, errp);
+ }
+
+ return ret;
+}
+
/*
* Set the last error for stream f with optional Error*
*/
@@ -156,6 +156,7 @@ void qemu_file_update_transfer(QEMUFile *f, int64_t len);
void qemu_file_set_rate_limit(QEMUFile *f, int64_t new_rate);
int64_t qemu_file_get_rate_limit(QEMUFile *f);
int qemu_file_get_error_obj(QEMUFile *f, Error **errp);
+int qemu_file_get_error_obj_any(QEMUFile *f1, QEMUFile *f2, Error **errp);
void qemu_file_set_error_obj(QEMUFile *f, int ret, Error *err);
void qemu_file_set_error(QEMUFile *f, int ret);
int qemu_file_shutdown(QEMUFile *f);
@@ -2152,6 +2152,13 @@ static int loadvm_postcopy_handle_resume(MigrationIncomingState *mis)
*/
qemu_sem_post(&mis->postcopy_pause_sem_fault);
+ if (migrate_postcopy_preempt()) {
+ /* The channel should already be setup again; make sure of it */
+ assert(mis->postcopy_qemufile_dst);
+ /* Kick the fast ram load thread too */
+ qemu_sem_post(&mis->postcopy_pause_sem_fast_load);
+ }
+
return 0;
}
@@ -2597,6 +2604,21 @@ static bool postcopy_pause_incoming(MigrationIncomingState *mis)
mis->to_src_file = NULL;
qemu_mutex_unlock(&mis->rp_mutex);
+ /*
+ * NOTE: this must happen before reset the PostcopyTmpPages below,
+ * otherwise it's racy to reset those fields when the fast load thread
+ * can be accessing it in parallel.
+ */
+ if (mis->postcopy_qemufile_dst) {
+ qemu_file_shutdown(mis->postcopy_qemufile_dst);
+ /* Take the mutex to make sure the fast ram load thread halted */
+ qemu_mutex_lock(&mis->postcopy_prio_thread_mutex);
+ migration_ioc_unregister_yank_from_file(mis->postcopy_qemufile_dst);
+ qemu_fclose(mis->postcopy_qemufile_dst);
+ mis->postcopy_qemufile_dst = NULL;
+ qemu_mutex_unlock(&mis->postcopy_prio_thread_mutex);
+ }
+
migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_ACTIVE,
MIGRATION_STATUS_POSTCOPY_PAUSED);
@@ -2634,8 +2656,8 @@ retry:
while (true) {
section_type = qemu_get_byte(f);
- if (qemu_file_get_error(f)) {
- ret = qemu_file_get_error(f);
+ ret = qemu_file_get_error_obj_any(f, mis->postcopy_qemufile_dst, NULL);
+ if (ret) {
break;
}
@@ -270,6 +270,8 @@ mark_postcopy_blocktime_begin(uint64_t addr, void *dd, uint32_t time, int cpu, i
mark_postcopy_blocktime_end(uint64_t addr, void *dd, uint32_t time, int affected_cpu) "addr: 0x%" PRIx64 ", dd: %p, time: %u, affected_cpu: %d"
postcopy_pause_fault_thread(void) ""
postcopy_pause_fault_thread_continued(void) ""
+postcopy_pause_fast_load(void) ""
+postcopy_pause_fast_load_continued(void) ""
postcopy_ram_fault_thread_entry(void) ""
postcopy_ram_fault_thread_exit(void) ""
postcopy_ram_fault_thread_fds_core(int baseufd, int quitfd) "ufd: %d quitfd: %d"