
[v2,15/17] vfio/migration: Multifd device state transfer support - receive side

Message ID 4133ce80174fa3b81070adaeeb068554beba2854.1724701542.git.maciej.szmigiero@oracle.com (mailing list archive)
State: New
Series: Multifd

Commit Message

Maciej S. Szmigiero Aug. 27, 2024, 5:54 p.m. UTC
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

The data received via multifd needs to be reassembled, since device state
packets sent over different multifd channels can arrive out of order.

Therefore, each VFIO device state packet carries a header indicating
its position in the stream.

The last such VFIO device state packet should have the
VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config
state.

Since it's important to finish loading the device state transferred via
the main migration channel (via the save_live_iterate handler) before
starting to load the data asynchronously transferred via multifd,
a new VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE flag is introduced to
mark the end of the main migration channel data.

The device state loading process waits until that flag is seen before
it starts loading the multifd-transferred device state.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 hw/vfio/migration.c           | 338 +++++++++++++++++++++++++++++++++-
 hw/vfio/pci.c                 |   2 +
 hw/vfio/trace-events          |   9 +-
 include/hw/vfio/vfio-common.h |  17 ++
 4 files changed, 362 insertions(+), 4 deletions(-)
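
To make the reassembly scheme above concrete, here is a minimal standalone
sketch (illustrative only; Packet, Slot and slot_store are hypothetical names,
not code from this series) of how the idx header field lets chunks that arrive
out of order be slotted back into stream order:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    /* Mirrors the wire header this patch adds (see VFIODeviceStatePacket). */
    typedef struct Packet {
        uint32_t version;
        uint32_t idx;    /* position of this chunk in the device state stream */
        uint32_t flags;  /* the config-state flag marks the final chunk */
        uint8_t data[];
    } Packet;

    typedef struct Slot {
        bool present;
        uint8_t *data;
        size_t len;
    } Slot;

    /*
     * Store one chunk at its stream position; chunks can arrive in any
     * order, and a consumer then drains slots 0..last strictly in sequence.
     */
    static bool slot_store(Slot *slots, size_t nslots,
                           const Packet *pkt, size_t payload_len)
    {
        Slot *s;

        if (pkt->idx >= nslots) {
            return false; /* idx out of range */
        }
        s = &slots[pkt->idx];
        if (s->present) {
            return false; /* duplicate idx */
        }
        s->data = malloc(payload_len);
        if (!s->data) {
            return false;
        }
        memcpy(s->data, pkt->data, payload_len);
        s->len = payload_len;
        s->present = true;
        return true;
    }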

Comments

Avihai Horon Sept. 9, 2024, 8:55 a.m. UTC | #1
On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> The multifd received data needs to be reassembled since device state
> packets sent via different multifd channels can arrive out-of-order.
>
> Therefore, each VFIO device state packet carries a header indicating
> its position in the stream.
>
> The last such VFIO device state packet should have
> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config
> state.
>
> Since it's important to finish loading device state transferred via
> the main migration channel (via save_live_iterate handler) before
> starting loading the data asynchronously transferred via multifd
> a new VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE flag is introduced to
> mark the end of the main migration channel data.
>
> The device state loading process waits until that flag is seen before
> commencing loading of the multifd-transferred device state.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>   hw/vfio/migration.c           | 338 +++++++++++++++++++++++++++++++++-
>   hw/vfio/pci.c                 |   2 +
>   hw/vfio/trace-events          |   9 +-
>   include/hw/vfio/vfio-common.h |  17 ++
>   4 files changed, 362 insertions(+), 4 deletions(-)
>
> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
> index 24679d8c5034..57c1542528dc 100644
> --- a/hw/vfio/migration.c
> +++ b/hw/vfio/migration.c
> @@ -15,6 +15,7 @@
>   #include <linux/vfio.h>
>   #include <sys/ioctl.h>
>
> +#include "io/channel-buffer.h"
>   #include "sysemu/runstate.h"
>   #include "hw/vfio/vfio-common.h"
>   #include "migration/misc.h"
> @@ -47,6 +48,7 @@
>   #define VFIO_MIG_FLAG_DEV_SETUP_STATE   (0xffffffffef100003ULL)
>   #define VFIO_MIG_FLAG_DEV_DATA_STATE    (0xffffffffef100004ULL)
>   #define VFIO_MIG_FLAG_DEV_INIT_DATA_SENT (0xffffffffef100005ULL)
> +#define VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE    (0xffffffffef100006ULL)
>
>   /*
>    * This is an arbitrary size based on migration of mlx5 devices, where typically
> @@ -55,6 +57,15 @@
>    */
>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>
> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
> +
> +typedef struct VFIODeviceStatePacket {
> +    uint32_t version;
> +    uint32_t idx;
> +    uint32_t flags;
> +    uint8_t data[0];
> +} QEMU_PACKED VFIODeviceStatePacket;
> +
>   static int64_t bytes_transferred;
>
>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
> @@ -254,6 +265,188 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>       return ret;
>   }
>
> +typedef struct LoadedBuffer {
> +    bool is_present;
> +    char *data;
> +    size_t len;
> +} LoadedBuffer;

Maybe rename LoadedBuffer to a more specific name, like VFIOStateBuffer?

I also feel like LoadedBuffer deserves a separate commit.
Plus, I think it would be good to add a full API for this, one that wraps the
g_array_* calls and holds the extra members.
E.g., VFIOStateBuffer, VFIOStateArray (which would hold load_buf_idx,
load_buf_idx_last, etc.), vfio_state_array_destroy(),
vfio_state_array_alloc(), vfio_state_array_get(), etc.
IMHO, this will make it clearer.
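
For concreteness, one possible shape of such a wrapper, following the names
proposed above (a sketch only under this naming suggestion, not code from the
series):

    #include <glib.h>
    #include <stdbool.h>
    #include <stdint.h>

    typedef struct VFIOStateBuffer {
        bool is_present;
        char *data;
        size_t len;
    } VFIOStateBuffer;

    typedef struct VFIOStateBuffers {
        GArray *array; /* of VFIOStateBuffer */
    } VFIOStateBuffers;

    static void vfio_state_buffer_clear(gpointer data)
    {
        VFIOStateBuffer *buf = data;

        if (buf->is_present) {
            g_clear_pointer(&buf->data, g_free);
            buf->is_present = false;
        }
    }

    static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
    {
        bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
        g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
    }

    static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
    {
        g_clear_pointer(&bufs->array, g_array_unref);
    }

    /* Grow the array on demand and return the buffer slot at idx. */
    static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs,
                                                  uint32_t idx)
    {
        if (idx >= bufs->array->len) {
            g_array_set_size(bufs->array, idx + 1);
        }
        return &g_array_index(bufs->array, VFIOStateBuffer, idx);
    }

The extra members (load_buf_idx, load_buf_idx_last, etc.) would then move into
a containing structure alongside the array.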

> +
> +static void loaded_buffer_clear(gpointer data)
> +{
> +    LoadedBuffer *lb = data;
> +
> +    if (!lb->is_present) {
> +        return;
> +    }
> +
> +    g_clear_pointer(&lb->data, g_free);
> +    lb->is_present = false;
> +}
> +
> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
> +                                  Error **errp)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);

Move lock to where it's needed? I.e., after 
trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx)
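
Roughly, the suggested shape (a sketch; the header fields come from an
untrusted stream, so they can be validated before the mutex is taken):

        /* ... data_size / version / idx validation, no lock needed ... */

        trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);

        /* take the lock only once the shared load_bufs state is accessed */
        QEMU_LOCK_GUARD(&migration->load_bufs_mutex);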

> +    LoadedBuffer *lb;
> +
> +    if (data_size < sizeof(*packet)) {
> +        error_setg(errp, "packet too short at %zu (min is %zu)",
> +                   data_size, sizeof(*packet));
> +        return -1;
> +    }
> +
> +    if (packet->version != 0) {
> +        error_setg(errp, "packet has unknown version %" PRIu32,
> +                   packet->version);
> +        return -1;
> +    }
> +
> +    if (packet->idx == UINT32_MAX) {
> +        error_setg(errp, "packet has too high idx %" PRIu32,
> +                   packet->idx);
> +        return -1;
> +    }
> +
> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
> +
> +    /* config state packet should be the last one in the stream */
> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
> +        migration->load_buf_idx_last = packet->idx;
> +    }
> +
> +    assert(migration->load_bufs);
> +    if (packet->idx >= migration->load_bufs->len) {
> +        g_array_set_size(migration->load_bufs, packet->idx + 1);
> +    }
> +
> +    lb = &g_array_index(migration->load_bufs, typeof(*lb), packet->idx);
> +    if (lb->is_present) {
> +        error_setg(errp, "state buffer %" PRIu32 " already filled", packet->idx);
> +        return -1;
> +    }
> +
> +    assert(packet->idx >= migration->load_buf_idx);
> +
> +    migration->load_buf_queued_pending_buffers++;
> +    if (migration->load_buf_queued_pending_buffers >
> +        vbasedev->migration_max_queued_buffers) {
> +        error_setg(errp,
> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
> +                   packet->idx, vbasedev->migration_max_queued_buffers);
> +        return -1;
> +    }

I feel like max_queued_buffers accounting/checking/configuration should 
be split to a separate patch that will come after this patch.
Also, should we count bytes instead of buffers? The current buffer size is
1 MiB, but this could change, and the normal user should not care about or
know what the buffer size is.
So maybe rename it to migration_max_pending_bytes or such?
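
A byte-based version of the check might look roughly like this (a sketch;
load_buf_pending_bytes and migration_max_pending_bytes are hypothetical names
following this suggestion):

        size_t payload_len = data_size - sizeof(*packet);

        migration->load_buf_pending_bytes += payload_len;
        if (migration->load_buf_pending_bytes >
            vbasedev->migration_max_pending_bytes) {
            error_setg(errp,
                       "queuing state buffer %" PRIu32 " would exceed the max of %"
                       PRIu64 " pending bytes",
                       packet->idx, vbasedev->migration_max_pending_bytes);
            return -1;
        }

The counter would then be decremented by the buffer length in the load thread
once a buffer has been written to the device.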

> +
> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
> +    lb->len = data_size - sizeof(*packet);
> +    lb->is_present = true;
> +
> +    qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);

There is only one thread waiting, shouldn't signal be enough?
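
I.e., presumably (sketch):

        qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);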

> +
> +    return 0;
> +}
> +
> +static void *vfio_load_bufs_thread(void *opaque)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    Error **errp = &migration->load_bufs_thread_errp;
> +    g_autoptr(QemuLockable) locker = qemu_lockable_auto_lock(
> +        QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));

Any special reason to use QemuLockable?

> +    LoadedBuffer *lb;
> +
> +    while (!migration->load_bufs_device_ready &&
> +           !migration->load_bufs_thread_want_exit) {
> +        qemu_cond_wait(&migration->load_bufs_device_ready_cond, &migration->load_bufs_mutex);
> +    }
> +
> +    while (!migration->load_bufs_thread_want_exit) {
> +        bool starved;
> +        ssize_t ret;
> +
> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
> +
> +        if (migration->load_buf_idx >= migration->load_bufs->len) {
> +            assert(migration->load_buf_idx == migration->load_bufs->len);
> +            starved = true;
> +        } else {
> +            lb = &g_array_index(migration->load_bufs, typeof(*lb), migration->load_buf_idx);
> +            starved = !lb->is_present;
> +        }
> +
> +        if (starved) {
> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name, migration->load_buf_idx);
> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond, &migration->load_bufs_mutex);
> +            continue;
> +        }
> +
> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
> +            break;
> +        }
> +
> +        if (migration->load_buf_idx == 0) {
> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
> +        }
> +
> +        if (lb->len) {
> +            g_autofree char *buf = NULL;
> +            size_t buf_len;
> +            int errno_save;
> +
> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
> +                                                           migration->load_buf_idx);
> +
> +            /* lb might become re-allocated when we drop the lock */
> +            buf = g_steal_pointer(&lb->data);
> +            buf_len = lb->len;
> +
> +            /* Loading data to the device takes a while, drop the lock during this process */
> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
> +            ret = write(migration->data_fd, buf, buf_len);
> +            errno_save = errno;
> +            qemu_mutex_lock(&migration->load_bufs_mutex);
> +
> +            if (ret < 0) {
> +                error_setg(errp, "write to state buffer %" PRIu32 " failed with %d",
> +                           migration->load_buf_idx, errno_save);
> +                break;
> +            } else if (ret < buf_len) {
> +                error_setg(errp, "write to state buffer %" PRIu32 " incomplete %zd / %zu",
> +                           migration->load_buf_idx, ret, buf_len);
> +                break;
> +            }
> +
> +            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
> +                                                         migration->load_buf_idx);
> +        }
> +
> +        assert(migration->load_buf_queued_pending_buffers > 0);
> +        migration->load_buf_queued_pending_buffers--;
> +
> +        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
> +            trace_vfio_load_state_device_buffer_end(vbasedev->name);
> +        }
> +
> +        migration->load_buf_idx++;
> +    }
> +
> +    if (migration->load_bufs_thread_want_exit &&
> +        !*errp) {
> +        error_setg(errp, "load bufs thread asked to quit");
> +    }
> +
> +    g_clear_pointer(&locker, qemu_lockable_auto_unlock);
> +
> +    qemu_loadvm_load_finish_ready_lock();
> +    migration->load_bufs_thread_finished = true;
> +    qemu_loadvm_load_finish_ready_broadcast();
> +    qemu_loadvm_load_finish_ready_unlock();
> +
> +    return NULL;
> +}
> +
>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>                                            Error **errp)
>   {
> @@ -285,6 +478,8 @@ static int vfio_load_device_config_state(QEMUFile *f, void *opaque)
>       VFIODevice *vbasedev = opaque;
>       uint64_t data;
>
> +    trace_vfio_load_device_config_state_start(vbasedev->name);

Maybe split this and below trace_vfio_load_device_config_state_end to a 
separate patch?

> +
>       if (vbasedev->ops && vbasedev->ops->vfio_load_config) {
>           int ret;
>
> @@ -303,7 +498,7 @@ static int vfio_load_device_config_state(QEMUFile *f, void *opaque)
>           return -EINVAL;
>       }
>
> -    trace_vfio_load_device_config_state(vbasedev->name);
> +    trace_vfio_load_device_config_state_end(vbasedev->name);
>       return qemu_file_get_error(f);
>   }
>
> @@ -687,16 +882,70 @@ static void vfio_save_state(QEMUFile *f, void *opaque)
>   static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>   {
>       VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    int ret;
> +
> +    ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
> +                                   vbasedev->migration->device_state, errp);
> +    if (ret) {
> +        return ret;
> +    }
> +
> +    assert(!migration->load_bufs);
> +    migration->load_bufs = g_array_new(FALSE, TRUE, sizeof(LoadedBuffer));
> +    g_array_set_clear_func(migration->load_bufs, loaded_buffer_clear);
> +
> +    qemu_mutex_init(&migration->load_bufs_mutex);
> +
> +    migration->load_bufs_device_ready = false;
> +    qemu_cond_init(&migration->load_bufs_device_ready_cond);
> +
> +    migration->load_buf_idx = 0;
> +    migration->load_buf_idx_last = UINT32_MAX;
> +    migration->load_buf_queued_pending_buffers = 0;
> +    qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
> +
> +    migration->config_state_loaded_to_dev = false;
> +
> +    assert(!migration->load_bufs_thread_started);

Maybe do all these allocations (and de-allocations) only if multifd 
device state is supported and enabled?
Extracting this to its own function could also be good.
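
One possible extraction (a sketch only; vfio_load_bufs_init() is an
illustrative name, and the fields set here are the ones this patch adds):

    static void vfio_load_bufs_init(VFIOMigration *migration)
    {
        migration->load_bufs = g_array_new(FALSE, TRUE, sizeof(LoadedBuffer));
        g_array_set_clear_func(migration->load_bufs, loaded_buffer_clear);
        qemu_mutex_init(&migration->load_bufs_mutex);

        migration->load_bufs_device_ready = false;
        qemu_cond_init(&migration->load_bufs_device_ready_cond);

        migration->load_buf_idx = 0;
        migration->load_buf_idx_last = UINT32_MAX;
        migration->load_buf_queued_pending_buffers = 0;
        qemu_cond_init(&migration->load_bufs_buffer_ready_cond);

        migration->config_state_loaded_to_dev = false;
    }

vfio_load_setup() would then call it (and vfio_load_cleanup() a matching
teardown helper) only when multifd device state transfer is actually in use.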

>
> -    return vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
> -                                    vbasedev->migration->device_state, errp);
> +    migration->load_bufs_thread_finished = false;
> +    migration->load_bufs_thread_want_exit = false;
> +    qemu_thread_create(&migration->load_bufs_thread, "vfio-load-bufs",
> +                       vfio_load_bufs_thread, opaque, QEMU_THREAD_JOINABLE);

The device state save threads are managed by the migration core thread pool.
Don't we want to apply the same thread management scheme to the load
flow as well?

> +
> +    migration->load_bufs_thread_started = true;
> +
> +    return 0;
>   }
>
>   static int vfio_load_cleanup(void *opaque)
>   {
>       VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    if (migration->load_bufs_thread_started) {
> +        qemu_mutex_lock(&migration->load_bufs_mutex);
> +        migration->load_bufs_thread_want_exit = true;
> +        qemu_mutex_unlock(&migration->load_bufs_mutex);
> +
> +        qemu_cond_broadcast(&migration->load_bufs_device_ready_cond);
> +        qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);
> +
> +        qemu_thread_join(&migration->load_bufs_thread);
> +
> +        assert(migration->load_bufs_thread_finished);
> +
> +        migration->load_bufs_thread_started = false;
> +    }
>
>       vfio_migration_cleanup(vbasedev);
> +
> +    g_clear_pointer(&migration->load_bufs, g_array_unref);
> +    qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
> +    qemu_cond_destroy(&migration->load_bufs_device_ready_cond);
> +    qemu_mutex_destroy(&migration->load_bufs_mutex);
> +
>       trace_vfio_load_cleanup(vbasedev->name);
>
>       return 0;
> @@ -705,6 +954,7 @@ static int vfio_load_cleanup(void *opaque)
>   static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>   {
>       VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
>       int ret = 0;
>       uint64_t data;
>
> @@ -716,6 +966,7 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>           switch (data) {
>           case VFIO_MIG_FLAG_DEV_CONFIG_STATE:
>           {
> +            migration->config_state_loaded_to_dev = true;
>               return vfio_load_device_config_state(f, opaque);
>           }
>           case VFIO_MIG_FLAG_DEV_SETUP_STATE:
> @@ -742,6 +993,15 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>               }
>               break;
>           }
> +        case VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE:
> +        {
> +            QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
> +
> +            migration->load_bufs_device_ready = true;
> +            qemu_cond_broadcast(&migration->load_bufs_device_ready_cond);
> +
> +            break;
> +        }
>           case VFIO_MIG_FLAG_DEV_INIT_DATA_SENT:
>           {
>               if (!vfio_precopy_supported(vbasedev) ||
> @@ -774,6 +1034,76 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>       return ret;
>   }
>
> +static int vfio_load_finish(void *opaque, bool *is_finished, Error **errp)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    g_autoptr(QemuLockable) locker = NULL;

Any special reason to use QemuLockable?

Thanks.

> +    LoadedBuffer *lb;
> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
> +    QEMUFile *f_out = NULL, *f_in = NULL;
> +    uint64_t mig_header;
> +    int ret;
> +
> +    if (migration->config_state_loaded_to_dev) {
> +        *is_finished = true;
> +        return 0;
> +    }
> +
> +    if (!migration->load_bufs_thread_finished) {
> +        assert(migration->load_bufs_thread_started);
> +        *is_finished = false;
> +        return 0;
> +    }
> +
> +    if (migration->load_bufs_thread_errp) {
> +        error_propagate(errp, g_steal_pointer(&migration->load_bufs_thread_errp));
> +        return -1;
> +    }
> +
> +    locker = qemu_lockable_auto_lock(QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));
> +
> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
> +    lb = &g_array_index(migration->load_bufs, typeof(*lb), migration->load_buf_idx);
> +    assert(lb->is_present);
> +
> +    bioc = qio_channel_buffer_new(lb->len);
> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
> +
> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
> +
> +    ret = qemu_fflush(f_out);
> +    if (ret) {
> +        error_setg(errp, "load device config state file flush failed with %d", ret);
> +        g_clear_pointer(&f_out, qemu_fclose);
> +        return -1;
> +    }
> +
> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
> +
> +    mig_header = qemu_get_be64(f_in);
> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
> +        error_setg(errp, "load device config state invalid header %"PRIu64, mig_header);
> +        g_clear_pointer(&f_out, qemu_fclose);
> +        g_clear_pointer(&f_in, qemu_fclose);
> +        return -1;
> +    }
> +
> +    ret = vfio_load_device_config_state(f_in, opaque);
> +    g_clear_pointer(&f_out, qemu_fclose);
> +    g_clear_pointer(&f_in, qemu_fclose);
> +    if (ret < 0) {
> +        error_setg(errp, "load device config state failed with %d", ret);
> +        return -1;
> +    }
> +
> +    migration->config_state_loaded_to_dev = true;
> +    *is_finished = true;
> +    return 0;
> +}
> +
>   static bool vfio_switchover_ack_needed(void *opaque)
>   {
>       VFIODevice *vbasedev = opaque;
> @@ -794,6 +1124,8 @@ static const SaveVMHandlers savevm_vfio_handlers = {
>       .load_setup = vfio_load_setup,
>       .load_cleanup = vfio_load_cleanup,
>       .load_state = vfio_load_state,
> +    .load_state_buffer = vfio_load_state_buffer,
> +    .load_finish = vfio_load_finish,
>       .switchover_ack_needed = vfio_switchover_ack_needed,
>   };
>
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index 2407720c3530..08cb56d27a05 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3378,6 +3378,8 @@ static Property vfio_pci_dev_properties[] = {
>                       VFIO_FEATURE_ENABLE_IGD_OPREGION_BIT, false),
>       DEFINE_PROP_ON_OFF_AUTO("enable-migration", VFIOPCIDevice,
>                               vbasedev.enable_migration, ON_OFF_AUTO_AUTO),
> +    DEFINE_PROP_UINT64("x-migration-max-queued-buffers", VFIOPCIDevice,
> +                       vbasedev.migration_max_queued_buffers, UINT64_MAX),
>       DEFINE_PROP_BOOL("migration-events", VFIOPCIDevice,
>                        vbasedev.migration_events, false),
>       DEFINE_PROP_BOOL("x-no-mmap", VFIOPCIDevice, vbasedev.no_mmap, false),
> diff --git a/hw/vfio/trace-events b/hw/vfio/trace-events
> index 013c602f30fa..9d2519a28a7e 100644
> --- a/hw/vfio/trace-events
> +++ b/hw/vfio/trace-events
> @@ -149,9 +149,16 @@ vfio_display_edid_write_error(void) ""
>
>   # migration.c
>   vfio_load_cleanup(const char *name) " (%s)"
> -vfio_load_device_config_state(const char *name) " (%s)"
> +vfio_load_device_config_state_start(const char *name) " (%s)"
> +vfio_load_device_config_state_end(const char *name) " (%s)"
>   vfio_load_state(const char *name, uint64_t data) " (%s) data 0x%"PRIx64
>   vfio_load_state_device_data(const char *name, uint64_t data_size, int ret) " (%s) size 0x%"PRIx64" ret %d"
> +vfio_load_state_device_buffer_incoming(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_start(const char *name) " (%s)"
> +vfio_load_state_device_buffer_starved(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_load_start(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_load_end(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_end(const char *name) " (%s)"
>   vfio_migration_realize(const char *name) " (%s)"
>   vfio_migration_set_device_state(const char *name, const char *state) " (%s) state %s"
>   vfio_migration_set_state(const char *name, const char *new_state, const char *recover_state) " (%s) new state %s, recover state %s"
> diff --git a/include/hw/vfio/vfio-common.h b/include/hw/vfio/vfio-common.h
> index 32d58e3e025b..ba5b9464e79a 100644
> --- a/include/hw/vfio/vfio-common.h
> +++ b/include/hw/vfio/vfio-common.h
> @@ -76,6 +76,22 @@ typedef struct VFIOMigration {
>
>       bool save_iterate_run;
>       bool save_iterate_empty_hit;
> +
> +    QemuThread load_bufs_thread;
> +    Error *load_bufs_thread_errp;
> +    bool load_bufs_thread_started;
> +    bool load_bufs_thread_finished;
> +    bool load_bufs_thread_want_exit;
> +
> +    GArray *load_bufs;
> +    bool load_bufs_device_ready;
> +    QemuCond load_bufs_device_ready_cond;
> +    QemuCond load_bufs_buffer_ready_cond;
> +    QemuMutex load_bufs_mutex;
> +    uint32_t load_buf_idx;
> +    uint32_t load_buf_idx_last;
> +    uint32_t load_buf_queued_pending_buffers;
> +    bool config_state_loaded_to_dev;
>   } VFIOMigration;
>
>   struct VFIOGroup;
> @@ -134,6 +150,7 @@ typedef struct VFIODevice {
>       bool ram_block_discard_allowed;
>       OnOffAuto enable_migration;
>       bool migration_events;
> +    uint64_t migration_max_queued_buffers;
>       VFIODeviceOps *ops;
>       unsigned int num_irqs;
>       unsigned int num_regions;
Maciej S. Szmigiero Sept. 9, 2024, 6:06 p.m. UTC | #2
On 9.09.2024 10:55, Avihai Horon wrote:
> 
> On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> The multifd received data needs to be reassembled since device state
>> packets sent via different multifd channels can arrive out-of-order.
>>
>> Therefore, each VFIO device state packet carries a header indicating
>> its position in the stream.
>>
>> The last such VFIO device state packet should have
>> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config
>> state.
>>
>> Since it's important to finish loading device state transferred via
>> the main migration channel (via save_live_iterate handler) before
>> starting loading the data asynchronously transferred via multifd
>> a new VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE flag is introduced to
>> mark the end of the main migration channel data.
>>
>> The device state loading process waits until that flag is seen before
>> commencing loading of the multifd-transferred device state.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   hw/vfio/migration.c           | 338 +++++++++++++++++++++++++++++++++-
>>   hw/vfio/pci.c                 |   2 +
>>   hw/vfio/trace-events          |   9 +-
>>   include/hw/vfio/vfio-common.h |  17 ++
>>   4 files changed, 362 insertions(+), 4 deletions(-)
>>
>> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
>> index 24679d8c5034..57c1542528dc 100644
>> --- a/hw/vfio/migration.c
>> +++ b/hw/vfio/migration.c
>> @@ -15,6 +15,7 @@
>>   #include <linux/vfio.h>
>>   #include <sys/ioctl.h>
>>
>> +#include "io/channel-buffer.h"
>>   #include "sysemu/runstate.h"
>>   #include "hw/vfio/vfio-common.h"
>>   #include "migration/misc.h"
>> @@ -47,6 +48,7 @@
>>   #define VFIO_MIG_FLAG_DEV_SETUP_STATE   (0xffffffffef100003ULL)
>>   #define VFIO_MIG_FLAG_DEV_DATA_STATE    (0xffffffffef100004ULL)
>>   #define VFIO_MIG_FLAG_DEV_INIT_DATA_SENT (0xffffffffef100005ULL)
>> +#define VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE    (0xffffffffef100006ULL)
>>
>>   /*
>>    * This is an arbitrary size based on migration of mlx5 devices, where typically
>> @@ -55,6 +57,15 @@
>>    */
>>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>>
>> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
>> +
>> +typedef struct VFIODeviceStatePacket {
>> +    uint32_t version;
>> +    uint32_t idx;
>> +    uint32_t flags;
>> +    uint8_t data[0];
>> +} QEMU_PACKED VFIODeviceStatePacket;
>> +
>>   static int64_t bytes_transferred;
>>
>>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
>> @@ -254,6 +265,188 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>>       return ret;
>>   }
>>
>> +typedef struct LoadedBuffer {
>> +    bool is_present;
>> +    char *data;
>> +    size_t len;
>> +} LoadedBuffer;
> 
> Maybe rename LoadedBuffer to a more specific name, like VFIOStateBuffer?

Will do.

> I also feel like LoadedBuffer deserves a separate commit.
> Plus, I think it will be good to add a full API for this, that wraps the g_array_* calls and holds the extra members.
> E.g, VFIOStateBuffer, VFIOStateArray (will hold load_buf_idx, load_buf_idx_last, etc.), vfio_state_array_destroy(), vfio_state_array_alloc(), vfio_state_array_get(), etc...
> IMHO, this will make it clearer.

Will think about wrapping the GArray accesses in separate methods;
however, wrapping a single-line GArray call in a separate function
would normally seem a bit excessive.

>> +
>> +static void loaded_buffer_clear(gpointer data)
>> +{
>> +    LoadedBuffer *lb = data;
>> +
>> +    if (!lb->is_present) {
>> +        return;
>> +    }
>> +
>> +    g_clear_pointer(&lb->data, g_free);
>> +    lb->is_present = false;
>> +}
>> +
>> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
>> +                                  Error **errp)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
> 
> Move lock to where it's needed? I.e., after trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx)

It's a declaration of a new variable, so I guess it should always be
at the top of the code block in the kernel / QEMU code style?

Also, these checks below are very unlikely to fail, and even if they do,
I doubt a failed migration due to bit-stream corruption is a scenario
worth optimizing run-time performance for.

>> +    LoadedBuffer *lb;
>> +
>> +    if (data_size < sizeof(*packet)) {
>> +        error_setg(errp, "packet too short at %zu (min is %zu)",
>> +                   data_size, sizeof(*packet));
>> +        return -1;
>> +    }
>> +
>> +    if (packet->version != 0) {
>> +        error_setg(errp, "packet has unknown version %" PRIu32,
>> +                   packet->version);
>> +        return -1;
>> +    }
>> +
>> +    if (packet->idx == UINT32_MAX) {
>> +        error_setg(errp, "packet has too high idx %" PRIu32,
>> +                   packet->idx);
>> +        return -1;
>> +    }
>> +
>> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
>> +
>> +    /* config state packet should be the last one in the stream */
>> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
>> +        migration->load_buf_idx_last = packet->idx;
>> +    }
>> +
>> +    assert(migration->load_bufs);
>> +    if (packet->idx >= migration->load_bufs->len) {
>> +        g_array_set_size(migration->load_bufs, packet->idx + 1);
>> +    }
>> +
>> +    lb = &g_array_index(migration->load_bufs, typeof(*lb), packet->idx);
>> +    if (lb->is_present) {
>> +        error_setg(errp, "state buffer %" PRIu32 " already filled", packet->idx);
>> +        return -1;
>> +    }
>> +
>> +    assert(packet->idx >= migration->load_buf_idx);
>> +
>> +    migration->load_buf_queued_pending_buffers++;
>> +    if (migration->load_buf_queued_pending_buffers >
>> +        vbasedev->migration_max_queued_buffers) {
>> +        error_setg(errp,
>> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
>> +                   packet->idx, vbasedev->migration_max_queued_buffers);
>> +        return -1;
>> +    }
> 
> I feel like max_queued_buffers accounting/checking/configuration should be split to a separate patch that will come after this patch.
> Also, should we count bytes instead of buffers? Current buffer size is 1MB but this could change, and the normal user should not care or know what is the buffer size.
> So maybe rename to migration_max_pending_bytes or such?

Since it's Peter that asked for this limit to be introduced in the first place
I would like to ask him what his preference here.

@Peter: max queued buffers or bytes?

>> +
>> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
>> +    lb->len = data_size - sizeof(*packet);
>> +    lb->is_present = true;
>> +
>> +    qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);
> 
> There is only one thread waiting, shouldn't signal be enough?

Will change this to _signal() since it clearly doesn't
make sense to have a future-proof API here - it's an
implementation detail.

>> +
>> +    return 0;
>> +}
>> +
>> +static void *vfio_load_bufs_thread(void *opaque)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    Error **errp = &migration->load_bufs_thread_errp;
>> +    g_autoptr(QemuLockable) locker = qemu_lockable_auto_lock(
>> +        QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));
> 
> Any special reason to use QemuLockable?

I prefer automatic lock management (RAII-like) for the same reason
I prefer automatic memory management: it makes it much harder to
forget to unlock the lock (or free memory) on some error path.

That's the reason these primitives were introduced in QEMU in the
first place (apparently modeled after their GLib equivalents) and
why they are (slowly) being introduced into the Linux kernel too.
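
For illustration, the two styles compared (a sketch; some_error stands in for
any failure condition, and QEMU_LOCK_GUARD() lives in include/qemu/lockable.h):

        /* manual: every early exit must remember to unlock */
        qemu_mutex_lock(&migration->load_bufs_mutex);
        if (some_error) {
            qemu_mutex_unlock(&migration->load_bufs_mutex);
            return -1;
        }
        qemu_mutex_unlock(&migration->load_bufs_mutex);

        /* automatic: the guard releases the mutex on every path out of scope */
        {
            QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
            if (some_error) {
                return -1; /* mutex released automatically */
            }
        }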

>> +    LoadedBuffer *lb;
>> +
>> +    while (!migration->load_bufs_device_ready &&
>> +           !migration->load_bufs_thread_want_exit) {
>> +        qemu_cond_wait(&migration->load_bufs_device_ready_cond, &migration->load_bufs_mutex);
>> +    }
>> +
>> +    while (!migration->load_bufs_thread_want_exit) {
>> +        bool starved;
>> +        ssize_t ret;
>> +
>> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
>> +
>> +        if (migration->load_buf_idx >= migration->load_bufs->len) {
>> +            assert(migration->load_buf_idx == migration->load_bufs->len);
>> +            starved = true;
>> +        } else {
>> +            lb = &g_array_index(migration->load_bufs, typeof(*lb), migration->load_buf_idx);
>> +            starved = !lb->is_present;
>> +        }
>> +
>> +        if (starved) {
>> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name, migration->load_buf_idx);
>> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond, &migration->load_bufs_mutex);
>> +            continue;
>> +        }
>> +
>> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
>> +            break;
>> +        }
>> +
>> +        if (migration->load_buf_idx == 0) {
>> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
>> +        }
>> +
>> +        if (lb->len) {
>> +            g_autofree char *buf = NULL;
>> +            size_t buf_len;
>> +            int errno_save;
>> +
>> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
>> +                                                           migration->load_buf_idx);
>> +
>> +            /* lb might become re-allocated when we drop the lock */
>> +            buf = g_steal_pointer(&lb->data);
>> +            buf_len = lb->len;
>> +
>> +            /* Loading data to the device takes a while, drop the lock during this process */
>> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
>> +            ret = write(migration->data_fd, buf, buf_len);
>> +            errno_save = errno;
>> +            qemu_mutex_lock(&migration->load_bufs_mutex);
>> +
>> +            if (ret < 0) {
>> +                error_setg(errp, "write to state buffer %" PRIu32 " failed with %d",
>> +                           migration->load_buf_idx, errno_save);
>> +                break;
>> +            } else if (ret < buf_len) {
>> +                error_setg(errp, "write to state buffer %" PRIu32 " incomplete %zd / %zu",
>> +                           migration->load_buf_idx, ret, buf_len);
>> +                break;
>> +            }
>> +
>> +            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
>> +                                                         migration->load_buf_idx);
>> +        }
>> +
>> +        assert(migration->load_buf_queued_pending_buffers > 0);
>> +        migration->load_buf_queued_pending_buffers--;
>> +
>> +        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
>> +            trace_vfio_load_state_device_buffer_end(vbasedev->name);
>> +        }
>> +
>> +        migration->load_buf_idx++;
>> +    }
>> +
>> +    if (migration->load_bufs_thread_want_exit &&
>> +        !*errp) {
>> +        error_setg(errp, "load bufs thread asked to quit");
>> +    }
>> +
>> +    g_clear_pointer(&locker, qemu_lockable_auto_unlock);
>> +
>> +    qemu_loadvm_load_finish_ready_lock();
>> +    migration->load_bufs_thread_finished = true;
>> +    qemu_loadvm_load_finish_ready_broadcast();
>> +    qemu_loadvm_load_finish_ready_unlock();
>> +
>> +    return NULL;
>> +}
>> +
>>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>>                                            Error **errp)
>>   {
>> @@ -285,6 +478,8 @@ static int vfio_load_device_config_state(QEMUFile *f, void *opaque)
>>       VFIODevice *vbasedev = opaque;
>>       uint64_t data;
>>
>> +    trace_vfio_load_device_config_state_start(vbasedev->name);
> 
> Maybe split this and below trace_vfio_load_device_config_state_end to a separate patch?

I guess you mean to add these trace points in a separate patch?
Can do.

>> +
>>       if (vbasedev->ops && vbasedev->ops->vfio_load_config) {
>>           int ret;
>>
>> @@ -303,7 +498,7 @@ static int vfio_load_device_config_state(QEMUFile *f, void *opaque)
>>           return -EINVAL;
>>       }
>>
>> -    trace_vfio_load_device_config_state(vbasedev->name);
>> +    trace_vfio_load_device_config_state_end(vbasedev->name);
>>       return qemu_file_get_error(f);
>>   }
>>
>> @@ -687,16 +882,70 @@ static void vfio_save_state(QEMUFile *f, void *opaque)
>>   static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>>   {
>>       VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    int ret;
>> +
>> +    ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>> +                                   vbasedev->migration->device_state, errp);
>> +    if (ret) {
>> +        return ret;
>> +    }
>> +
>> +    assert(!migration->load_bufs);
>> +    migration->load_bufs = g_array_new(FALSE, TRUE, sizeof(LoadedBuffer));
>> +    g_array_set_clear_func(migration->load_bufs, loaded_buffer_clear);
>> +
>> +    qemu_mutex_init(&migration->load_bufs_mutex);
>> +
>> +    migration->load_bufs_device_ready = false;
>> +    qemu_cond_init(&migration->load_bufs_device_ready_cond);
>> +
>> +    migration->load_buf_idx = 0;
>> +    migration->load_buf_idx_last = UINT32_MAX;
>> +    migration->load_buf_queued_pending_buffers = 0;
>> +    qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
>> +
>> +    migration->config_state_loaded_to_dev = false;
>> +
>> +    assert(!migration->load_bufs_thread_started);
> 
> Maybe do all these allocations (and de-allocations) only if multifd device state is supported and enabled?
> Extracting this to its own function could also be good.

Sure, will try to avoid unnecessarily allocating multifd device state
related things if this functionality is unavailable anyway.
  
>>
>> -    return vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>> -                                    vbasedev->migration->device_state, errp);
>> +    migration->load_bufs_thread_finished = false;
>> +    migration->load_bufs_thread_want_exit = false;
>> +    qemu_thread_create(&migration->load_bufs_thread, "vfio-load-bufs",
>> +                       vfio_load_bufs_thread, opaque, QEMU_THREAD_JOINABLE);
> 
> The device state save threads are manged by migration core thread pool. Don't we want to apply the same thread management scheme for the load flow as well?

I think that (in contrast with the device state saving threads)
the buffer loading / reordering thread is an implementation detail
of the VFIO driver, so I don't think it really makes sense for multifd code
to manage it.

>> +
>> +    migration->load_bufs_thread_started = true;
>> +
>> +    return 0;
>>   }
>>
>>   static int vfio_load_cleanup(void *opaque)
>>   {
>>       VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +
>> +    if (migration->load_bufs_thread_started) {
>> +        qemu_mutex_lock(&migration->load_bufs_mutex);
>> +        migration->load_bufs_thread_want_exit = true;
>> +        qemu_mutex_unlock(&migration->load_bufs_mutex);
>> +
>> +        qemu_cond_broadcast(&migration->load_bufs_device_ready_cond);
>> +        qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);
>> +
>> +        qemu_thread_join(&migration->load_bufs_thread);
>> +
>> +        assert(migration->load_bufs_thread_finished);
>> +
>> +        migration->load_bufs_thread_started = false;
>> +    }
>>
>>       vfio_migration_cleanup(vbasedev);
>> +
>> +    g_clear_pointer(&migration->load_bufs, g_array_unref);
>> +    qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
>> +    qemu_cond_destroy(&migration->load_bufs_device_ready_cond);
>> +    qemu_mutex_destroy(&migration->load_bufs_mutex);
>> +
>>       trace_vfio_load_cleanup(vbasedev->name);
>>
>>       return 0;
>> @@ -705,6 +954,7 @@ static int vfio_load_cleanup(void *opaque)
>>   static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>>   {
>>       VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>>       int ret = 0;
>>       uint64_t data;
>>
>> @@ -716,6 +966,7 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>>           switch (data) {
>>           case VFIO_MIG_FLAG_DEV_CONFIG_STATE:
>>           {
>> +            migration->config_state_loaded_to_dev = true;
>>               return vfio_load_device_config_state(f, opaque);
>>           }
>>           case VFIO_MIG_FLAG_DEV_SETUP_STATE:
>> @@ -742,6 +993,15 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>>               }
>>               break;
>>           }
>> +        case VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE:
>> +        {
>> +            QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>> +
>> +            migration->load_bufs_device_ready = true;
>> +            qemu_cond_broadcast(&migration->load_bufs_device_ready_cond);
>> +
>> +            break;
>> +        }
>>           case VFIO_MIG_FLAG_DEV_INIT_DATA_SENT:
>>           {
>>               if (!vfio_precopy_supported(vbasedev) ||
>> @@ -774,6 +1034,76 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>>       return ret;
>>   }
>>
>> +static int vfio_load_finish(void *opaque, bool *is_finished, Error **errp)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    g_autoptr(QemuLockable) locker = NULL;
> 
> Any special reason to use QemuLockable?

The same reason as for the automatic locking above.
  
> Thanks.
> 

Thanks,
Maciej
Avihai Horon Sept. 12, 2024, 8:20 a.m. UTC | #3
On 09/09/2024 21:06, Maciej S. Szmigiero wrote:
> On 9.09.2024 10:55, Avihai Horon wrote:
>>
>> On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> The multifd received data needs to be reassembled since device state
>>> packets sent via different multifd channels can arrive out-of-order.
>>>
>>> Therefore, each VFIO device state packet carries a header indicating
>>> its position in the stream.
>>>
>>> The last such VFIO device state packet should have
>>> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config
>>> state.
>>>
>>> Since it's important to finish loading device state transferred via
>>> the main migration channel (via save_live_iterate handler) before
>>> starting loading the data asynchronously transferred via multifd
>>> a new VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE flag is introduced to
>>> mark the end of the main migration channel data.
>>>
>>> The device state loading process waits until that flag is seen before
>>> commencing loading of the multifd-transferred device state.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   hw/vfio/migration.c           | 338 +++++++++++++++++++++++++
>>>   hw/vfio/pci.c                 |   2 +
>>>   hw/vfio/trace-events          |   9 +-
>>>   include/hw/vfio/vfio-common.h |  17 ++
>>>   4 files changed, 362 insertions(+), 4 deletions(-)
>>>
>>> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
>>> index 24679d8c5034..57c1542528dc 100644
>>> --- a/hw/vfio/migration.c
>>> +++ b/hw/vfio/migration.c
>>> @@ -15,6 +15,7 @@
>>>   #include <linux/vfio.h>
>>>   #include <sys/ioctl.h>
>>>
>>> +#include "io/channel-buffer.h"
>>>   #include "sysemu/runstate.h"
>>>   #include "hw/vfio/vfio-common.h"
>>>   #include "migration/misc.h"
>>> @@ -47,6 +48,7 @@
>>>   #define VFIO_MIG_FLAG_DEV_SETUP_STATE (0xffffffffef100003ULL)
>>>   #define VFIO_MIG_FLAG_DEV_DATA_STATE (0xffffffffef100004ULL)
>>>   #define VFIO_MIG_FLAG_DEV_INIT_DATA_SENT (0xffffffffef100005ULL)
>>> +#define VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE (0xffffffffef100006ULL)
>>>
>>>   /*
>>>    * This is an arbitrary size based on migration of mlx5 devices, 
>>> where typically
>>> @@ -55,6 +57,15 @@
>>>    */
>>>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>>>
>>> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
>>> +
>>> +typedef struct VFIODeviceStatePacket {
>>> +    uint32_t version;
>>> +    uint32_t idx;
>>> +    uint32_t flags;
>>> +    uint8_t data[0];
>>> +} QEMU_PACKED VFIODeviceStatePacket;
>>> +
>>>   static int64_t bytes_transferred;
>>>
>>>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
>>> @@ -254,6 +265,188 @@ static int vfio_load_buffer(QEMUFile *f, 
>>> VFIODevice *vbasedev,
>>>       return ret;
>>>   }
>>>
>>> +typedef struct LoadedBuffer {
>>> +    bool is_present;
>>> +    char *data;
>>> +    size_t len;
>>> +} LoadedBuffer;
>>
>> Maybe rename LoadedBuffer to a more specific name, like VFIOStateBuffer?
>
> Will do.
>
>> I also feel like LoadedBuffer deserves a separate commit.
>> Plus, I think it will be good to add a full API for this, that wraps 
>> the g_array_* calls and holds the extra members.
>> E.g, VFIOStateBuffer, VFIOStateArray (will hold load_buf_idx, 
>> load_buf_idx_last, etc.), vfio_state_array_destroy(), 
>> vfio_state_array_alloc(), vfio_state_array_get(), etc...
>> IMHO, this will make it clearer.
>
> Will think about wrapping GArray accesses in separate methods,
> however wrapping a single line GArray call in a separate function
> normally would seem a bit excessive.

Sure, let's do it only if it makes the code cleaner.

>
>>> +
>>> +static void loaded_buffer_clear(gpointer data)
>>> +{
>>> +    LoadedBuffer *lb = data;
>>> +
>>> +    if (!lb->is_present) {
>>> +        return;
>>> +    }
>>> +
>>> +    g_clear_pointer(&lb->data, g_free);
>>> +    lb->is_present = false;
>>> +}
>>> +
>>> +static int vfio_load_state_buffer(void *opaque, char *data, size_t 
>>> data_size,
>>> +                                  Error **errp)
>>> +{
>>> +    VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
>>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>>
>> Move lock to where it's needed? I.e., after 
>> trace_vfio_load_state_device_buffer_incoming(vbasedev->name, 
>> packet->idx)
>
> It's a declaration of a new variable so I guess it should always be
> at the top of the code block in the kernel / QEMU code style?

Yes, but it's opaque to the user.
Looking at other QEMU_LOCK_GUARD call sites in the code, it seems
people are using it in the middle of code blocks as well.

>
> Also, these checks below are very unlikely to fail and even if they do,
> I doubt a failed migration due to bit stream corruption is a scenario
> worth optimizing run time performance for.

IMO, in this case it's more for readability, but we can go either way 
and let the maintainer decide.

>
>>> +    LoadedBuffer *lb;
>>> +
>>> +    if (data_size < sizeof(*packet)) {
>>> +        error_setg(errp, "packet too short at %zu (min is %zu)",
>>> +                   data_size, sizeof(*packet));
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (packet->version != 0) {
>>> +        error_setg(errp, "packet has unknown version %" PRIu32,
>>> +                   packet->version);
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (packet->idx == UINT32_MAX) {
>>> +        error_setg(errp, "packet has too high idx %" PRIu32,
>>> +                   packet->idx);
>>> +        return -1;
>>> +    }
>>> +
>>> + trace_vfio_load_state_device_buffer_incoming(vbasedev->name, 
>>> packet->idx);
>>> +
>>> +    /* config state packet should be the last one in the stream */
>>> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
>>> +        migration->load_buf_idx_last = packet->idx;
>>> +    }
>>> +
>>> +    assert(migration->load_bufs);
>>> +    if (packet->idx >= migration->load_bufs->len) {
>>> +        g_array_set_size(migration->load_bufs, packet->idx + 1);
>>> +    }
>>> +
>>> +    lb = &g_array_index(migration->load_bufs, typeof(*lb), 
>>> packet->idx);
>>> +    if (lb->is_present) {
>>> +        error_setg(errp, "state buffer %" PRIu32 " already filled", 
>>> packet->idx);
>>> +        return -1;
>>> +    }
>>> +
>>> +    assert(packet->idx >= migration->load_buf_idx);
>>> +
>>> +    migration->load_buf_queued_pending_buffers++;
>>> +    if (migration->load_buf_queued_pending_buffers >
>>> +        vbasedev->migration_max_queued_buffers) {
>>> +        error_setg(errp,
>>> +                   "queuing state buffer %" PRIu32 " would exceed 
>>> the max of %" PRIu64,
>>> +                   packet->idx, 
>>> vbasedev->migration_max_queued_buffers);
>>> +        return -1;
>>> +    }
>>
>> I feel like max_queued_buffers accounting/checking/configuration 
>> should be split to a separate patch that will come after this patch.
>> Also, should we count bytes instead of buffers? Current buffer size 
>> is 1MB but this could change, and the normal user should not care or 
>> know what is the buffer size.
>> So maybe rename to migration_max_pending_bytes or such?
>
> Since it's Peter that asked for this limit to be introduced in the 
> first place
> I would like to ask him what his preference here.
>
> @Peter: max queued buffers or bytes?
>
>>> +
>>> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
>>> +    lb->len = data_size - sizeof(*packet);
>>> +    lb->is_present = true;
>>> +
>>> + qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);
>>
>> There is only one thread waiting, shouldn't signal be enough?
>
> Will change this to _signal() since it clearly doesn't
> make sense to have a future-proof API here - it's an
> implementation detail.
>
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static void *vfio_load_bufs_thread(void *opaque)
>>> +{
>>> +    VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    Error **errp = &migration->load_bufs_thread_errp;
>>> +    g_autoptr(QemuLockable) locker = qemu_lockable_auto_lock(
>>> + QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));
>>
>> Any special reason to use QemuLockable?
>
> I prefer automatic lock management (RAII-like) for the same reason
> I prefer automatic memory management: it makes it much harder to
> forget to unlock the lock (or free memory) in some error path.
>
> That's the reason these primitives were introduced in QEMU in the
> first place (apparently modeled after its Glib equivalents) and
> why these are being (slowly) introduced to Linux kernel too.

Agreed, I guess what I really meant is: why not use QEMU_LOCK_GUARD()?
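
One practical difference (a sketch): QEMU_LOCK_GUARD() creates an anonymous
guard that holds the mutex until the end of the enclosing scope, while the
explicit QemuLockable locker can be released mid-function, which
vfio_load_bufs_thread() does before taking the load-finish lock:

        /* QEMU_LOCK_GUARD(): anonymous guard, held until scope exit */
        QEMU_LOCK_GUARD(&migration->load_bufs_mutex);

        /* explicit locker: can be dropped early */
        g_autoptr(QemuLockable) locker = qemu_lockable_auto_lock(
            QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));
        /* ... */
        g_clear_pointer(&locker, qemu_lockable_auto_unlock);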

>
>>> +    LoadedBuffer *lb;
>>> +
>>> +    while (!migration->load_bufs_device_ready &&
>>> +           !migration->load_bufs_thread_want_exit) {
>>> + qemu_cond_wait(&migration->load_bufs_device_ready_cond, 
>>> &migration->load_bufs_mutex);
>>> +    }
>>> +
>>> +    while (!migration->load_bufs_thread_want_exit) {
>>> +        bool starved;
>>> +        ssize_t ret;
>>> +
>>> +        assert(migration->load_buf_idx <= 
>>> migration->load_buf_idx_last);
>>> +
>>> +        if (migration->load_buf_idx >= migration->load_bufs->len) {
>>> +            assert(migration->load_buf_idx == 
>>> migration->load_bufs->len);
>>> +            starved = true;
>>> +        } else {
>>> +            lb = &g_array_index(migration->load_bufs, typeof(*lb), 
>>> migration->load_buf_idx);
>>> +            starved = !lb->is_present;
>>> +        }
>>> +
>>> +        if (starved) {
>>> + trace_vfio_load_state_device_buffer_starved(vbasedev->name, 
>>> migration->load_buf_idx);
>>> + qemu_cond_wait(&migration->load_bufs_buffer_ready_cond, 
>>> &migration->load_bufs_mutex);
>>> +            continue;
>>> +        }
>>> +
>>> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
>>> +            break;
>>> +        }
>>> +
>>> +        if (migration->load_buf_idx == 0) {
>>> + trace_vfio_load_state_device_buffer_start(vbasedev->name);
>>> +        }
>>> +
>>> +        if (lb->len) {
>>> +            g_autofree char *buf = NULL;
>>> +            size_t buf_len;
>>> +            int errno_save;
>>> +
>>> + trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
>>> + migration->load_buf_idx);
>>> +
>>> +            /* lb might become re-allocated when we drop the lock */
>>> +            buf = g_steal_pointer(&lb->data);
>>> +            buf_len = lb->len;
>>> +
>>> +            /* Loading data to the device takes a while, drop the 
>>> lock during this process */
>>> + qemu_mutex_unlock(&migration->load_bufs_mutex);
>>> +            ret = write(migration->data_fd, buf, buf_len);
>>> +            errno_save = errno;
>>> + qemu_mutex_lock(&migration->load_bufs_mutex);
>>> +
>>> +            if (ret < 0) {
>>> +                error_setg(errp, "write to state buffer %" PRIu32 " 
>>> failed with %d",
>>> +                           migration->load_buf_idx, errno_save);
>>> +                break;
>>> +            } else if (ret < buf_len) {
>>> +                error_setg(errp, "write to state buffer %" PRIu32 " 
>>> incomplete %zd / %zu",
>>> +                           migration->load_buf_idx, ret, buf_len);
>>> +                break;
>>> +            }
>>> +
>>> + trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
>>> + migration->load_buf_idx);
>>> +        }
>>> +
>>> +        assert(migration->load_buf_queued_pending_buffers > 0);
>>> +        migration->load_buf_queued_pending_buffers--;
>>> +
>>> +        if (migration->load_buf_idx == migration->load_buf_idx_last 
>>> - 1) {
>>> + trace_vfio_load_state_device_buffer_end(vbasedev->name);
>>> +        }
>>> +
>>> +        migration->load_buf_idx++;
>>> +    }
>>> +
>>> +    if (migration->load_bufs_thread_want_exit &&
>>> +        !*errp) {
>>> +        error_setg(errp, "load bufs thread asked to quit");
>>> +    }
>>> +
>>> +    g_clear_pointer(&locker, qemu_lockable_auto_unlock);
>>> +
>>> +    qemu_loadvm_load_finish_ready_lock();
>>> +    migration->load_bufs_thread_finished = true;
>>> +    qemu_loadvm_load_finish_ready_broadcast();
>>> +    qemu_loadvm_load_finish_ready_unlock();
>>> +
>>> +    return NULL;
>>> +}
>>> +
>>>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>>>                                            Error **errp)
>>>   {
>>> @@ -285,6 +478,8 @@ static int 
>>> vfio_load_device_config_state(QEMUFile *f, void *opaque)
>>>       VFIODevice *vbasedev = opaque;
>>>       uint64_t data;
>>>
>>> + trace_vfio_load_device_config_state_start(vbasedev->name);
>>
>> Maybe split this and below trace_vfio_load_device_config_state_end to 
>> a separate patch?
>
> I guess you mean to add these trace points in a separate patch?
> Can do.
>
>>> +
>>>       if (vbasedev->ops && vbasedev->ops->vfio_load_config) {
>>>           int ret;
>>>
>>> @@ -303,7 +498,7 @@ static int 
>>> vfio_load_device_config_state(QEMUFile *f, void *opaque)
>>>           return -EINVAL;
>>>       }
>>>
>>> -    trace_vfio_load_device_config_state(vbasedev->name);
>>> + trace_vfio_load_device_config_state_end(vbasedev->name);
>>>       return qemu_file_get_error(f);
>>>   }
>>>
>>> @@ -687,16 +882,70 @@ static void vfio_save_state(QEMUFile *f, void 
>>> *opaque)
>>>   static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>>>   {
>>>       VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    int ret;
>>> +
>>> +    ret = vfio_migration_set_state(vbasedev, 
>>> VFIO_DEVICE_STATE_RESUMING,
>>> + vbasedev->migration->device_state, errp);
>>> +    if (ret) {
>>> +        return ret;
>>> +    }
>>> +
>>> +    assert(!migration->load_bufs);
>>> +    migration->load_bufs = g_array_new(FALSE, TRUE, 
>>> sizeof(LoadedBuffer));
>>> +    g_array_set_clear_func(migration->load_bufs, loaded_buffer_clear);
>>> +
>>> +    qemu_mutex_init(&migration->load_bufs_mutex);
>>> +
>>> +    migration->load_bufs_device_ready = false;
>>> + qemu_cond_init(&migration->load_bufs_device_ready_cond);
>>> +
>>> +    migration->load_buf_idx = 0;
>>> +    migration->load_buf_idx_last = UINT32_MAX;
>>> +    migration->load_buf_queued_pending_buffers = 0;
>>> + qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
>>> +
>>> +    migration->config_state_loaded_to_dev = false;
>>> +
>>> +    assert(!migration->load_bufs_thread_started);
>>
>> Maybe do all these allocations (and de-allocations) only if multifd 
>> device state is supported and enabled?
>> Extracting this to its own function could also be good.
>
> Sure, will try to avoid unnecessarily allocating multifd device state
> related things if this functionality is unavailable anyway.
>
>>>
>>> -    return vfio_migration_set_state(vbasedev, 
>>> VFIO_DEVICE_STATE_RESUMING,
>>> - vbasedev->migration->device_state, errp);
>>> +    migration->load_bufs_thread_finished = false;
>>> +    migration->load_bufs_thread_want_exit = false;
>>> +    qemu_thread_create(&migration->load_bufs_thread, "vfio-load-bufs",
>>> +                       vfio_load_bufs_thread, opaque, 
>>> QEMU_THREAD_JOINABLE);
>>
>> The device state save threads are managed by the migration core thread
>> pool. Don't we want to apply the same thread management scheme for
>> the load flow as well?
>
> I think that (in contrast with the device state saving threads)
> the buffer loading / reordering thread is an implementation detail
> of the VFIO driver, so I don't think it really makes sense for multifd code
> to manage it.

Hmm, yes I understand.

Thanks.
Cédric Le Goater Sept. 12, 2024, 8:45 a.m. UTC | #4
>>>>
>>>> -    return vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>>>> -                                    vbasedev->migration->device_state, errp);
>>>> +    migration->load_bufs_thread_finished = false;
>>>> +    migration->load_bufs_thread_want_exit = false;
>>>> +    qemu_thread_create(&migration->load_bufs_thread, "vfio-load-bufs",
>>>> +                       vfio_load_bufs_thread, opaque, QEMU_THREAD_JOINABLE);
>>>
>>> The device state save threads are managed by the migration core thread pool. Don't we want to apply the same thread management scheme for the load flow as well?
>>
>> I think that (in contrast with the device state saving threads)
>> the buffer loading / reordering thread is an implementation detail
>> of the VFIO driver, so I don't think it really makes sense for multifd code
>> to manage it.

Is it an optimisation then? In that case, could the implementation not
use threads?

VFIO is complex, migration is complex, VFIO migration is even more so. TBH,
the idea of doing thread management in the VFIO subsystem makes me feel
uncomfortable.

Thanks,

C.
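
A note on the pattern under discussion: vfio_load_bufs_thread is
essentially an in-order drain over an out-of-order arrival array. Below
is a minimal standalone sketch of that pattern, written with plain
pthreads rather than QEMU's QemuThread/QemuCond wrappers; the fixed-size
array, the names and the buffer contents are illustrative only, not
taken from the patch.

/*
 * Sketch: multifd channels deliver indexed buffers in arbitrary order;
 * a single loader thread consumes them strictly by index, sleeping
 * whenever the next expected buffer has not arrived yet ("starved").
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define NUM_BUFS 8

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t buffer_ready = PTHREAD_COND_INITIALIZER;
static char *bufs[NUM_BUFS];   /* slot number == packet idx */
static unsigned next_idx;      /* next index the loader may consume */

static void deliver(unsigned idx, const char *data)
{
    pthread_mutex_lock(&lock);
    bufs[idx] = strdup(data);  /* out-of-order arrival is fine */
    pthread_cond_broadcast(&buffer_ready);
    pthread_mutex_unlock(&lock);
}

static void *loader_thread(void *arg)
{
    pthread_mutex_lock(&lock);
    while (next_idx < NUM_BUFS) {
        if (!bufs[next_idx]) { /* starved: next buffer not here yet */
            pthread_cond_wait(&buffer_ready, &lock);
            continue;
        }
        /* The patch drops the lock here while write()ing to the device
         * fd, since loading data into the device takes a while. */
        printf("loading buffer %u: %s\n", next_idx, bufs[next_idx]);
        free(bufs[next_idx]);
        next_idx++;
    }
    pthread_mutex_unlock(&lock);
    return NULL;
}

int main(void)
{
    pthread_t loader;

    pthread_create(&loader, NULL, loader_thread, NULL);

    /* Simulate multifd channels finishing out of order. */
    deliver(2, "two");
    deliver(0, "zero");
    deliver(3, "three");
    deliver(1, "one");
    for (unsigned i = 4; i < NUM_BUFS; i++) {
        char msg[16];
        snprintf(msg, sizeof(msg), "buf-%u", i);
        deliver(i, msg);
    }

    pthread_join(loader, NULL);
    return 0;
}

Unlike this sketch, the patch grows the array on demand, learns the last
index from the config-state packet instead of a fixed count, and bounds
how many buffers may be queued ahead of the drain point.
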

Patch

diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
index 24679d8c5034..57c1542528dc 100644
--- a/hw/vfio/migration.c
+++ b/hw/vfio/migration.c
@@ -15,6 +15,7 @@ 
 #include <linux/vfio.h>
 #include <sys/ioctl.h>
 
+#include "io/channel-buffer.h"
 #include "sysemu/runstate.h"
 #include "hw/vfio/vfio-common.h"
 #include "migration/misc.h"
@@ -47,6 +48,7 @@ 
 #define VFIO_MIG_FLAG_DEV_SETUP_STATE   (0xffffffffef100003ULL)
 #define VFIO_MIG_FLAG_DEV_DATA_STATE    (0xffffffffef100004ULL)
 #define VFIO_MIG_FLAG_DEV_INIT_DATA_SENT (0xffffffffef100005ULL)
+#define VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE    (0xffffffffef100006ULL)
 
 /*
  * This is an arbitrary size based on migration of mlx5 devices, where typically
@@ -55,6 +57,15 @@ 
  */
 #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
 
+#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
+
+typedef struct VFIODeviceStatePacket {
+    uint32_t version;
+    uint32_t idx;
+    uint32_t flags;
+    uint8_t data[0];
+} QEMU_PACKED VFIODeviceStatePacket;
+
 static int64_t bytes_transferred;
 
 static const char *mig_state_to_str(enum vfio_device_mig_state state)
@@ -254,6 +265,188 @@  static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
     return ret;
 }
 
+typedef struct LoadedBuffer {
+    bool is_present;
+    char *data;
+    size_t len;
+} LoadedBuffer;
+
+static void loaded_buffer_clear(gpointer data)
+{
+    LoadedBuffer *lb = data;
+
+    if (!lb->is_present) {
+        return;
+    }
+
+    g_clear_pointer(&lb->data, g_free);
+    lb->is_present = false;
+}
+
+static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
+                                  Error **errp)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
+    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
+    LoadedBuffer *lb;
+
+    if (data_size < sizeof(*packet)) {
+        error_setg(errp, "packet too short at %zu (min is %zu)",
+                   data_size, sizeof(*packet));
+        return -1;
+    }
+
+    if (packet->version != 0) {
+        error_setg(errp, "packet has unknown version %" PRIu32,
+                   packet->version);
+        return -1;
+    }
+
+    if (packet->idx == UINT32_MAX) {
+        error_setg(errp, "packet has too high idx %" PRIu32,
+                   packet->idx);
+        return -1;
+    }
+
+    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
+
+    /* config state packet should be the last one in the stream */
+    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
+        migration->load_buf_idx_last = packet->idx;
+    }
+
+    assert(migration->load_bufs);
+    if (packet->idx >= migration->load_bufs->len) {
+        g_array_set_size(migration->load_bufs, packet->idx + 1);
+    }
+
+    lb = &g_array_index(migration->load_bufs, typeof(*lb), packet->idx);
+    if (lb->is_present) {
+        error_setg(errp, "state buffer %" PRIu32 " already filled", packet->idx);
+        return -1;
+    }
+
+    assert(packet->idx >= migration->load_buf_idx);
+
+    migration->load_buf_queued_pending_buffers++;
+    if (migration->load_buf_queued_pending_buffers >
+        vbasedev->migration_max_queued_buffers) {
+        error_setg(errp,
+                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
+                   packet->idx, vbasedev->migration_max_queued_buffers);
+        return -1;
+    }
+
+    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
+    lb->len = data_size - sizeof(*packet);
+    lb->is_present = true;
+
+    qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);
+
+    return 0;
+}
+
+static void *vfio_load_bufs_thread(void *opaque)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+    Error **errp = &migration->load_bufs_thread_errp;
+    g_autoptr(QemuLockable) locker = qemu_lockable_auto_lock(
+        QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));
+    LoadedBuffer *lb;
+
+    while (!migration->load_bufs_device_ready &&
+           !migration->load_bufs_thread_want_exit) {
+        qemu_cond_wait(&migration->load_bufs_device_ready_cond, &migration->load_bufs_mutex);
+    }
+
+    while (!migration->load_bufs_thread_want_exit) {
+        bool starved;
+        ssize_t ret;
+
+        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
+
+        if (migration->load_buf_idx >= migration->load_bufs->len) {
+            assert(migration->load_buf_idx == migration->load_bufs->len);
+            starved = true;
+        } else {
+            lb = &g_array_index(migration->load_bufs, typeof(*lb), migration->load_buf_idx);
+            starved = !lb->is_present;
+        }
+
+        if (starved) {
+            trace_vfio_load_state_device_buffer_starved(vbasedev->name, migration->load_buf_idx);
+            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond, &migration->load_bufs_mutex);
+            continue;
+        }
+
+        if (migration->load_buf_idx == migration->load_buf_idx_last) {
+            break;
+        }
+
+        if (migration->load_buf_idx == 0) {
+            trace_vfio_load_state_device_buffer_start(vbasedev->name);
+        }
+
+        if (lb->len) {
+            g_autofree char *buf = NULL;
+            size_t buf_len;
+            int errno_save;
+
+            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
+                                                           migration->load_buf_idx);
+
+            /* lb might become re-allocated when we drop the lock */
+            buf = g_steal_pointer(&lb->data);
+            buf_len = lb->len;
+
+            /* Loading data to the device takes a while, drop the lock during this process */
+            qemu_mutex_unlock(&migration->load_bufs_mutex);
+            ret = write(migration->data_fd, buf, buf_len);
+            errno_save = errno;
+            qemu_mutex_lock(&migration->load_bufs_mutex);
+
+            if (ret < 0) {
+                error_setg(errp, "write to state buffer %" PRIu32 " failed with %d",
+                           migration->load_buf_idx, errno_save);
+                break;
+            } else if (ret < buf_len) {
+                error_setg(errp, "write to state buffer %" PRIu32 " incomplete %zd / %zu",
+                           migration->load_buf_idx, ret, buf_len);
+                break;
+            }
+
+            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
+                                                         migration->load_buf_idx);
+        }
+
+        assert(migration->load_buf_queued_pending_buffers > 0);
+        migration->load_buf_queued_pending_buffers--;
+
+        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
+            trace_vfio_load_state_device_buffer_end(vbasedev->name);
+        }
+
+        migration->load_buf_idx++;
+    }
+
+    if (migration->load_bufs_thread_want_exit &&
+        !*errp) {
+        error_setg(errp, "load bufs thread asked to quit");
+    }
+
+    g_clear_pointer(&locker, qemu_lockable_auto_unlock);
+
+    qemu_loadvm_load_finish_ready_lock();
+    migration->load_bufs_thread_finished = true;
+    qemu_loadvm_load_finish_ready_broadcast();
+    qemu_loadvm_load_finish_ready_unlock();
+
+    return NULL;
+}
+
 static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
                                          Error **errp)
 {
@@ -285,6 +478,8 @@  static int vfio_load_device_config_state(QEMUFile *f, void *opaque)
     VFIODevice *vbasedev = opaque;
     uint64_t data;
 
+    trace_vfio_load_device_config_state_start(vbasedev->name);
+
     if (vbasedev->ops && vbasedev->ops->vfio_load_config) {
         int ret;
 
@@ -303,7 +498,7 @@  static int vfio_load_device_config_state(QEMUFile *f, void *opaque)
         return -EINVAL;
     }
 
-    trace_vfio_load_device_config_state(vbasedev->name);
+    trace_vfio_load_device_config_state_end(vbasedev->name);
     return qemu_file_get_error(f);
 }
 
@@ -687,16 +882,70 @@  static void vfio_save_state(QEMUFile *f, void *opaque)
 static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
 {
     VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+    int ret;
+
+    ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
+                                   vbasedev->migration->device_state, errp);
+    if (ret) {
+        return ret;
+    }
+
+    assert(!migration->load_bufs);
+    migration->load_bufs = g_array_new(FALSE, TRUE, sizeof(LoadedBuffer));
+    g_array_set_clear_func(migration->load_bufs, loaded_buffer_clear);
+
+    qemu_mutex_init(&migration->load_bufs_mutex);
+
+    migration->load_bufs_device_ready = false;
+    qemu_cond_init(&migration->load_bufs_device_ready_cond);
+
+    migration->load_buf_idx = 0;
+    migration->load_buf_idx_last = UINT32_MAX;
+    migration->load_buf_queued_pending_buffers = 0;
+    qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
+
+    migration->config_state_loaded_to_dev = false;
+
+    assert(!migration->load_bufs_thread_started);
 
-    return vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
-                                    vbasedev->migration->device_state, errp);
+    migration->load_bufs_thread_finished = false;
+    migration->load_bufs_thread_want_exit = false;
+    qemu_thread_create(&migration->load_bufs_thread, "vfio-load-bufs",
+                       vfio_load_bufs_thread, opaque, QEMU_THREAD_JOINABLE);
+
+    migration->load_bufs_thread_started = true;
+
+    return 0;
 }
 
 static int vfio_load_cleanup(void *opaque)
 {
     VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+
+    if (migration->load_bufs_thread_started) {
+        qemu_mutex_lock(&migration->load_bufs_mutex);
+        migration->load_bufs_thread_want_exit = true;
+        qemu_mutex_unlock(&migration->load_bufs_mutex);
+
+        qemu_cond_broadcast(&migration->load_bufs_device_ready_cond);
+        qemu_cond_broadcast(&migration->load_bufs_buffer_ready_cond);
+
+        qemu_thread_join(&migration->load_bufs_thread);
+
+        assert(migration->load_bufs_thread_finished);
+
+        migration->load_bufs_thread_started = false;
+    }
 
     vfio_migration_cleanup(vbasedev);
+
+    g_clear_pointer(&migration->load_bufs, g_array_unref);
+    qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
+    qemu_cond_destroy(&migration->load_bufs_device_ready_cond);
+    qemu_mutex_destroy(&migration->load_bufs_mutex);
+
     trace_vfio_load_cleanup(vbasedev->name);
 
     return 0;
@@ -705,6 +954,7 @@  static int vfio_load_cleanup(void *opaque)
 static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
 {
     VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
     int ret = 0;
     uint64_t data;
 
@@ -716,6 +966,7 @@  static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
         switch (data) {
         case VFIO_MIG_FLAG_DEV_CONFIG_STATE:
         {
+            migration->config_state_loaded_to_dev = true;
             return vfio_load_device_config_state(f, opaque);
         }
         case VFIO_MIG_FLAG_DEV_SETUP_STATE:
@@ -742,6 +993,15 @@  static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
             }
             break;
         }
+        case VFIO_MIG_FLAG_DEV_DATA_STATE_COMPLETE:
+        {
+            QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
+
+            migration->load_bufs_device_ready = true;
+            qemu_cond_broadcast(&migration->load_bufs_device_ready_cond);
+
+            break;
+        }
         case VFIO_MIG_FLAG_DEV_INIT_DATA_SENT:
         {
             if (!vfio_precopy_supported(vbasedev) ||
@@ -774,6 +1034,76 @@  static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
     return ret;
 }
 
+static int vfio_load_finish(void *opaque, bool *is_finished, Error **errp)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+    g_autoptr(QemuLockable) locker = NULL;
+    LoadedBuffer *lb;
+    g_autoptr(QIOChannelBuffer) bioc = NULL;
+    QEMUFile *f_out = NULL, *f_in = NULL;
+    uint64_t mig_header;
+    int ret;
+
+    if (migration->config_state_loaded_to_dev) {
+        *is_finished = true;
+        return 0;
+    }
+
+    if (!migration->load_bufs_thread_finished) {
+        assert(migration->load_bufs_thread_started);
+        *is_finished = false;
+        return 0;
+    }
+
+    if (migration->load_bufs_thread_errp) {
+        error_propagate(errp, g_steal_pointer(&migration->load_bufs_thread_errp));
+        return -1;
+    }
+
+    locker = qemu_lockable_auto_lock(QEMU_MAKE_LOCKABLE(&migration->load_bufs_mutex));
+
+    assert(migration->load_buf_idx == migration->load_buf_idx_last);
+    lb = &g_array_index(migration->load_bufs, typeof(*lb), migration->load_buf_idx);
+    assert(lb->is_present);
+
+    bioc = qio_channel_buffer_new(lb->len);
+    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
+
+    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
+    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
+
+    ret = qemu_fflush(f_out);
+    if (ret) {
+        error_setg(errp, "load device config state file flush failed with %d", ret);
+        g_clear_pointer(&f_out, qemu_fclose);
+        return -1;
+    }
+
+    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
+    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
+
+    mig_header = qemu_get_be64(f_in);
+    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
+        error_setg(errp, "load device config state invalid header %"PRIu64, mig_header);
+        g_clear_pointer(&f_out, qemu_fclose);
+        g_clear_pointer(&f_in, qemu_fclose);
+        return -1;
+    }
+
+    ret = vfio_load_device_config_state(f_in, opaque);
+    g_clear_pointer(&f_out, qemu_fclose);
+    g_clear_pointer(&f_in, qemu_fclose);
+    if (ret < 0) {
+        error_setg(errp, "load device config state failed with %d", ret);
+        return -1;
+    }
+
+    migration->config_state_loaded_to_dev = true;
+    *is_finished = true;
+    return 0;
+}
+
 static bool vfio_switchover_ack_needed(void *opaque)
 {
     VFIODevice *vbasedev = opaque;
@@ -794,6 +1124,8 @@  static const SaveVMHandlers savevm_vfio_handlers = {
     .load_setup = vfio_load_setup,
     .load_cleanup = vfio_load_cleanup,
     .load_state = vfio_load_state,
+    .load_state_buffer = vfio_load_state_buffer,
+    .load_finish = vfio_load_finish,
     .switchover_ack_needed = vfio_switchover_ack_needed,
 };
 
diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
index 2407720c3530..08cb56d27a05 100644
--- a/hw/vfio/pci.c
+++ b/hw/vfio/pci.c
@@ -3378,6 +3378,8 @@  static Property vfio_pci_dev_properties[] = {
                     VFIO_FEATURE_ENABLE_IGD_OPREGION_BIT, false),
     DEFINE_PROP_ON_OFF_AUTO("enable-migration", VFIOPCIDevice,
                             vbasedev.enable_migration, ON_OFF_AUTO_AUTO),
+    DEFINE_PROP_UINT64("x-migration-max-queued-buffers", VFIOPCIDevice,
+                       vbasedev.migration_max_queued_buffers, UINT64_MAX),
     DEFINE_PROP_BOOL("migration-events", VFIOPCIDevice,
                      vbasedev.migration_events, false),
     DEFINE_PROP_BOOL("x-no-mmap", VFIOPCIDevice, vbasedev.no_mmap, false),
diff --git a/hw/vfio/trace-events b/hw/vfio/trace-events
index 013c602f30fa..9d2519a28a7e 100644
--- a/hw/vfio/trace-events
+++ b/hw/vfio/trace-events
@@ -149,9 +149,16 @@  vfio_display_edid_write_error(void) ""
 
 # migration.c
 vfio_load_cleanup(const char *name) " (%s)"
-vfio_load_device_config_state(const char *name) " (%s)"
+vfio_load_device_config_state_start(const char *name) " (%s)"
+vfio_load_device_config_state_end(const char *name) " (%s)"
 vfio_load_state(const char *name, uint64_t data) " (%s) data 0x%"PRIx64
 vfio_load_state_device_data(const char *name, uint64_t data_size, int ret) " (%s) size 0x%"PRIx64" ret %d"
+vfio_load_state_device_buffer_incoming(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_start(const char *name) " (%s)"
+vfio_load_state_device_buffer_starved(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_load_start(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_load_end(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_end(const char *name) " (%s)"
 vfio_migration_realize(const char *name) " (%s)"
 vfio_migration_set_device_state(const char *name, const char *state) " (%s) state %s"
 vfio_migration_set_state(const char *name, const char *new_state, const char *recover_state) " (%s) new state %s, recover state %s"
diff --git a/include/hw/vfio/vfio-common.h b/include/hw/vfio/vfio-common.h
index 32d58e3e025b..ba5b9464e79a 100644
--- a/include/hw/vfio/vfio-common.h
+++ b/include/hw/vfio/vfio-common.h
@@ -76,6 +76,22 @@  typedef struct VFIOMigration {
 
     bool save_iterate_run;
     bool save_iterate_empty_hit;
+
+    QemuThread load_bufs_thread;
+    Error *load_bufs_thread_errp;
+    bool load_bufs_thread_started;
+    bool load_bufs_thread_finished;
+    bool load_bufs_thread_want_exit;
+
+    GArray *load_bufs;
+    bool load_bufs_device_ready;
+    QemuCond load_bufs_device_ready_cond;
+    QemuCond load_bufs_buffer_ready_cond;
+    QemuMutex load_bufs_mutex;
+    uint32_t load_buf_idx;
+    uint32_t load_buf_idx_last;
+    uint32_t load_buf_queued_pending_buffers;
+    bool config_state_loaded_to_dev;
 } VFIOMigration;
 
 struct VFIOGroup;
@@ -134,6 +150,7 @@  typedef struct VFIODevice {
     bool ram_block_discard_allowed;
     OnOffAuto enable_migration;
     bool migration_events;
+    uint64_t migration_max_queued_buffers;
     VFIODeviceOps *ops;
     unsigned int num_irqs;
     unsigned int num_regions;
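
A brief usage note: since x-migration-max-queued-buffers is registered
with DEFINE_PROP_UINT64, it can be set per device on the destination's
command line in the usual way, e.g.
"-device vfio-pci,host=0000:01:00.0,x-migration-max-queued-buffers=16"
(the host address and the value 16 here are hypothetical). The
UINT64_MAX default leaves queuing effectively unbounded.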