diff mbox

[05/13] migration: create multifd migration threads

Message ID 1477078935-7182-6-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Juan Quintela Oct. 21, 2016, 7:42 p.m. UTC
Creation of the threads, nothing inside yet.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 include/migration/migration.h |   4 ++
 migration/migration.c         |   6 ++
 migration/ram.c               | 148 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 158 insertions(+)

Comments

Dr. David Alan Gilbert Oct. 26, 2016, 6:43 p.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> Creation of the threads, nothing inside yet.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  include/migration/migration.h |   4 ++
>  migration/migration.c         |   6 ++
>  migration/ram.c               | 148 ++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 158 insertions(+)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 709355e..80ab8c0 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -241,6 +241,10 @@ bool migration_in_postcopy_after_devices(MigrationState *);
>  MigrationState *migrate_get_current(void);
> 
>  int migrate_multifd_threads(void);
> +void migrate_multifd_send_threads_create(void);
> +void migrate_multifd_send_threads_join(void);
> +void migrate_multifd_recv_threads_create(void);
> +void migrate_multifd_recv_threads_join(void);
> 
>  void migrate_compress_threads_create(void);
>  void migrate_compress_threads_join(void);
> diff --git a/migration/migration.c b/migration/migration.c
> index 217ccbc..a4615f5 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -336,6 +336,7 @@ static void process_incoming_migration_bh(void *opaque)
>                            MIGRATION_STATUS_FAILED);
>          error_report_err(local_err);
>          migrate_decompress_threads_join();
> +        migrate_multifd_recv_threads_join();
>          exit(EXIT_FAILURE);
>      }
> 
> @@ -360,6 +361,7 @@ static void process_incoming_migration_bh(void *opaque)
>          runstate_set(global_state_get_runstate());
>      }
>      migrate_decompress_threads_join();
> +    migrate_multifd_recv_threads_join();
>      /*
>       * This must happen after any state changes since as soon as an external
>       * observer sees this event they might start to prod at the VM assuming
> @@ -413,6 +415,7 @@ static void process_incoming_migration_co(void *opaque)
>                            MIGRATION_STATUS_FAILED);
>          error_report("load of migration failed: %s", strerror(-ret));
>          migrate_decompress_threads_join();
> +        migrate_multifd_recv_threads_join();
>          exit(EXIT_FAILURE);
>      }
> 
> @@ -425,6 +428,7 @@ void migration_fd_process_incoming(QEMUFile *f)
>      Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);
> 
>      migrate_decompress_threads_create();
> +    migrate_multifd_recv_threads_create();
>      qemu_file_set_blocking(f, false);
>      qemu_coroutine_enter(co);
>  }
> @@ -916,6 +920,7 @@ static void migrate_fd_cleanup(void *opaque)
>          qemu_mutex_lock_iothread();
> 
>          migrate_compress_threads_join();
> +        migrate_multifd_send_threads_join();
>          qemu_fclose(s->to_dst_file);
>          s->to_dst_file = NULL;
>      }
> @@ -1922,6 +1927,7 @@ void migrate_fd_connect(MigrationState *s)
>      }
> 
>      migrate_compress_threads_create();
> +    migrate_multifd_send_threads_create();
>      qemu_thread_create(&s->thread, "migration", migration_thread, s,
>                         QEMU_THREAD_JOINABLE);
>      s->migration_thread_running = true;
> diff --git a/migration/ram.c b/migration/ram.c
> index 495a931..78d400e 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -389,6 +389,154 @@ void migrate_compress_threads_create(void)
>      }
>  }
> 
> +/* Multiple fd's */
> +
> +struct MultiFDSendParams {
> +    QemuThread thread;
> +    QemuCond cond;
> +    QemuMutex mutex;
> +    bool quit;
> +};
> +typedef struct MultiFDSendParams MultiFDSendParams;
> +
> +static MultiFDSendParams *multifd_send;
> +
> +static void *multifd_send_thread(void *opaque)
> +{
> +    MultiFDSendParams *params = opaque;
> +
> +    qemu_mutex_lock(&params->mutex);
> +    while (!params->quit){
> +        qemu_cond_wait(&params->cond, &params->mutex);
> +    }
> +    qemu_mutex_unlock(&params->mutex);
> +
> +    return NULL;
> +}
> +
> +static void terminate_multifd_send_threads(void)
> +{
> +    int i, thread_count;
> +
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_lock(&multifd_send[i].mutex);
> +        multifd_send[i].quit = true;
> +        qemu_cond_signal(&multifd_send[i].cond);
> +        qemu_mutex_unlock(&multifd_send[i].mutex);
> +    }
> +}
> +
> +void migrate_multifd_send_threads_join(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){

You've missed the space prior to the { (and then copied
it everywhere in this patch).

