diff mbox series

[v4,24/34] migration/multifd: Support incoming fixed-ram stream format

Message ID 20240220224138.24759-25-farosas@suse.de (mailing list archive)
State New, archived
Headers show
Series migration: File based migration with multifd and fixed-ram | expand

Commit Message

Fabiano Rosas Feb. 20, 2024, 10:41 p.m. UTC
For the incoming fixed-ram migration we need to read the ramblock
headers, get the pages bitmap and send the host address of each
non-zero page to the multifd channel thread for writing.

Usage on HMP is:

(qemu) migrate_set_capability multifd on
(qemu) migrate_set_capability fixed-ram on
(qemu) migrate_incoming file:migfile

(the ram.h include needs to move because we've been previously relying
on it being included from migration.c. Now file.h will start including
multifd.h before migration.o is processed)

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/file.c    | 25 ++++++++++++++++++++++++-
 migration/file.h    |  2 ++
 migration/multifd.c | 34 ++++++++++++++++++++++++++++++----
 migration/multifd.h |  2 ++
 migration/ram.c     | 36 +++++++++++++++++++++++++++++++++---
 5 files changed, 91 insertions(+), 8 deletions(-)

Comments

Peter Xu Feb. 26, 2024, 8:30 a.m. UTC | #1
On Tue, Feb 20, 2024 at 07:41:28PM -0300, Fabiano Rosas wrote:
> For the incoming fixed-ram migration we need to read the ramblock
> headers, get the pages bitmap and send the host address of each
> non-zero page to the multifd channel thread for writing.
> 
> Usage on HMP is:
> 
> (qemu) migrate_set_capability multifd on
> (qemu) migrate_set_capability fixed-ram on
> (qemu) migrate_incoming file:migfile
> 
> (the ram.h include needs to move because we've been previously relying
> on it being included from migration.c. Now file.h will start including
> multifd.h before migration.o is processed)
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
>  migration/file.c    | 25 ++++++++++++++++++++++++-
>  migration/file.h    |  2 ++
>  migration/multifd.c | 34 ++++++++++++++++++++++++++++++----
>  migration/multifd.h |  2 ++
>  migration/ram.c     | 36 +++++++++++++++++++++++++++++++++---
>  5 files changed, 91 insertions(+), 8 deletions(-)
> 
> diff --git a/migration/file.c b/migration/file.c
> index 94e8e08363..1a18e608fc 100644
> --- a/migration/file.c
> +++ b/migration/file.c
> @@ -13,7 +13,6 @@
>  #include "channel.h"
>  #include "file.h"
>  #include "migration.h"
> -#include "multifd.h"
>  #include "io/channel-file.h"
>  #include "io/channel-util.h"
>  #include "options.h"
> @@ -195,3 +194,27 @@ int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
>  
>      return (ret < 0) ? -1 : 0;
>  }
> +
> +int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp)
> +{
> +    MultiFDRecvData *data = p->data;
> +    size_t ret;
> +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +
> +    if (flags != MULTIFD_FLAG_NOCOMP) {
> +        error_setg(errp, "multifd %u: flags received %x flags expected %x",
> +                   p->id, flags, MULTIFD_FLAG_NOCOMP);
> +        return -1;
> +    }

This chunk can be dropped?  There's no packet in fixed-ram, this check is
no-op only because MULTIFD_FLAG_NOCOMP==0, iiuc.

The check should be done OTOH to make sure fixed-ram don't run together
with any multifd compressions enabled.  I remember we discussed this
before.  Is it still missing?  Note that multifd compression has its own
parameter (rather than the COMPRESS capability), it should be a check
against migrate_multifd_compression()==MULTIFD_COMPRESSION_NONE.

> +
> +    ret = qio_channel_pread(p->c, (char *) data->opaque,
> +                            data->size, data->file_offset, errp);
> +    if (ret != data->size) {
> +        error_prepend(errp,
> +                      "multifd recv (%u): read 0x%zx, expected 0x%zx",
> +                      p->id, ret, data->size);
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> diff --git a/migration/file.h b/migration/file.h
> index 390dcc6821..9fe8af73fc 100644
> --- a/migration/file.h
> +++ b/migration/file.h
> @@ -11,6 +11,7 @@
>  #include "qapi/qapi-types-migration.h"
>  #include "io/task.h"
>  #include "channel.h"
> +#include "multifd.h"
>  
>  void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp);
>  
> @@ -22,4 +23,5 @@ bool file_send_channel_create(gpointer opaque, Error **errp);
>  int file_send_channel_destroy(QIOChannel *ioc);
>  int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
>                              int niov, RAMBlock *block, Error **errp);
> +int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp);
>  #endif
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b251c58ec2..a0202b5661 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -18,7 +18,6 @@
>  #include "qemu/error-report.h"
>  #include "qapi/error.h"
>  #include "file.h"
> -#include "ram.h"
>  #include "migration.h"
>  #include "migration-stats.h"
>  #include "socket.h"
> @@ -251,9 +250,9 @@ static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
>              p->iov[i].iov_len = p->page_size;
>          }
>          return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
> +    } else {
> +        return multifd_file_recv_data(p, errp);
>      }
> -
> -    return 0;
>  }
>  
>  static MultiFDMethods multifd_nocomp_ops = {
> @@ -1317,13 +1316,40 @@ void multifd_recv_cleanup(void)
>      multifd_recv_cleanup_state();
>  }
>  
> +
> +/*
> + * Wait until all channels have finished receiving data. Once this
> + * function returns, cleanup routines are safe to run.
> + */
> +static void multifd_file_recv_sync(void)
> +{
> +    int i;
> +
> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +        trace_multifd_recv_sync_main_wait(p->id);
> +
> +        qemu_sem_post(&p->sem);
> +
> +        trace_multifd_recv_sync_main_signal(p->id);
> +        qemu_sem_wait(&p->sem_sync);
> +    }
> +    return;
> +}
> +
>  void multifd_recv_sync_main(void)
>  {
>      int i;
>  
> -    if (!migrate_multifd() || !multifd_use_packets()) {
> +    if (!migrate_multifd()) {
>          return;
>      }
> +
> +    if (!multifd_use_packets()) {
> +        return multifd_file_recv_sync();
> +    }
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 135f6ed098..8f89199721 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +#include "ram.h"
> +
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  
>  bool multifd_send_setup(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index ad540ae9ce..826ac745a0 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -111,6 +111,7 @@
>   * pages region in the migration file at a time.
>   */
>  #define FIXED_RAM_LOAD_BUF_SIZE 0x100000
> +#define FIXED_RAM_MULTIFD_LOAD_BUF_SIZE 0x100000
>  
>  XBZRLECacheStats xbzrle_counters;
>  
> @@ -3950,6 +3951,27 @@ void colo_flush_ram_cache(void)
>      trace_colo_flush_ram_cache_end();
>  }
>  
> +static size_t ram_load_multifd_pages(void *host_addr, size_t size,
> +                                     uint64_t offset)
> +{
> +    MultiFDRecvData *data = multifd_get_recv_data();
> +
> +    /*
> +     * Pointing the opaque directly to the host buffer, no
> +     * preprocessing needed.
> +     */
> +    data->opaque = host_addr;
> +

nit: unneeded newline?  There's a similar one in send side.  I'd drop the
comment altogether as it's not extremely helpful.  Maybe we can directly
use data->host_addr already (as it always reads chunk into a host buffer)?

> +    data->file_offset = offset;
> +    data->size = size;
> +
> +    if (!multifd_recv()) {
> +        return 0;
> +    }
> +
> +    return size;
> +}
> +
>  static bool read_ramblock_fixed_ram(QEMUFile *f, RAMBlock *block,
>                                      long num_pages, unsigned long *bitmap,
>                                      Error **errp)
> @@ -3959,6 +3981,8 @@ static bool read_ramblock_fixed_ram(QEMUFile *f, RAMBlock *block,
>      ram_addr_t offset;
>      void *host;
>      size_t read, unread, size;
> +    size_t buf_size = (migrate_multifd() ? FIXED_RAM_MULTIFD_LOAD_BUF_SIZE :
> +                       FIXED_RAM_LOAD_BUF_SIZE);

Are they the same?  Maybe we don't need the new one until we want to make
it different?

>  
>      for (set_bit_idx = find_first_bit(bitmap, num_pages);
>           set_bit_idx < num_pages;
> @@ -3977,10 +4001,16 @@ static bool read_ramblock_fixed_ram(QEMUFile *f, RAMBlock *block,
>                  return false;
>              }
>  
> -            size = MIN(unread, FIXED_RAM_LOAD_BUF_SIZE);
> +            size = MIN(unread, buf_size);
> +
> +            if (migrate_multifd()) {
> +                read = ram_load_multifd_pages(host, size,
> +                                              block->pages_offset + offset);
> +            } else {
> +                read = qemu_get_buffer_at(f, host, size,
> +                                          block->pages_offset + offset);
> +            }
>  
> -            read = qemu_get_buffer_at(f, host, size,
> -                                      block->pages_offset + offset);
>              if (!read) {
>                  goto err;
>              }
> -- 
> 2.35.3
>
diff mbox series

Patch

diff --git a/migration/file.c b/migration/file.c
index 94e8e08363..1a18e608fc 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -13,7 +13,6 @@ 
 #include "channel.h"
 #include "file.h"
 #include "migration.h"
-#include "multifd.h"
 #include "io/channel-file.h"
 #include "io/channel-util.h"
 #include "options.h"
@@ -195,3 +194,27 @@  int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
 
     return (ret < 0) ? -1 : 0;
 }
+
+int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDRecvData *data = p->data;
+    size_t ret;
+    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+
+    if (flags != MULTIFD_FLAG_NOCOMP) {
+        error_setg(errp, "multifd %u: flags received %x flags expected %x",
+                   p->id, flags, MULTIFD_FLAG_NOCOMP);
+        return -1;
+    }
+
+    ret = qio_channel_pread(p->c, (char *) data->opaque,
+                            data->size, data->file_offset, errp);
+    if (ret != data->size) {
+        error_prepend(errp,
+                      "multifd recv (%u): read 0x%zx, expected 0x%zx",
+                      p->id, ret, data->size);
+        return -1;
+    }
+
+    return 0;
+}
diff --git a/migration/file.h b/migration/file.h
index 390dcc6821..9fe8af73fc 100644
--- a/migration/file.h
+++ b/migration/file.h
@@ -11,6 +11,7 @@ 
 #include "qapi/qapi-types-migration.h"
 #include "io/task.h"
 #include "channel.h"
+#include "multifd.h"
 
 void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp);
 
@@ -22,4 +23,5 @@  bool file_send_channel_create(gpointer opaque, Error **errp);
 int file_send_channel_destroy(QIOChannel *ioc);
 int file_write_ramblock_iov(QIOChannel *ioc, const struct iovec *iov,
                             int niov, RAMBlock *block, Error **errp);
+int multifd_file_recv_data(MultiFDRecvParams *p, Error **errp);
 #endif
diff --git a/migration/multifd.c b/migration/multifd.c
index b251c58ec2..a0202b5661 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -18,7 +18,6 @@ 
 #include "qemu/error-report.h"
 #include "qapi/error.h"
 #include "file.h"
-#include "ram.h"
 #include "migration.h"
 #include "migration-stats.h"
 #include "socket.h"
@@ -251,9 +250,9 @@  static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
             p->iov[i].iov_len = p->page_size;
         }
         return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
