
[13/13] migration: flush receive queue

Message ID 1477078935-7182-14-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived

Commit Message

Juan Quintela Oct. 21, 2016, 7:42 p.m. UTC
From: Juan Quintela <quintela@trasno.org>

Each time we sync the bitmap, there is a possibility that we receive
a page that is still being processed by a different thread.  We fix this
problem by making sure that we wait for all receiving threads to
finish their work before we proceed to the next stage.

I tried to make a migration command for it, but it doesn't work because
we sometimes sync the bitmap when we have already sent the beginning
of the section, so I just added a new page flag.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |  1 +
 migration/ram.c               | 40 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+)
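
In outline, the handshake the patch implements, condensed from the hunks
below (this is the patch's own code, gathered in one place for reading):

    /* Source side: after each bitmap sync, request a flush. */
    migration_bitmap_sync();
    if (migrate_multifd()) {
        multifd_needs_flush = true;   /* consumed by save_page_header() */
    }

    /* save_page_header() then tags the next page header on the wire. */
    if (multifd_needs_flush) {
        offset |= RAM_SAVE_FLAG_MULTIFD_FLUSH;
        multifd_needs_flush = false;
    }

    /* Destination side: on seeing the flag, wait for every receiving
       thread to mark its current page done before loading any more. */
    qemu_mutex_lock(&multifd_recv_mutex);
    for (i = 0; i < thread_count; i++) {
        while (!multifd_recv[i].done) {
            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
        }
    }
    qemu_mutex_unlock(&multifd_recv_mutex);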

Comments

Dr. David Alan Gilbert Oct. 26, 2016, 7:10 p.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> From: Juan Quintela <quintela@trasno.org>
> 
> Each time we sync the bitmap, there is a possibility that we receive
> a page that is still being processed by a different thread.  We fix this
> problem by making sure that we wait for all receiving threads to
> finish their work before we proceed to the next stage.
> 
> I tried to make a migration command for it, but it doesn't work because
> we sometimes sync the bitmap when we have already sent the beginning
> of the section, so I just added a new page flag.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |  1 +
>  migration/ram.c               | 40 ++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 41 insertions(+)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index afdc7ec..49e2ec6 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -251,6 +251,7 @@ void migrate_multifd_send_threads_create(void);
>  void migrate_multifd_send_threads_join(void);
>  void migrate_multifd_recv_threads_create(void);
>  void migrate_multifd_recv_threads_join(void);
> +void qemu_savevm_send_multifd_flush(QEMUFile *f);
> 
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
> diff --git a/migration/ram.c b/migration/ram.c
> index 9a20f63..bf2022e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -69,6 +69,7 @@ static uint64_t bitmap_sync_count;
>  /* 0x80 is reserved in migration.h start with 0x100 next */
>  #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
>  #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
> +#define RAM_SAVE_FLAG_MULTIFD_FLUSH    0x400

Now you really have run out of flags; there are architectures
with 1kB target pages.

However, we really just shouldn't be gobbling these flags up;
the flush could be MULTIFD_PAGE | one of the other flags.

Dave
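
One way to realise that suggestion, sketched here on the assumption that
RAM_SAVE_FLAG_COMPRESS_PAGE and RAM_SAVE_FLAG_MULTIFD_PAGE can never
legitimately appear together on the wire (the particular pairing is
illustrative only, not part of this series):

    /*
     * With 1 KiB target pages only bits 0-9 sit below the page address,
     * so 0x400 already collides with it.  Encoding the flush as an
     * otherwise-impossible flag combination avoids burning a new bit.
     */
    #define RAM_SAVE_FLAG_MULTIFD_FLUSH \
        (RAM_SAVE_FLAG_MULTIFD_PAGE | RAM_SAVE_FLAG_COMPRESS_PAGE)

    /* A multi-bit encoding must be matched exactly on the load side: */
    if ((flags & RAM_SAVE_FLAG_MULTIFD_FLUSH) == RAM_SAVE_FLAG_MULTIFD_FLUSH) {
        multifd_flush();
        flags &= ~RAM_SAVE_FLAG_COMPRESS_PAGE;   /* still a multifd page */
    }

That keeps the few remaining single-bit values free for flags that do
have to stand alone.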

>  static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
> 
> @@ -398,6 +399,11 @@ void migrate_compress_threads_create(void)
> 
>  /* Multiple fd's */
> 
> +
> +/* Indicates that we have synced the bitmap and need to ensure that
> +   the target has processed all previous pages */
> +bool multifd_needs_flush = false;
> +
>  struct MultiFDSendParams {
>      /* not changed */
>      QemuThread thread;
> @@ -713,6 +719,25 @@ static void multifd_recv_page(uint8_t *address, int fd_num)
>      qemu_mutex_unlock(&params->mutex);
>  }
> 
> +
> +static int multifd_flush(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()) {
> +        return 0;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_recv_mutex);
> +    for (i = 0; i < thread_count; i++) {
> +        while (!multifd_recv[i].done) {
> +            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_recv_mutex);
> +    return 0;
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> @@ -729,6 +754,11 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
>  {
>      size_t size, len;
> 
> +    if (multifd_needs_flush) {
> +        offset |= RAM_SAVE_FLAG_MULTIFD_FLUSH;
> +        multifd_needs_flush = false;
> +    }
> +
>      qemu_put_be64(f, offset);
>      size = 8;
> 
> @@ -2399,6 +2429,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
> 
>      if (!migration_in_postcopy(migrate_get_current())) {
>          migration_bitmap_sync();
> +        if (migrate_multifd()) {
> +            multifd_needs_flush = true;
> +        }
>      }
> 
>      ram_control_before_iterate(f, RAM_CONTROL_FINISH);
> @@ -2440,6 +2473,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
>          qemu_mutex_lock_iothread();
>          rcu_read_lock();
>          migration_bitmap_sync();
> +        if (migrate_multifd()) {
> +            multifd_needs_flush = true;
> +        }
>          rcu_read_unlock();
>          qemu_mutex_unlock_iothread();
>          remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
> @@ -2851,6 +2887,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          flags = addr & ~TARGET_PAGE_MASK;
>          addr &= TARGET_PAGE_MASK;
> 
> +        if (flags & RAM_SAVE_FLAG_MULTIFD_FLUSH) {
> +            multifd_flush();
> +            flags &= ~RAM_SAVE_FLAG_MULTIFD_FLUSH;
> +        }
>          if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
>                       RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
>                       RAM_SAVE_FLAG_MULTIFD_PAGE)) {
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index afdc7ec..49e2ec6 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -251,6 +251,7 @@ void migrate_multifd_send_threads_create(void);
 void migrate_multifd_send_threads_join(void);
 void migrate_multifd_recv_threads_create(void);
 void migrate_multifd_recv_threads_join(void);
+void qemu_savevm_send_multifd_flush(QEMUFile *f);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/ram.c b/migration/ram.c
index 9a20f63..bf2022e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -69,6 +69,7 @@ static uint64_t bitmap_sync_count;
 /* 0x80 is reserved in migration.h start with 0x100 next */
 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
 #define RAM_SAVE_FLAG_MULTIFD_PAGE     0x200
+#define RAM_SAVE_FLAG_MULTIFD_FLUSH    0x400

 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];

@@ -398,6 +399,11 @@ void migrate_compress_threads_create(void)

 /* Multiple fd's */

+
+/* Indicates that we have synced the bitmap and need to ensure that
+   the target has processed all previous pages */
+bool multifd_needs_flush = false;
+
 struct MultiFDSendParams {
     /* not changed */
     QemuThread thread;
@@ -713,6 +719,25 @@ static void multifd_recv_page(uint8_t *address, int fd_num)
     qemu_mutex_unlock(&params->mutex);
 }

+
+static int multifd_flush(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()) {
+        return 0;
+    }
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_recv_mutex);
+    for (i = 0; i < thread_count; i++) {
+        while (!multifd_recv[i].done) {
+            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_recv_mutex);
+    return 0;
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -729,6 +754,11 @@ static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
 {
     size_t size, len;

+    if (multifd_needs_flush) {
+        offset |= RAM_SAVE_FLAG_MULTIFD_FLUSH;
+        multifd_needs_flush = false;
+    }
+
     qemu_put_be64(f, offset);
     size = 8;

@@ -2399,6 +2429,9 @@ static int ram_save_complete(QEMUFile *f, void *opaque)

     if (!migration_in_postcopy(migrate_get_current())) {
         migration_bitmap_sync();
+        if (migrate_multifd()) {
+            multifd_needs_flush = true;
+        }
     }

     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
@@ -2440,6 +2473,9 @@ static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
         qemu_mutex_lock_iothread();
         rcu_read_lock();
         migration_bitmap_sync();
+        if (migrate_multifd()) {
+            multifd_needs_flush = true;
+        }
         rcu_read_unlock();
         qemu_mutex_unlock_iothread();
         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
@@ -2851,6 +2887,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         flags = addr & ~TARGET_PAGE_MASK;
         addr &= TARGET_PAGE_MASK;

+        if (flags & RAM_SAVE_FLAG_MULTIFD_FLUSH) {
+            multifd_flush();
+            flags &= ~RAM_SAVE_FLAG_MULTIFD_FLUSH;
+        }
         if (flags & (RAM_SAVE_FLAG_COMPRESS | RAM_SAVE_FLAG_PAGE |
                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE |
                      RAM_SAVE_FLAG_MULTIFD_PAGE)) {