> +        return;
> +    }
> +    terminate_multifd_send_threads();
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(&multifd_send[i].thread);
> +        qemu_mutex_destroy(&multifd_send[i].mutex);
> +        qemu_cond_destroy(&multifd_send[i].cond);
> +    }
> +    g_free(multifd_send);
> +    multifd_send = NULL;
> +}
> +
> +void migrate_multifd_send_threads_create(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){
> +        return;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    multifd_send = g_new0(MultiFDSendParams, thread_count);
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_init(&multifd_send[i].mutex);
> +        qemu_cond_init(&multifd_send[i].cond);
> +        multifd_send[i].quit = false;
> +        qemu_thread_create(&multifd_send[i].thread, "multifd_send",

You could make the name of the thread include the thread number,
that way you could easily see in top if any one of the threads
was getting particularly busy (although be careful with the length —
I think Linux will ignore the name if it's over 14 characters).

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

> +                           multifd_send_thread, &multifd_send[i],
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
> +struct MultiFDRecvParams {
> +    QemuThread thread;
> +    QemuCond cond;
> +    QemuMutex mutex;
> +    bool quit;
> +};
> +typedef struct MultiFDRecvParams MultiFDRecvParams;
> +
> +static MultiFDRecvParams *multifd_recv;
> +
> +static void *multifd_recv_thread(void *opaque)
> +{
> +    MultiFDRecvParams *params = opaque;
> + 
> +    qemu_mutex_lock(&params->mutex);
> +    while (!params->quit){
> +        qemu_cond_wait(&params->cond, &params->mutex);
> +    }
> +    qemu_mutex_unlock(&params->mutex);
> +
> +    return NULL;
> +}
> +
> +static void terminate_multifd_recv_threads(void)
> +{
> +    int i, thread_count;
> +
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_lock(&multifd_recv[i].mutex);
> +        multifd_recv[i].quit = true;
> +        qemu_cond_signal(&multifd_recv[i].cond);
> +        qemu_mutex_unlock(&multifd_recv[i].mutex);
> +    }
> +}
> +
> +void migrate_multifd_recv_threads_join(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){
> +        return;
> +    }
> +    terminate_multifd_recv_threads();
> +    thread_count = migrate_multifd_threads();
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_thread_join(&multifd_recv[i].thread);
> +        qemu_mutex_destroy(&multifd_recv[i].mutex);
> +        qemu_cond_destroy(&multifd_recv[i].cond);
> +    }
> +    g_free(multifd_recv);
> +    multifd_recv = NULL;
> +}
> +
> +void migrate_multifd_recv_threads_create(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_multifd()){
> +        return;
> +    }
> +    thread_count = migrate_multifd_threads();
> +    multifd_recv = g_new0(MultiFDRecvParams, thread_count);
> +    for (i = 0; i < thread_count; i++) {
> +        qemu_mutex_init(&multifd_recv[i].mutex);
> +        qemu_cond_init(&multifd_recv[i].cond);
> +        multifd_recv[i].quit = false;
> +        qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
> +                           multifd_recv_thread, &multifd_recv[i],
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> -- 
> 2.7.4
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela Jan. 23, 2017, 5:15 p.m. UTC | #2
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> Creation of the threads, nothing inside yet.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>

>> +void migrate_multifd_send_threads_join(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_multifd()){
>
> You've missed the space prior to the  { (and then copied
> it everywhere in this patch).

Fixed.  As guessed, copy & paste O:-)


>> +    thread_count = migrate_multifd_threads();
>> +    multifd_send = g_new0(MultiFDSendParams, thread_count);
>> +    for (i = 0; i < thread_count; i++) {
>> +        qemu_mutex_init(&multifd_send[i].mutex);
>> +        qemu_cond_init(&multifd_send[i].cond);
>> +        multifd_send[i].quit = false;
>> +        qemu_thread_create(&multifd_send[i].thread, "multifd_send",
>
> You could make the name of the thread include the thread number,
> that way you could easily see in top if any one of the threads
> was getting particularly busy (although be careful with the length
> I think linux will ignore the name if it's over 14 characters).

Up to 99 threads it will be shown correctly :p
diff mbox

Patch

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 709355e..80ab8c0 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -241,6 +241,10 @@  bool migration_in_postcopy_after_devices(MigrationState *);
 MigrationState *migrate_get_current(void);

 int migrate_multifd_threads(void);
+void migrate_multifd_send_threads_create(void);
+void migrate_multifd_send_threads_join(void);
+void migrate_multifd_recv_threads_create(void);
+void migrate_multifd_recv_threads_join(void);

 void migrate_compress_threads_create(void);
 void migrate_compress_threads_join(void);
diff --git a/migration/migration.c b/migration/migration.c
index 217ccbc..a4615f5 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -336,6 +336,7 @@  static void process_incoming_migration_bh(void *opaque)
                           MIGRATION_STATUS_FAILED);
         error_report_err(local_err);
         migrate_decompress_threads_join();
+        migrate_multifd_recv_threads_join();
         exit(EXIT_FAILURE);
     }

@@ -360,6 +361,7 @@  static void process_incoming_migration_bh(void *opaque)
         runstate_set(global_state_get_runstate());
     }
     migrate_decompress_threads_join();