+    } else {
+        return multifd_file_recv_data(p, errp);
     }
-
-    return 0;
 }
 
 static MultiFDMethods multifd_nocomp_ops = {
@@ -1317,13 +1316,40 @@  void multifd_recv_cleanup(void)
     multifd_recv_cleanup_state();
 }
 
+
+/*
+ * Wait until all channels have finished receiving data. Once this
+ * function returns, cleanup routines are safe to run.
+ */
+static void multifd_file_recv_sync(void)
+{
+    int i;
+
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDRecvParams *p = &multifd_recv_state->params[i];
+
+        trace_multifd_recv_sync_main_wait(p->id);
+
+        qemu_sem_post(&p->sem);
+
+        trace_multifd_recv_sync_main_signal(p->id);
+        qemu_sem_wait(&p->sem_sync);
+    }
+    return;
+}
+
 void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_multifd() || !multifd_use_packets()) {
+    if (!migrate_multifd()) {
         return;
     }
+
+    if (!multifd_use_packets()) {
+        return multifd_file_recv_sync();
+    }
+
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
diff --git a/migration/multifd.h b/migration/multifd.h
index 135f6ed098..8f89199721 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,6 +13,8 @@ 
 #ifndef QEMU_MIGRATION_MULTIFD_H
 #define QEMU_MIGRATION_MULTIFD_H
 
+#include "ram.h"
+
 typedef struct MultiFDRecvData MultiFDRecvData;
 
 bool multifd_send_setup(void);
diff --git a/migration/ram.c b/migration/ram.c
index ad540ae9ce..826ac745a0 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -111,6 +111,7 @@ 
  * pages region in the migration file at a time.
  */
 #define FIXED_RAM_LOAD_BUF_SIZE 0x100000
+#define FIXED_RAM_MULTIFD_LOAD_BUF_SIZE 0x100000
 
 XBZRLECacheStats xbzrle_counters;
 
@@ -3950,6 +3951,27 @@  void colo_flush_ram_cache(void)
     trace_colo_flush_ram_cache_end();
 }
 
