@@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
/* migration/block.c */
AnnounceParameters *migrate_announce_params(void);
+
/* migration/savevm.c */
void dump_vmstate_json_to_file(FILE *out_fp);
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+ void *opaque);
/* migration/migration.c */
void migration_object_init(void);
@@ -131,5 +131,7 @@ typedef struct IRQState *qemu_irq;
* Function types
*/
typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
+ Error **errp);
#endif /* QEMU_TYPEDEFS_H */
@@ -400,7 +400,7 @@ void migration_incoming_state_destroy(void)
* RAM state cleanup needs to happen after multifd cleanup, because
* multifd threads can use some of its states (receivedmap).
*/
- qemu_loadvm_state_cleanup();
+ qemu_loadvm_state_cleanup(mis);
if (mis->to_src_file) {
/* Tell source that we are done */
@@ -43,6 +43,7 @@
#define MIGRATION_THREAD_DST_PREEMPT "mig/dst/preempt"
struct PostcopyBlocktimeContext;
+typedef struct ThreadPool ThreadPool;
#define MIGRATION_RESUME_ACK_VALUE (1)
@@ -187,6 +188,10 @@ struct MigrationIncomingState {
Coroutine *colo_incoming_co;
QemuSemaphore colo_incoming_sem;
+ /* Optional load threads pool and its thread exit request flag */
+ ThreadPool *load_threads;
+ bool load_threads_abort;
+
/*
* PostcopyBlocktimeContext to keep information for postcopy
* live migration, to calculate vCPU block time
@@ -54,6 +54,7 @@
#include "qemu/job.h"
#include "qemu/main-loop.h"
#include "block/snapshot.h"
+#include "block/thread-pool.h"
#include "qemu/cutils.h"
#include "io/channel-buffer.h"
#include "io/channel-file.h"
@@ -131,6 +132,35 @@ static struct mig_cmd_args {
* generic extendable format with an exception for two old entities.
*/
+/***********************************************************/
+/* Optional load threads pool support */
+
+static void qemu_loadvm_thread_pool_create(MigrationIncomingState *mis)
+{
+ assert(!mis->load_threads);
+ mis->load_threads = thread_pool_new();
+ mis->load_threads_abort = false;
+}
+
+static void qemu_loadvm_thread_pool_destroy(MigrationIncomingState *mis)
+{
+ qatomic_set(&mis->load_threads_abort, true);
+
+ bql_unlock(); /* Load threads might be waiting for BQL */
+ g_clear_pointer(&mis->load_threads, thread_pool_free);
+ bql_lock();
+}
+
+static bool qemu_loadvm_thread_pool_wait(MigrationState *s,
+ MigrationIncomingState *mis)
+{
+ bql_unlock(); /* Let load threads do work requiring BQL */
+ thread_pool_wait(mis->load_threads);
+ bql_lock();
+
+ return !migrate_has_error(s);
+}
+
/***********************************************************/
/* savevm/loadvm support */
@@ -2810,16 +2840,62 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
return 0;
}
-void qemu_loadvm_state_cleanup(void)
+struct LoadThreadData {
+ MigrationLoadThread function;
+ void *opaque;
+};
+
+static int qemu_loadvm_load_thread(void *thread_opaque)
+{
+ struct LoadThreadData *data = thread_opaque;
+ MigrationIncomingState *mis = migration_incoming_get_current();
+ g_autoptr(Error) local_err = NULL;
+
+ if (!data->function(data->opaque, &mis->load_threads_abort, &local_err)) {
+ MigrationState *s = migrate_get_current();
+
+ assert(local_err);
+
+ /*
+ * In case of multiple load threads failing which thread error
+ * return we end setting is purely arbitrary.
+ */
+ migrate_set_error(s, local_err);
+ }
+
+ return 0;
+}
+
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+ void *opaque)
+{
+ MigrationIncomingState *mis = migration_incoming_get_current();
+ struct LoadThreadData *data;
+
+ /* We only set it from this thread so it's okay to read it directly */
+ assert(!mis->load_threads_abort);
+
+ data = g_new(struct LoadThreadData, 1);
+ data->function = function;
+ data->opaque = opaque;
+
+ thread_pool_submit_immediate(mis->load_threads, qemu_loadvm_load_thread,
+ data, g_free);
+}
+
+void qemu_loadvm_state_cleanup(MigrationIncomingState *mis)
{
SaveStateEntry *se;
trace_loadvm_state_cleanup();
+
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (se->ops && se->ops->load_cleanup) {
se->ops->load_cleanup(se->opaque);
}
}
+
+ qemu_loadvm_thread_pool_destroy(mis);
}
/* Return true if we should continue the migration, or false. */
@@ -2970,6 +3046,7 @@ out:
int qemu_loadvm_state(QEMUFile *f)
{
+ MigrationState *s = migrate_get_current();
MigrationIncomingState *mis = migration_incoming_get_current();
Error *local_err = NULL;
int ret;
@@ -2979,6 +3056,8 @@ int qemu_loadvm_state(QEMUFile *f)
return -EINVAL;
}
+ qemu_loadvm_thread_pool_create(mis);
+
ret = qemu_loadvm_state_header(f);
if (ret) {
return ret;
@@ -3008,6 +3087,17 @@ int qemu_loadvm_state(QEMUFile *f)
return ret;
}
+ if (ret == 0) {
+ if (!qemu_loadvm_thread_pool_wait(s, mis)) {
+ ret = -EINVAL;
+ }
+ }
+ /*
+ * Set this flag unconditionally so we'll catch further attempts to
+ * start additional threads via an appropriate assert()
+ */
+ qatomic_set(&mis->load_threads_abort, true);
+
if (ret == 0) {
ret = qemu_file_get_error(f);
}
@@ -64,7 +64,7 @@ void qemu_savevm_live_state(QEMUFile *f);
int qemu_save_device_state(QEMUFile *f);
int qemu_loadvm_state(QEMUFile *f);
-void qemu_loadvm_state_cleanup(void);
+void qemu_loadvm_state_cleanup(MigrationIncomingState *mis);
int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis);
int qemu_load_device_state(QEMUFile *f);
int qemu_loadvm_approve_switchover(void);