@@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
/* migration/block.c */
AnnounceParameters *migrate_announce_params(void);
+
/* migration/savevm.c */
void dump_vmstate_json_to_file(FILE *out_fp);
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+ void *opaque);
/* migration/migration.c */
void migration_object_init(void);
@@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
* Function types
*/
typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
#endif /* QEMU_TYPEDEFS_H */
@@ -54,6 +54,7 @@
#include "qemu/job.h"
#include "qemu/main-loop.h"
#include "block/snapshot.h"
+#include "block/thread-pool.h"
#include "qemu/cutils.h"
#include "io/channel-buffer.h"
#include "io/channel-file.h"
@@ -71,6 +72,10 @@
const unsigned int postcopy_ram_discard_version;
+static ThreadPool *load_threads;
+static int load_threads_ret;
+static bool load_threads_abort;
+
/* Subcommands for QEMU_VM_COMMAND */
enum qemu_vm_cmd {
MIG_CMD_INVALID = 0, /* Must be 0 */
@@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
int ret;
trace_loadvm_state_setup();
+
+ assert(!load_threads);
+ load_threads = thread_pool_new();
+ load_threads_ret = 0;
+ load_threads_abort = false;
+
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (!se->ops || !se->ops->load_setup) {
continue;
@@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
return ret;
}
}
+
+ return 0;
+}
+
+struct LoadThreadData {
+ MigrationLoadThread function;
+ void *opaque;
+};
+
+static int qemu_loadvm_load_thread(void *thread_opaque)
+{
+ struct LoadThreadData *data = thread_opaque;
+ int ret;
+
+ ret = data->function(&load_threads_abort, data->opaque);
+ if (ret && !qatomic_read(&load_threads_ret)) {
+ /*
+ * Racy with the above read but that's okay - which thread error
+ * return we report is purely arbitrary anyway.
+ */
+ qatomic_set(&load_threads_ret, ret);
+ }
+
return 0;
}
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+ void *opaque)
+{
+ struct LoadThreadData *data;
+
+ /* We only set it from this thread so it's okay to read it directly */
+ assert(!load_threads_abort);
+
+ data = g_new(struct LoadThreadData, 1);
+ data->function = function;
+ data->opaque = opaque;
+
+ thread_pool_submit(load_threads, qemu_loadvm_load_thread,
+ data, g_free);
+ thread_pool_adjust_max_threads_to_work(load_threads);
+}
+
void qemu_loadvm_state_cleanup(void)
{
SaveStateEntry *se;
trace_loadvm_state_cleanup();
+
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (se->ops && se->ops->load_cleanup) {
se->ops->load_cleanup(se->opaque);
}
}
+
+ /*
+ * We might be called even without earlier qemu_loadvm_state_setup()
+ * call if qemu_loadvm_state() fails very early.
+ */
+ if (load_threads) {
+ qatomic_set(&load_threads_abort, true);
+ bql_unlock(); /* Load threads might be waiting for BQL */
+ thread_pool_wait(load_threads);
+ bql_lock();
+ g_clear_pointer(&load_threads, thread_pool_free);
+ }
}
/* Return true if we should continue the migration, or false. */
@@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
return ret;
}
+ if (ret == 0) {
+ bql_unlock(); /* Let load threads do work requiring BQL */
+ thread_pool_wait(load_threads);
+ bql_lock();
+
+ ret = load_threads_ret;
+ }
+ /*
+ * Set this flag unconditionally so we'll catch further attempts to
+ * start additional threads via an appropriate assert()
+ */
+ qatomic_set(&load_threads_abort, true);
+
if (ret == 0) {
ret = qemu_file_get_error(f);
}