@@ -114,4 +114,12 @@ bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
char *data, size_t len);
bool migration_has_device_state_support(void);
+/*
+ * Submit @hdlr to the device state save thread pool. The handler is later
+ * called with a private copy of @idstr, with @instance_id, with a pointer to
+ * the shared abort flag and with @opaque. Asserts that
+ * migration_has_device_state_support() is true.
+ */
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+ char *idstr, uint32_t instance_id,
+ void *opaque);
+
+/*
+ * Set the abort flag that every spawned handler receives a pointer to.
+ * Does not wait for the handlers to finish.
+ */
+void multifd_abort_device_state_save_threads(void);
+/*
+ * Wait on the device state save thread pool, then return the recorded
+ * handler error, or zero if no handler reported a non-zero value.
+ */
+int multifd_join_device_state_save_threads(void);
+
#endif
@@ -105,6 +105,29 @@ typedef struct SaveVMHandlers {
*/
int (*save_live_complete_precopy)(QEMUFile *f, void *opaque);
+ /* This runs in a separate thread. */
+
+ /**
+ * @save_live_complete_precopy_thread
+ *
+ * Called at the end of a precopy phase from a separate worker thread
+ * in configurations where multifd device state transfer is supported
+ * in order to perform asynchronous transmission of the remaining data in
+ * parallel with @save_live_complete_precopy handlers.
+ * When postcopy is enabled, devices that support postcopy will skip this
+ * step.
+ *
+ * @idstr: this device section idstr
+ * @instance_id: this device section instance_id
+ * @abort_flag: flag indicating that the migration core wants to abort
+ * the transmission and so the handler should exit ASAP. To be read by
+ * qatomic_read() or similar.
+ * @opaque: data pointer passed to register_savevm_live()
+ *
+ * Returns zero on success and a negative value on error.
+ */
+ SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread;
+
/* This runs both outside and inside the BQL. */
/**
@@ -132,5 +132,9 @@ typedef struct IRQState *qemu_irq;
*/
typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
+/*
+ * Type of the SaveVMHandlers.save_live_complete_precopy_thread handler.
+ * It receives the device section @idstr and @instance_id, an @abort_flag
+ * that the handler should poll (with qatomic_read() or similar) and exit
+ * ASAP once it is set, and the section's @opaque pointer.
+ * Returns zero on success and a negative value on error.
+ */
+typedef int (*SaveLiveCompletePrecopyThreadHandler)(char *idstr,
+ uint32_t instance_id,
+ bool *abort_flag,
+ void *opaque);
#endif /* QEMU_TYPEDEFS_H */
@@ -9,12 +9,17 @@
#include "qemu/osdep.h"
#include "qemu/lockable.h"
+#include "block/thread-pool.h"
#include "migration/misc.h"
#include "multifd.h"
#include "options.h"
static QemuMutex queue_job_mutex;
+/* Thread pool the device state save handlers are submitted to. */
+static ThreadPool *send_threads;
+/*
+ * A non-zero value returned by one of the save handlers, or 0 if none
+ * failed. Written by the worker threads with qatomic_set(); which failing
+ * handler's value ends up here is arbitrary.
+ */
+static int send_threads_ret;
+/* Abort request flag; its address is passed to every save handler. */
+static bool send_threads_abort;
+
static MultiFDSendData *device_state_send;
void multifd_device_state_send_setup(void)
@@ -22,6 +27,10 @@ void multifd_device_state_send_setup(void)
qemu_mutex_init(&queue_job_mutex);
device_state_send = multifd_send_data_alloc();
+
+ send_threads = thread_pool_new();
+ send_threads_ret = 0;
+ send_threads_abort = false;
}
void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
@@ -32,6 +41,7 @@ void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
void multifd_device_state_send_cleanup(void)
{
+ g_clear_pointer(&send_threads, thread_pool_free);
g_clear_pointer(&device_state_send, multifd_send_data_free);
qemu_mutex_destroy(&queue_job_mutex);
@@ -106,3 +116,78 @@ bool migration_has_device_state_support(void)
return migrate_multifd() && !migrate_mapped_ram() &&
migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
}
+
+/* Per-work-item arguments for one device state save handler invocation. */
+struct MultiFDDSSaveThreadData {
+ /* Handler to invoke from the worker thread. */
+ SaveLiveCompletePrecopyThreadHandler hdlr;
+ /* Owned copy of the section idstr; released together with this struct. */
+ char *idstr;
+ /* Section instance_id, passed through to the handler. */
+ uint32_t instance_id;
+ /* Caller-supplied opaque pointer, passed through to the handler. */
+ void *handler_opaque;
+};
+
+/*
+ * Release a MultiFDDSSaveThreadData work item together with the idstr copy
+ * it owns. Handed to thread_pool_submit() alongside the work item
+ * (presumably as its destructor - confirm against the thread pool API).
+ */
+static void multifd_device_state_save_thread_data_free(void *opaque)
+{
+    struct MultiFDDSSaveThreadData *item = opaque;
+
+    g_clear_pointer(&item->idstr, g_free);
+    g_free(item);
+}
+
+/*
+ * Thread pool work function: run one device's save handler and record its
+ * failure, if any, in send_threads_ret.
+ *
+ * @opaque: the struct MultiFDDSSaveThreadData describing the invocation
+ *
+ * Always returns 0; handler errors are reported through send_threads_ret.
+ */
+static int multifd_device_state_save_thread(void *opaque)
+{
+    struct MultiFDDSSaveThreadData *item = opaque;
+    int handler_ret = item->hdlr(item->idstr, item->instance_id,
+                                 &send_threads_abort, item->handler_opaque);
+
+    if (!handler_ret) {
+        return 0;
+    }
+
+    /*
+     * Only store the error if none has been stored yet. The read and the
+     * write are not atomic as a pair, so two failing threads may race here;
+     * that is acceptable because which thread's error gets reported is
+     * arbitrary anyway.
+     */
+    if (!qatomic_read(&send_threads_ret)) {
+        qatomic_set(&send_threads_ret, handler_ret);
+    }
+
+    return 0;
+}
+
+/*
+ * Queue a device state save handler on the send thread pool.
+ *
+ * @hdlr: handler to run in a worker thread
+ * @idstr: device section idstr; copied, so the caller keeps ownership
+ * @instance_id: device section instance_id
+ * @opaque: pointer passed through to @hdlr
+ *
+ * Must only be called when migration_has_device_state_support() is true.
+ */
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+                                       char *idstr, uint32_t instance_id,
+                                       void *opaque)
+{
+    struct MultiFDDSSaveThreadData *item;
+
+    assert(migration_has_device_state_support());
+
+    item = g_new(struct MultiFDDSSaveThreadData, 1);
+    *item = (struct MultiFDDSSaveThreadData) {
+        .hdlr = hdlr,
+        .idstr = g_strdup(idstr),
+        .instance_id = instance_id,
+        .handler_opaque = opaque,
+    };
+
+    thread_pool_submit(send_threads, multifd_device_state_save_thread,
+                       item, multifd_device_state_save_thread_data_free);
+
+    /*
+     * Let the pool grow to match the queued work so that the handler just
+     * submitted gets a thread immediately instead of waiting for one.
+     */
+    thread_pool_adjust_max_threads_to_work(send_threads);
+}
+
+/*
+ * Request that all device state save handlers stop: sets the flag whose
+ * address each handler receives as its abort_flag argument. This only
+ * raises the flag and does not wait for the handlers to exit.
+ */
+void multifd_abort_device_state_save_threads(void)
+{
+ assert(migration_has_device_state_support());
+
+ qatomic_set(&send_threads_abort, true);
+}
+
+/*
+ * Call thread_pool_wait() on the save thread pool (presumably blocking
+ * until every submitted handler has finished - confirm against the thread
+ * pool API) and return the error recorded by the handlers.
+ *
+ * Returns zero if no handler reported a non-zero value, otherwise one of
+ * the reported values.
+ */
+int multifd_join_device_state_save_threads(void)
+{
+ assert(migration_has_device_state_support());
+
+ thread_pool_wait(send_threads);
+
+ return send_threads_ret;
+}
@@ -1499,6 +1499,23 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
int ret;
bool multifd_device_state = migration_has_device_state_support();
+ if (multifd_device_state) {
+ QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
+ SaveLiveCompletePrecopyThreadHandler hdlr;
+
+ if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
+ se->ops->has_postcopy(se->opaque)) ||
+ !se->ops->save_live_complete_precopy_thread) {
+ continue;
+ }
+
+ hdlr = se->ops->save_live_complete_precopy_thread;
+ multifd_spawn_device_state_save_thread(hdlr,
+ se->idstr, se->instance_id,
+ se->opaque);
+ }
+ }
+
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (!se->ops ||
(in_postcopy && se->ops->has_postcopy &&
@@ -1523,7 +1540,7 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
save_section_footer(f, se);
if (ret < 0) {
qemu_file_set_error(f, ret);
- return -1;
+ goto ret_fail_abort_threads;
}
end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME);
trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id,
@@ -1531,6 +1548,12 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
}
if (multifd_device_state) {
+ ret = multifd_join_device_state_save_threads();
+ if (ret) {
+ qemu_file_set_error(f, ret);
+ return -1;
+ }
+
/* Send the final SYNC */
ret = multifd_send_sync_main();
if (ret) {
@@ -1542,6 +1565,14 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
trace_vmstate_downtime_checkpoint("src-iterable-saved");
return 0;
+
+ret_fail_abort_threads:
+ if (multifd_device_state) {
+ multifd_abort_device_state_save_threads();
+ multifd_join_device_state_save_threads();
+ }
+
+ return -1;
}
int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f,