
[v4,19/34] migration/multifd: Allow receiving pages without packets

Message ID 20240220224138.24759-20-farosas@suse.de (mailing list archive)
State New, archived
Series migration: File based migration with multifd and fixed-ram

Commit Message

Fabiano Rosas Feb. 20, 2024, 10:41 p.m. UTC
Currently multifd does not need to have knowledge of pages on the
receiving side because all the information needed is within the
packets that come in the stream.

We're about to add support for fixed-ram migration, which cannot use
packets because it expects the ramblock section in the migration file
to contain only the guest page data.

Add a data structure to transfer pages between the ram migration code
and the multifd receiving threads.

We don't want to reuse MultiFDPages_t for two reasons:

a) multifd threads don't really need to know about the data they're
   receiving.

b) the receiving side has to be stopped to load the pages, which means
   we can experiment with larger granularities than page size when
   transferring data.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
@Peter: a 'quit' flag cannot be used instead of pending_job. The
receiving thread needs to know there's no more data coming. If the
migration thread sets a 'quit' flag, the multifd thread would see the
flag right away and exit. The only way is to clear pending_job on the
thread and have it spin once more.
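
A minimal sketch of the resulting handshake (names as in this patch;
where exactly the final wakeup gets posted from is left out here):

    /* migration thread: no more work for this channel */
    qemu_sem_post(&p->sem);                  /* pending_job stays false */

    /* multifd_recv_thread, no-packets mode */
    qemu_sem_wait(&p->sem);
    if (multifd_recv_should_exit()) {
        break;
    }
    if (!qatomic_read(&p->pending_job)) {
        break;                               /* woken without work: done */
    }
    /* otherwise: consume one more job before the next wait */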
---
 migration/file.c    |   1 +
 migration/multifd.c | 122 +++++++++++++++++++++++++++++++++++++++++---
 migration/multifd.h |  15 ++++++
 3 files changed, 131 insertions(+), 7 deletions(-)

Comments

Peter Xu Feb. 26, 2024, 6:58 a.m. UTC | #1
On Tue, Feb 20, 2024 at 07:41:23PM -0300, Fabiano Rosas wrote:
> Currently multifd does not need to have knowledge of pages on the
> receiving side because all the information needed is within the
> packets that come in the stream.
> 
> We're about to add support to fixed-ram migration, which cannot use
> packets because it expects the ramblock section in the migration file
> to contain only the guest pages data.
> 
> Add a data structure to transfer pages between the ram migration code
> and the multifd receiving threads.
> 
> We don't want to reuse MultiFDPages_t for two reasons:
> 
> a) multifd threads don't really need to know about the data they're
>    receiving.
> 
> b) the receiving side has to be stopped to load the pages, which means
>    we can experiment with larger granularities than page size when
>    transferring data.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> @Peter: a 'quit' flag cannot be used instead of pending_job. The
> receiving thread needs know there's no more data coming. If the
> migration thread sets a 'quit' flag, the multifd thread would see the
> flag right away and exit.

Hmm.. isn't this exactly what we want?  I'll comment on this inline below.

> The only way is to clear pending_job on the
> thread and spin once more.
> ---
>  migration/file.c    |   1 +
>  migration/multifd.c | 122 +++++++++++++++++++++++++++++++++++++++++---
>  migration/multifd.h |  15 ++++++
>  3 files changed, 131 insertions(+), 7 deletions(-)
> 
> diff --git a/migration/file.c b/migration/file.c
> index 5d4975f43e..22d052a71f 100644
> --- a/migration/file.c
> +++ b/migration/file.c
> @@ -6,6 +6,7 @@
>   */
>  
>  #include "qemu/osdep.h"
> +#include "exec/ramblock.h"
>  #include "qemu/cutils.h"
>  #include "qapi/error.h"
>  #include "channel.h"
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 0a5279314d..45a0c7aaa8 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -81,9 +81,15 @@ struct {
>  
>  struct {
>      MultiFDRecvParams *params;
> +    MultiFDRecvData *data;
>      /* number of created threads */
>      int count;
> -    /* syncs main thread and channels */
> +    /*
> +     * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
> +     *
> +     * For files: this is only posted at the end of the file load to mark
> +     *            completion of the load process.
> +     */
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> @@ -1110,6 +1116,53 @@ bool multifd_send_setup(void)
>      return true;
>  }
>  
> +bool multifd_recv(void)
> +{
> +    int i;
> +    static int next_recv_channel;
> +    MultiFDRecvParams *p = NULL;
> +    MultiFDRecvData *data = multifd_recv_state->data;

[1]

> +
> +    /*
> +     * next_channel can remain from a previous migration that was
> +     * using more channels, so ensure it doesn't overflow if the
> +     * limit is lower now.
> +     */
> +    next_recv_channel %= migrate_multifd_channels();
> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        if (multifd_recv_should_exit()) {
> +            return false;
> +        }
> +
> +        p = &multifd_recv_state->params[i];
> +
> +        /*
> +         * Safe to read atomically without a lock because the flag is
> +         * only set by this function below. Reading an old value of
> +         * true is not an issue because it would only send us looking
> +         * for the next idle channel.
> +         */
> +        if (qatomic_read(&p->pending_job) == false) {
> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +    }

IIUC you'll need an smp_mb_acquire() here.  The ordering of "reading
pending_job" and below must be guaranteed, similar to the sender side.

> +
> +    assert(!p->data->size);
> +    multifd_recv_state->data = p->data;

[2]

> +    p->data = data;
> +
> +    qatomic_set(&p->pending_job, true);

Then here:

       qatomic_store_release(&p->pending_job, true);

Please consider adding a comment above all acquire/release pairs, like on
the sender side.
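
For reference, the pairing sketched with plain C11 atomics, assuming the
qatomic_* wrappers map to the same acquire/release semantics:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    struct recv_slot {
        size_t size;             /* stands in for p->data->size */
        atomic_bool pending_job; /* stands in for p->pending_job */
    };

    /* channel side: finish the job, then publish that the slot is idle */
    static void slot_complete(struct recv_slot *s)
    {
        s->size = 0;
        /* release: the size reset must be visible before pending_job == false */
        atomic_store_explicit(&s->pending_job, false, memory_order_release);
    }

    /* migration side: only refill the slot once it is observed idle */
    static bool slot_try_queue(struct recv_slot *s, size_t size)
    {
        /* acquire: pairs with the release above, so size == 0 is guaranteed */
        if (atomic_load_explicit(&s->pending_job, memory_order_acquire)) {
            return false;        /* still busy, try another channel */
        }
        s->size = size;
        /* release: the new size must be visible before pending_job == true */
        atomic_store_explicit(&s->pending_job, true, memory_order_release);
        return true;
    }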

> +    qemu_sem_post(&p->sem);
> +
> +    return true;
> +}
> +
> +MultiFDRecvData *multifd_get_recv_data(void)
> +{
> +    return multifd_recv_state->data;
> +}

Can also use it above [1].

I'm thinking maybe we can do something like:

#define  MULTIFD_RECV_DATA_GLOBAL  (multifd_recv_state->data)

Then we can also use it at [2], and replace multifd_get_recv_data()?

> +
>  static void multifd_recv_terminate_threads(Error *err)
>  {
>      int i;
> @@ -1134,11 +1187,26 @@ static void multifd_recv_terminate_threads(Error *err)
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          /*
> -         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
> -         * however try to wakeup it without harm in cleanup phase.
> +         * The migration thread and channels interact differently
> +         * depending on the presence of packets.
>           */
>          if (multifd_use_packets()) {
> +            /*
> +             * The channel receives as long as there are packets. When
> +             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
> +             * channel waits for the migration thread to sync. If the
> +             * sync never happens, do it here.
> +             */
>              qemu_sem_post(&p->sem_sync);
> +        } else {
> +            /*
> +             * The channel waits for the migration thread to give it
> +             * work. When the migration thread runs out of work, it
> +             * releases the channel and waits for any pending work to
> +             * finish. If we reach here (e.g. due to error) before the
> +             * work runs out, release the channel.
> +             */
> +            qemu_sem_post(&p->sem);
>          }
>  
>          /*
> @@ -1167,6 +1235,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
>      p->c = NULL;
>      qemu_mutex_destroy(&p->mutex);
>      qemu_sem_destroy(&p->sem_sync);
> +    qemu_sem_destroy(&p->sem);
>      g_free(p->name);
>      p->name = NULL;
>      p->packet_len = 0;
> @@ -1184,6 +1253,8 @@ static void multifd_recv_cleanup_state(void)
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    g_free(multifd_recv_state->data);
> +    multifd_recv_state->data = NULL;
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  }
> @@ -1251,11 +1322,11 @@ static void *multifd_recv_thread(void *opaque)
>          bool has_data = false;
>          p->normal_num = 0;
>  
> -        if (multifd_recv_should_exit()) {
> -            break;
> -        }
> -
>          if (use_packets) {
> +            if (multifd_recv_should_exit()) {
> +                break;
> +            }
> +
>              ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>                                             p->packet_len, &local_err);
>              if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> @@ -1274,6 +1345,26 @@ static void *multifd_recv_thread(void *opaque)
>              p->flags &= ~MULTIFD_FLAG_SYNC;
>              has_data = !!p->normal_num;
>              qemu_mutex_unlock(&p->mutex);
> +        } else {
> +            /*
> +             * No packets, so we need to wait for the vmstate code to
> +             * give us work.
> +             */
> +            qemu_sem_wait(&p->sem);
> +
> +            if (multifd_recv_should_exit()) {
> +                break;
> +            }
> +
> +            /*
> +             * Migration thread did not send work, break and signal
> +             * sem_sync so it knows we're not lagging behind.
> +             */
> +            if (!qatomic_read(&p->pending_job)) {
> +                break;
> +            }

In reality, this _must_ be true when reaching here, right?  Since AFAIU
recv side p->sem is posted only in two conditions:

  1) when there is work (pending_job==true)
  2) when terminating threads (multifd_recv_should_exit==true)

Then if 2) is checked above, I assume 1) must be the case here?

> +
> +            has_data = !!p->data->size;
>          }
>  
>          if (has_data) {
> @@ -1288,9 +1379,17 @@ static void *multifd_recv_thread(void *opaque)
>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>                  qemu_sem_wait(&p->sem_sync);
>              }
> +        } else {
> +            p->total_normal_pages += p->data->size / qemu_target_page_size();
> +            p->data->size = 0;
> +            qatomic_set(&p->pending_job, false);

I think it needs to be:

  qatomic_store_release(&p->pending_job, false);

?

So as to guarantee when the other side sees pending_job==false, size must
already have been reset.

>          }
>      }
>  
> +    if (!use_packets) {
> +        qemu_sem_post(&p->sem_sync);
> +    }
> +
>      if (local_err) {
>          multifd_recv_terminate_threads(local_err);
>          error_free(local_err);
> @@ -1320,6 +1419,10 @@ int multifd_recv_setup(Error **errp)
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +
> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
> +    multifd_recv_state->data->size = 0;
> +
>      qatomic_set(&multifd_recv_state->count, 0);
>      qatomic_set(&multifd_recv_state->exiting, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
> @@ -1330,8 +1433,13 @@ int multifd_recv_setup(Error **errp)
>  
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem_sync, 0);
> +        qemu_sem_init(&p->sem, 0);
> +        p->pending_job = false;
>          p->id = i;
>  
> +        p->data = g_new0(MultiFDRecvData, 1);
> +        p->data->size = 0;
> +
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
>                  + sizeof(uint64_t) * page_count;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 9a6a7a72df..19188815a3 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +typedef struct MultiFDRecvData MultiFDRecvData;
> +
>  bool multifd_send_setup(void);
>  void multifd_send_shutdown(void);
>  int multifd_recv_setup(Error **errp);
> @@ -23,6 +25,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
>  int multifd_send_sync_main(void);
>  bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
> +bool multifd_recv(void);
> +MultiFDRecvData *multifd_get_recv_data(void);
>  
>  /* Multifd Compression flags */
>  #define MULTIFD_FLAG_SYNC (1 << 0)
> @@ -63,6 +67,13 @@ typedef struct {
>      RAMBlock *block;
>  } MultiFDPages_t;
>  
> +struct MultiFDRecvData {
> +    void *opaque;
> +    size_t size;
> +    /* for preadv */
> +    off_t file_offset;
> +};
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> @@ -154,6 +165,8 @@ typedef struct {
>  
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
>  
>      /* this mutex protects the following parameters */
>      QemuMutex mutex;
> @@ -163,6 +176,8 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    int pending_job;
> +    MultiFDRecvData *data;
>  
>      /* thread local variables. No locking required */
>  
> -- 
> 2.35.3
>
Fabiano Rosas Feb. 26, 2024, 7:19 p.m. UTC | #2
Peter Xu <peterx@redhat.com> writes:

> On Tue, Feb 20, 2024 at 07:41:23PM -0300, Fabiano Rosas wrote:
>> Currently multifd does not need to have knowledge of pages on the
>> receiving side because all the information needed is within the
>> packets that come in the stream.
>> 
>> We're about to add support to fixed-ram migration, which cannot use
>> packets because it expects the ramblock section in the migration file
>> to contain only the guest pages data.
>> 
>> Add a data structure to transfer pages between the ram migration code
>> and the multifd receiving threads.
>> 
>> We don't want to reuse MultiFDPages_t for two reasons:
>> 
>> a) multifd threads don't really need to know about the data they're
>>    receiving.
>> 
>> b) the receiving side has to be stopped to load the pages, which means
>>    we can experiment with larger granularities than page size when
>>    transferring data.
>> 
>> Signed-off-by: Fabiano Rosas <farosas@suse.de>
>> ---
>> @Peter: a 'quit' flag cannot be used instead of pending_job. The
>> receiving thread needs know there's no more data coming. If the
>> migration thread sets a 'quit' flag, the multifd thread would see the
>> flag right away and exit.
>
> Hmm.. isn't this exactly what we want?  I'll comment for this inline below.
>
>> The only way is to clear pending_job on the
>> thread and spin once more.
>> ---
>>  migration/file.c    |   1 +
>>  migration/multifd.c | 122 +++++++++++++++++++++++++++++++++++++++++---
>>  migration/multifd.h |  15 ++++++
>>  3 files changed, 131 insertions(+), 7 deletions(-)
>> 
>> diff --git a/migration/file.c b/migration/file.c
>> index 5d4975f43e..22d052a71f 100644
>> --- a/migration/file.c
>> +++ b/migration/file.c
>> @@ -6,6 +6,7 @@
>>   */
>>  
>>  #include "qemu/osdep.h"
>> +#include "exec/ramblock.h"
>>  #include "qemu/cutils.h"
>>  #include "qapi/error.h"
>>  #include "channel.h"
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index 0a5279314d..45a0c7aaa8 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -81,9 +81,15 @@ struct {
>>  
>>  struct {
>>      MultiFDRecvParams *params;
>> +    MultiFDRecvData *data;
>>      /* number of created threads */
>>      int count;
>> -    /* syncs main thread and channels */
>> +    /*
>> +     * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
>> +     *
>> +     * For files: this is only posted at the end of the file load to mark
>> +     *            completion of the load process.
>> +     */
>>      QemuSemaphore sem_sync;
>>      /* global number of generated multifd packets */
>>      uint64_t packet_num;
>> @@ -1110,6 +1116,53 @@ bool multifd_send_setup(void)
>>      return true;
>>  }
>>  
>> +bool multifd_recv(void)
>> +{
>> +    int i;
>> +    static int next_recv_channel;
>> +    MultiFDRecvParams *p = NULL;
>> +    MultiFDRecvData *data = multifd_recv_state->data;
>
> [1]
>
>> +
>> +    /*
>> +     * next_channel can remain from a previous migration that was
>> +     * using more channels, so ensure it doesn't overflow if the
>> +     * limit is lower now.
>> +     */
>> +    next_recv_channel %= migrate_multifd_channels();
>> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> +        if (multifd_recv_should_exit()) {
>> +            return false;
>> +        }
>> +
>> +        p = &multifd_recv_state->params[i];
>> +
>> +        /*
>> +         * Safe to read atomically without a lock because the flag is
>> +         * only set by this function below. Reading an old value of
>> +         * true is not an issue because it would only send us looking
>> +         * for the next idle channel.
>> +         */
>> +        if (qatomic_read(&p->pending_job) == false) {
>> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
>> +            break;
>> +        }
>> +    }
>
> IIUC you'll need an smp_mb_acquire() here.  The ordering of "reading
> pending_job" and below must be guaranteed, similar to the sender side.
>

I've been thinking about this even on the sending side.

We shouldn't need the barrier here because there's a control flow
dependency on breaking the loop. I think pending_job *must* be read
prior to here, otherwise the program is just wrong. Does that make
sense?

>> +
>> +    assert(!p->data->size);
>> +    multifd_recv_state->data = p->data;
>
> [2]
>
>> +    p->data = data;
>> +
>> +    qatomic_set(&p->pending_job, true);
>
> Then here:
>
>        qatomic_store_release(&p->pending_job, true);

Ok.

>
> Please consider add comment above all acquire/releases pairs like sender
> too.
>
>> +    qemu_sem_post(&p->sem);
>> +
>> +    return true;
>> +}
>> +
>> +MultiFDRecvData *multifd_get_recv_data(void)
>> +{
>> +    return multifd_recv_state->data;
>> +}
>
> Can also use it above [1].
>
> I'm thinking maybe we can do something like:
>
> #define  MULTIFD_RECV_DATA_GLOBAL  (multifd_recv_state->data)
>
> Then we can also use it at [2], and replace multifd_get_recv_data()?
>

We need the helper because multifd_recv_state->data needs to be
accessible from ram.c in patch 24.
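
Roughly, the caller there looks like this (simplified sketch; every name
except the two multifd helpers is a placeholder):

    MultiFDRecvData *data = multifd_get_recv_data();

    data->opaque = host;           /* destination buffer for this chunk */
    data->file_offset = offset;    /* where to preadv() from in the file */
    data->size = size;

    if (!multifd_recv()) {         /* hand the filled slot to an idle channel */
        return -1;
    }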

>> +
>>  static void multifd_recv_terminate_threads(Error *err)
>>  {
>>      int i;
>> @@ -1134,11 +1187,26 @@ static void multifd_recv_terminate_threads(Error *err)
>>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>  
>>          /*
>> -         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
>> -         * however try to wakeup it without harm in cleanup phase.
>> +         * The migration thread and channels interact differently
>> +         * depending on the presence of packets.
>>           */
>>          if (multifd_use_packets()) {
>> +            /*
>> +             * The channel receives as long as there are packets. When
>> +             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
>> +             * channel waits for the migration thread to sync. If the
>> +             * sync never happens, do it here.
>> +             */
>>              qemu_sem_post(&p->sem_sync);
>> +        } else {
>> +            /*
>> +             * The channel waits for the migration thread to give it
>> +             * work. When the migration thread runs out of work, it
>> +             * releases the channel and waits for any pending work to
>> +             * finish. If we reach here (e.g. due to error) before the
>> +             * work runs out, release the channel.
>> +             */
>> +            qemu_sem_post(&p->sem);
>>          }
>>  
>>          /*
>> @@ -1167,6 +1235,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
>>      p->c = NULL;
>>      qemu_mutex_destroy(&p->mutex);
>>      qemu_sem_destroy(&p->sem_sync);
>> +    qemu_sem_destroy(&p->sem);
>>      g_free(p->name);
>>      p->name = NULL;
>>      p->packet_len = 0;
>> @@ -1184,6 +1253,8 @@ static void multifd_recv_cleanup_state(void)
>>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>>      g_free(multifd_recv_state->params);
>>      multifd_recv_state->params = NULL;
>> +    g_free(multifd_recv_state->data);
>> +    multifd_recv_state->data = NULL;
>>      g_free(multifd_recv_state);
>>      multifd_recv_state = NULL;
>>  }
>> @@ -1251,11 +1322,11 @@ static void *multifd_recv_thread(void *opaque)
>>          bool has_data = false;
>>          p->normal_num = 0;
>>  
>> -        if (multifd_recv_should_exit()) {
>> -            break;
>> -        }
>> -
>>          if (use_packets) {
>> +            if (multifd_recv_should_exit()) {
>> +                break;
>> +            }
>> +
>>              ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>>                                             p->packet_len, &local_err);
>>              if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>> @@ -1274,6 +1345,26 @@ static void *multifd_recv_thread(void *opaque)
>>              p->flags &= ~MULTIFD_FLAG_SYNC;
>>              has_data = !!p->normal_num;
>>              qemu_mutex_unlock(&p->mutex);
>> +        } else {
>> +            /*
>> +             * No packets, so we need to wait for the vmstate code to
>> +             * give us work.
>> +             */
>> +            qemu_sem_wait(&p->sem);
>> +
>> +            if (multifd_recv_should_exit()) {
>> +                break;
>> +            }
>> +
>> +            /*
>> +             * Migration thread did not send work, break and signal
>> +             * sem_sync so it knows we're not lagging behind.
>> +             */
>> +            if (!qatomic_read(&p->pending_job)) {
>> +                break;
>> +            }
>
> In reality, this _must_ be true when reaching here, right?  Since AFAIU
> recv side p->sem is posted only in two conditions:
>
>   1) when there is work (pending_job==true)
>   2) when terminating threads (multifd_recv_should_exit==true)

    3) at multifd_recv_sync_main (pending_job state is unknown)

>
> Then if 2) is checked above, I assume 1) must be the case here?
>

The issue is that 'exiting' is global while p->pending_job is
per-channel. Whenever we set 'exiting', there's no guarantee that all
channels have already passed the should_exit check. Some of them could
still have pending_job=true by the time they see the exiting flag.

We queue all the jobs and immediately call recv_sync_main. It doesn't
matter that all jobs are queued and that we know for sure the work is
done. What matters is that each channel gets to finish its work before
it sees the exit flag. And that depends on checking pending_job.
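
A rough timeline of what could go wrong if a plain 'quit' flag were used
to signal the end of data:

    migration thread                      channel thread
    ----------------                      --------------
    multifd_recv()                        sleeping in qemu_sem_wait(&p->sem)
      pending_job = true
      qemu_sem_post(&p->sem)
    sets 'quit' (no more data)            wakes, sees 'quit' first and exits,
                                          leaving its queued job unprocessed

With per-channel pending_job, the channel finishes that job, clears the
flag, and only breaks out on a later wakeup that arrives without work.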

>> +
>> +            has_data = !!p->data->size;
>>          }
>>  
>>          if (has_data) {
>> @@ -1288,9 +1379,17 @@ static void *multifd_recv_thread(void *opaque)
>>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>>                  qemu_sem_wait(&p->sem_sync);
>>              }
>> +        } else {
>> +            p->total_normal_pages += p->data->size / qemu_target_page_size();
>> +            p->data->size = 0;
>> +            qatomic_set(&p->pending_job, false);
>
> I think it needs to be:
>
>   qatomic_store_release(&p->pending_job, false);
>
> ?
>
> So as to guarantee when the other side sees pending_job==false, size must
> already have been reset.
>

Ok.

>>          }
>>      }
>>  
>> +    if (!use_packets) {
>> +        qemu_sem_post(&p->sem_sync);
>> +    }
>> +
>>      if (local_err) {
>>          multifd_recv_terminate_threads(local_err);
>>          error_free(local_err);
>> @@ -1320,6 +1419,10 @@ int multifd_recv_setup(Error **errp)
>>      thread_count = migrate_multifd_channels();
>>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>> +
>> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
>> +    multifd_recv_state->data->size = 0;
>> +
>>      qatomic_set(&multifd_recv_state->count, 0);
>>      qatomic_set(&multifd_recv_state->exiting, 0);
>>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>> @@ -1330,8 +1433,13 @@ int multifd_recv_setup(Error **errp)
>>  
>>          qemu_mutex_init(&p->mutex);
>>          qemu_sem_init(&p->sem_sync, 0);
>> +        qemu_sem_init(&p->sem, 0);
>> +        p->pending_job = false;
>>          p->id = i;
>>  
>> +        p->data = g_new0(MultiFDRecvData, 1);
>> +        p->data->size = 0;
>> +
>>          if (use_packets) {
>>              p->packet_len = sizeof(MultiFDPacket_t)
>>                  + sizeof(uint64_t) * page_count;
>> diff --git a/migration/multifd.h b/migration/multifd.h
>> index 9a6a7a72df..19188815a3 100644
>> --- a/migration/multifd.h
>> +++ b/migration/multifd.h
>> @@ -13,6 +13,8 @@
>>  #ifndef QEMU_MIGRATION_MULTIFD_H
>>  #define QEMU_MIGRATION_MULTIFD_H
>>  
>> +typedef struct MultiFDRecvData MultiFDRecvData;
>> +
>>  bool multifd_send_setup(void);
>>  void multifd_send_shutdown(void);
>>  int multifd_recv_setup(Error **errp);
>> @@ -23,6 +25,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>>  void multifd_recv_sync_main(void);
>>  int multifd_send_sync_main(void);
>>  bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
>> +bool multifd_recv(void);
>> +MultiFDRecvData *multifd_get_recv_data(void);
>>  
>>  /* Multifd Compression flags */
>>  #define MULTIFD_FLAG_SYNC (1 << 0)
>> @@ -63,6 +67,13 @@ typedef struct {
>>      RAMBlock *block;
>>  } MultiFDPages_t;
>>  
>> +struct MultiFDRecvData {
>> +    void *opaque;
>> +    size_t size;
>> +    /* for preadv */
>> +    off_t file_offset;
>> +};
>> +
>>  typedef struct {
>>      /* Fields are only written at creating/deletion time */
>>      /* No lock required for them, they are read only */
>> @@ -154,6 +165,8 @@ typedef struct {
>>  
>>      /* syncs main thread and channels */
>>      QemuSemaphore sem_sync;
>> +    /* sem where to wait for more work */
>> +    QemuSemaphore sem;
>>  
>>      /* this mutex protects the following parameters */
>>      QemuMutex mutex;
>> @@ -163,6 +176,8 @@ typedef struct {
>>      uint32_t flags;
>>      /* global number of generated multifd packets */
>>      uint64_t packet_num;
>> +    int pending_job;
>> +    MultiFDRecvData *data;
>>  
>>      /* thread local variables. No locking required */
>>  
>> -- 
>> 2.35.3
>>
Fabiano Rosas Feb. 26, 2024, 8:54 p.m. UTC | #3
Fabiano Rosas <farosas@suse.de> writes:

> Peter Xu <peterx@redhat.com> writes:
>
>> On Tue, Feb 20, 2024 at 07:41:23PM -0300, Fabiano Rosas wrote:
>>> Currently multifd does not need to have knowledge of pages on the
>>> receiving side because all the information needed is within the
>>> packets that come in the stream.
>>> 
>>> We're about to add support to fixed-ram migration, which cannot use
>>> packets because it expects the ramblock section in the migration file
>>> to contain only the guest pages data.
>>> 
>>> Add a data structure to transfer pages between the ram migration code
>>> and the multifd receiving threads.
>>> 
>>> We don't want to reuse MultiFDPages_t for two reasons:
>>> 
>>> a) multifd threads don't really need to know about the data they're
>>>    receiving.
>>> 
>>> b) the receiving side has to be stopped to load the pages, which means
>>>    we can experiment with larger granularities than page size when
>>>    transferring data.
>>> 
>>> Signed-off-by: Fabiano Rosas <farosas@suse.de>
>>> ---
>>> @Peter: a 'quit' flag cannot be used instead of pending_job. The
>>> receiving thread needs know there's no more data coming. If the
>>> migration thread sets a 'quit' flag, the multifd thread would see the
>>> flag right away and exit.
>>
>> Hmm.. isn't this exactly what we want?  I'll comment for this inline below.
>>
>>> The only way is to clear pending_job on the
>>> thread and spin once more.
>>> ---
>>>  migration/file.c    |   1 +
>>>  migration/multifd.c | 122 +++++++++++++++++++++++++++++++++++++++++---
>>>  migration/multifd.h |  15 ++++++
>>>  3 files changed, 131 insertions(+), 7 deletions(-)
>>> 
>>> diff --git a/migration/file.c b/migration/file.c
>>> index 5d4975f43e..22d052a71f 100644
>>> --- a/migration/file.c
>>> +++ b/migration/file.c
>>> @@ -6,6 +6,7 @@
>>>   */
>>>  
>>>  #include "qemu/osdep.h"
>>> +#include "exec/ramblock.h"
>>>  #include "qemu/cutils.h"
>>>  #include "qapi/error.h"
>>>  #include "channel.h"
>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>> index 0a5279314d..45a0c7aaa8 100644
>>> --- a/migration/multifd.c
>>> +++ b/migration/multifd.c
>>> @@ -81,9 +81,15 @@ struct {
>>>  
>>>  struct {
>>>      MultiFDRecvParams *params;
>>> +    MultiFDRecvData *data;
>>>      /* number of created threads */
>>>      int count;
>>> -    /* syncs main thread and channels */
>>> +    /*
>>> +     * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
>>> +     *
>>> +     * For files: this is only posted at the end of the file load to mark
>>> +     *            completion of the load process.
>>> +     */
>>>      QemuSemaphore sem_sync;
>>>      /* global number of generated multifd packets */
>>>      uint64_t packet_num;
>>> @@ -1110,6 +1116,53 @@ bool multifd_send_setup(void)
>>>      return true;
>>>  }
>>>  
>>> +bool multifd_recv(void)
>>> +{
>>> +    int i;
>>> +    static int next_recv_channel;
>>> +    MultiFDRecvParams *p = NULL;
>>> +    MultiFDRecvData *data = multifd_recv_state->data;
>>
>> [1]
>>
>>> +
>>> +    /*
>>> +     * next_channel can remain from a previous migration that was
>>> +     * using more channels, so ensure it doesn't overflow if the
>>> +     * limit is lower now.
>>> +     */
>>> +    next_recv_channel %= migrate_multifd_channels();
>>> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
>>> +        if (multifd_recv_should_exit()) {
>>> +            return false;
>>> +        }
>>> +
>>> +        p = &multifd_recv_state->params[i];
>>> +
>>> +        /*
>>> +         * Safe to read atomically without a lock because the flag is
>>> +         * only set by this function below. Reading an old value of
>>> +         * true is not an issue because it would only send us looking
>>> +         * for the next idle channel.
>>> +         */
>>> +        if (qatomic_read(&p->pending_job) == false) {
>>> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
>>> +            break;
>>> +        }
>>> +    }
>>
>> IIUC you'll need an smp_mb_acquire() here.  The ordering of "reading
>> pending_job" and below must be guaranteed, similar to the sender side.
>>
>
> I've been thinking about this even on the sending side.
>
> We shouldn't need the barrier here because there's a control flow
> dependency on breaking the loop. I think pending_job *must* be read
> prior to here, otherwise the program is just wrong. Does that make
> sense?

Hm, never mind actually. We need to order this against the data->size
update on the other thread anyway.
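
Concretely, without the acquire the two reads in multifd_recv() are not
ordered with respect to each other:

    if (qatomic_read(&p->pending_job) == false) { ... }   /* (A) */
    assert(!p->data->size);                               /* (B) */

On a weakly ordered CPU the load in (B) can be satisfied before (A),
observing a stale non-zero size from before the channel's release store.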

Patch

diff --git a/migration/file.c b/migration/file.c
index 5d4975f43e..22d052a71f 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -6,6 +6,7 @@ 
  */
 
 #include "qemu/osdep.h"
+#include "exec/ramblock.h"
 #include "qemu/cutils.h"
 #include "qapi/error.h"
 #include "channel.h"
diff --git a/migration/multifd.c b/migration/multifd.c
index 0a5279314d..45a0c7aaa8 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -81,9 +81,15 @@  struct {
 
 struct {
     MultiFDRecvParams *params;
+    MultiFDRecvData *data;
     /* number of created threads */
     int count;
-    /* syncs main thread and channels */
+    /*
+     * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
+     *
+     * For files: this is only posted at the end of the file load to mark
+     *            completion of the load process.
+     */
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint64_t packet_num;
@@ -1110,6 +1116,53 @@  bool multifd_send_setup(void)
     return true;
 }
 
+bool multifd_recv(void)
+{
+    int i;
+    static int next_recv_channel;
+    MultiFDRecvParams *p = NULL;
+    MultiFDRecvData *data = multifd_recv_state->data;
+
+    /*
+     * next_channel can remain from a previous migration that was
+     * using more channels, so ensure it doesn't overflow if the
+     * limit is lower now.
+     */
+    next_recv_channel %= migrate_multifd_channels();
+    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        if (multifd_recv_should_exit()) {
+            return false;
+        }
+
+        p = &multifd_recv_state->params[i];
+
+        /*
+         * Safe to read atomically without a lock because the flag is
+         * only set by this function below. Reading an old value of
+         * true is not an issue because it would only send us looking
+         * for the next idle channel.
+         */
+        if (qatomic_read(&p->pending_job) == false) {
+            next_recv_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+    }
+
+    assert(!p->data->size);
+    multifd_recv_state->data = p->data;
+    p->data = data;
+
+    qatomic_set(&p->pending_job, true);
+    qemu_sem_post(&p->sem);
+
+    return true;
+}
+
+MultiFDRecvData *multifd_get_recv_data(void)
+{
+    return multifd_recv_state->data;
+}
+
 static void multifd_recv_terminate_threads(Error *err)
 {
     int i;
@@ -1134,11 +1187,26 @@  static void multifd_recv_terminate_threads(Error *err)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         /*
-         * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
-         * however try to wakeup it without harm in cleanup phase.
+         * The migration thread and channels interact differently
+         * depending on the presence of packets.
          */
         if (multifd_use_packets()) {
+            /*
+             * The channel receives as long as there are packets. When
+             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
+             * channel waits for the migration thread to sync. If the
+             * sync never happens, do it here.
+             */
             qemu_sem_post(&p->sem_sync);
+        } else {
+            /*
+             * The channel waits for the migration thread to give it
+             * work. When the migration thread runs out of work, it
+             * releases the channel and waits for any pending work to
+             * finish. If we reach here (e.g. due to error) before the
+             * work runs out, release the channel.
+             */
+            qemu_sem_post(&p->sem);
         }
 
         /*
@@ -1167,6 +1235,7 @@  static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
     p->c = NULL;
     qemu_mutex_destroy(&p->mutex);
     qemu_sem_destroy(&p->sem_sync);
+    qemu_sem_destroy(&p->sem);
     g_free(p->name);
     p->name = NULL;
     p->packet_len = 0;
@@ -1184,6 +1253,8 @@  static void multifd_recv_cleanup_state(void)
     qemu_sem_destroy(&multifd_recv_state->sem_sync);
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    g_free(multifd_recv_state->data);
+    multifd_recv_state->data = NULL;
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 }
@@ -1251,11 +1322,11 @@  static void *multifd_recv_thread(void *opaque)
         bool has_data = false;
         p->normal_num = 0;
 
-        if (multifd_recv_should_exit()) {
-            break;
-        }
-
         if (use_packets) {
+            if (multifd_recv_should_exit()) {
+                break;
+            }
+
             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
@@ -1274,6 +1345,26 @@  static void *multifd_recv_thread(void *opaque)
             p->flags &= ~MULTIFD_FLAG_SYNC;
             has_data = !!p->normal_num;
             qemu_mutex_unlock(&p->mutex);
+        } else {
+            /*
+             * No packets, so we need to wait for the vmstate code to
+             * give us work.
+             */
+            qemu_sem_wait(&p->sem);
+
+            if (multifd_recv_should_exit()) {
+                break;
+            }
+
+            /*
+             * Migration thread did not send work, break and signal
+             * sem_sync so it knows we're not lagging behind.
+             */
+            if (!qatomic_read(&p->pending_job)) {
+                break;
+            }
+
+            has_data = !!p->data->size;
         }
 
         if (has_data) {
@@ -1288,9 +1379,17 @@  static void *multifd_recv_thread(void *opaque)
                 qemu_sem_post(&multifd_recv_state->sem_sync);
                 qemu_sem_wait(&p->sem_sync);
             }
+        } else {
+            p->total_normal_pages += p->data->size / qemu_target_page_size();
+            p->data->size = 0;
+            qatomic_set(&p->pending_job, false);
         }
     }
 
+    if (!use_packets) {
+        qemu_sem_post(&p->sem_sync);
+    }
+
     if (local_err) {
         multifd_recv_terminate_threads(local_err);
         error_free(local_err);
@@ -1320,6 +1419,10 @@  int multifd_recv_setup(Error **errp)
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
+
+    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
+    multifd_recv_state->data->size = 0;
+
     qatomic_set(&multifd_recv_state->count, 0);
     qatomic_set(&multifd_recv_state->exiting, 0);
     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
@@ -1330,8 +1433,13 @@  int multifd_recv_setup(Error **errp)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem_sync, 0);
+        qemu_sem_init(&p->sem, 0);
+        p->pending_job = false;
         p->id = i;
 
+        p->data = g_new0(MultiFDRecvData, 1);
+        p->data->size = 0;
+
         if (use_packets) {
             p->packet_len = sizeof(MultiFDPacket_t)
                 + sizeof(uint64_t) * page_count;
diff --git a/migration/multifd.h b/migration/multifd.h
index 9a6a7a72df..19188815a3 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,6 +13,8 @@ 
 #ifndef QEMU_MIGRATION_MULTIFD_H
 #define QEMU_MIGRATION_MULTIFD_H
 
+typedef struct MultiFDRecvData MultiFDRecvData;
+
 bool multifd_send_setup(void);
 void multifd_send_shutdown(void);
 int multifd_recv_setup(Error **errp);
@@ -23,6 +25,8 @@  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
 void multifd_recv_sync_main(void);
 int multifd_send_sync_main(void);
 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset);
+bool multifd_recv(void);
+MultiFDRecvData *multifd_get_recv_data(void);
 
 /* Multifd Compression flags */
 #define MULTIFD_FLAG_SYNC (1 << 0)
@@ -63,6 +67,13 @@  typedef struct {
     RAMBlock *block;
 } MultiFDPages_t;
 
+struct MultiFDRecvData {
+    void *opaque;
+    size_t size;
+    /* for preadv */
+    off_t file_offset;
+};
+
 typedef struct {
     /* Fields are only written at creating/deletion time */
     /* No lock required for them, they are read only */
@@ -154,6 +165,8 @@  typedef struct {
 
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
 
     /* this mutex protects the following parameters */
     QemuMutex mutex;
@@ -163,6 +176,8 @@  typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    int pending_job;
+    MultiFDRecvData *data;
 
     /* thread local variables. No locking required */