+static size_t ram_load_multifd_pages(void *host_addr, size_t size,
+                                     uint64_t offset)
+{
+    MultiFDRecvData *data = multifd_get_recv_data();
+
+    /*
+     * Pointing the opaque directly to the host buffer, no
+     * preprocessing needed.
+     */
+    data->opaque = host_addr;
+
+    data->file_offset = offset;
+    data->size = size;
+
+    if (!multifd_recv()) {
+        return 0;
+    }
+
+    return size;
+}
+
 static bool read_ramblock_fixed_ram(QEMUFile *f, RAMBlock *block,
                                     long num_pages, unsigned long *bitmap,
                                     Error **errp)
@@ -3959,6 +3981,8 @@  static bool read_ramblock_fixed_ram(QEMUFile *f, RAMBlock *block,
     ram_addr_t offset;
     void *host;
     size_t read, unread, size;
+    size_t buf_size = (migrate_multifd() ? FIXED_RAM_MULTIFD_LOAD_BUF_SIZE :
+                       FIXED_RAM_LOAD_BUF_SIZE);
 
     for (set_bit_idx = find_first_bit(bitmap, num_pages);
          set_bit_idx < num_pages;
@@ -3977,10 +4001,16 @@  static bool read_ramblock_fixed_ram(QEMUFile *f, RAMBlock *block,
                 return false;
             }
 
-            size = MIN(unread, FIXED_RAM_LOAD_BUF_SIZE);
+            size = MIN(unread, buf_size);
+
+            if (migrate_multifd()) {
+                read = ram_load_multifd_pages(host, size,
+                                              block->pages_offset + offset);
+            } else {
+                read = qemu_get_buffer_at(f, host, size,
+                                          block->pages_offset + offset);
+            }
 
-            read = qemu_get_buffer_at(f, host, size,
-                                      block->pages_offset + offset);
             if (!read) {
                 goto err;
             }