+    migrate_multifd_recv_threads_join();
     /*
      * This must happen after any state changes since as soon as an external
      * observer sees this event they might start to prod at the VM assuming
@@ -413,6 +415,7 @@  static void process_incoming_migration_co(void *opaque)
                           MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
         migrate_decompress_threads_join();
+        migrate_multifd_recv_threads_join();
         exit(EXIT_FAILURE);
     }

@@ -425,6 +428,7 @@  void migration_fd_process_incoming(QEMUFile *f)
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, f);

     migrate_decompress_threads_create();
+    migrate_multifd_recv_threads_create();
     qemu_file_set_blocking(f, false);
     qemu_coroutine_enter(co);
 }
@@ -916,6 +920,7 @@  static void migrate_fd_cleanup(void *opaque)
         qemu_mutex_lock_iothread();

         migrate_compress_threads_join();
+        migrate_multifd_send_threads_join();
         qemu_fclose(s->to_dst_file);
         s->to_dst_file = NULL;
     }
@@ -1922,6 +1927,7 @@  void migrate_fd_connect(MigrationState *s)
     }

     migrate_compress_threads_create();
+    migrate_multifd_send_threads_create();
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
     s->migration_thread_running = true;
diff --git a/migration/ram.c b/migration/ram.c
index 495a931..78d400e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -389,6 +389,154 @@  void migrate_compress_threads_create(void)
     }
 }

+/* Multiple fd's */
+
+struct MultiFDSendParams {
+    QemuThread thread;
+    QemuCond cond;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDSendParams MultiFDSendParams;
+
+static MultiFDSendParams *multifd_send;
+
+static void *multifd_send_thread(void *opaque)
+{
+    MultiFDSendParams *params = opaque;
+
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit){
+        qemu_cond_wait(&params->cond, &params->mutex);
+    }
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+static void terminate_multifd_send_threads(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&multifd_send[i].mutex);
+        multifd_send[i].quit = true;
+        qemu_cond_signal(&multifd_send[i].cond);
+        qemu_mutex_unlock(&multifd_send[i].mutex);
+    }
+}
+
+void migrate_multifd_send_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()){
+        return;
+    }
+    terminate_multifd_send_threads();
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(&multifd_send[i].thread);
+        qemu_mutex_destroy(&multifd_send[i].mutex);
+        qemu_cond_destroy(&multifd_send[i].cond);
+    }
+    g_free(multifd_send);
+    multifd_send = NULL;
+}
+
+void migrate_multifd_send_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()){
+        return;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_send = g_new0(MultiFDSendParams, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_init(&multifd_send[i].mutex);
+        qemu_cond_init(&multifd_send[i].cond);
+        multifd_send[i].quit = false;
+        qemu_thread_create(&multifd_send[i].thread, "multifd_send",
+                           multifd_send_thread, &multifd_send[i],
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+struct MultiFDRecvParams {
+    QemuThread thread;
+    QemuCond cond;
+    QemuMutex mutex;
+    bool quit;
+};
+typedef struct MultiFDRecvParams MultiFDRecvParams;
+
+static MultiFDRecvParams *multifd_recv;
+
+static void *multifd_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *params = opaque;
+ 
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit){
+        qemu_cond_wait(&params->cond, &params->mutex);
+    }
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+static void terminate_multifd_recv_threads(void)
+{
+    int i, thread_count;
+
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_lock(&multifd_recv[i].mutex);
+        multifd_recv[i].quit = true;
+        qemu_cond_signal(&multifd_recv[i].cond);
+        qemu_mutex_unlock(&multifd_recv[i].mutex);
+    }
+}
+
+void migrate_multifd_recv_threads_join(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()){
+        return;
+    }
+    terminate_multifd_recv_threads();
+    thread_count = migrate_multifd_threads();
+    for (i = 0; i < thread_count; i++) {
+        qemu_thread_join(&multifd_recv[i].thread);
+        qemu_mutex_destroy(&multifd_recv[i].mutex);
+        qemu_cond_destroy(&multifd_recv[i].cond);
+    }
+    g_free(multifd_recv);
+    multifd_recv = NULL;
+}
+
+void migrate_multifd_recv_threads_create(void)
+{
+    int i, thread_count;
+
+    if (!migrate_multifd()){
+        return;
+    }
+    thread_count = migrate_multifd_threads();
+    multifd_recv = g_new0(MultiFDRecvParams, thread_count);
+    for (i = 0; i < thread_count; i++) {
+        qemu_mutex_init(&multifd_recv[i].mutex);
+        qemu_cond_init(&multifd_recv[i].cond);
+        multifd_recv[i].quit = false;
+        qemu_thread_create(&multifd_recv[i].thread, "multifd_recv",
+                           multifd_recv_thread, &multifd_recv[i],
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
 /**
  * save_page_header: Write page header to wire
  *