diff mbox series

[RFC,03/14] migration/rdma: Create multiFd migration threads

Message ID 20200213093755.370-4-fengzhimin1@huawei.com (mailing list archive)
State New, archived
Headers show
Series *** multifd for RDMA v2 *** | expand

Commit Message

fengzhimin Feb. 13, 2020, 9:37 a.m. UTC
Create the multifd send and receive threads for RDMA migration;
the threads do nothing yet.

Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
---
 migration/multifd.c   | 33 +++++++++++++---
 migration/multifd.h   |  2 +
 migration/qemu-file.c |  5 +++
 migration/qemu-file.h |  1 +
 migration/rdma.c      | 88 ++++++++++++++++++++++++++++++++++++++++++-
 migration/rdma.h      |  3 ++
 6 files changed, 125 insertions(+), 7 deletions(-)

Comments

Juan Quintela Feb. 13, 2020, 10:12 a.m. UTC | #1
Zhimin Feng <fengzhimin1@huawei.com> wrote:
> Creation of the multifd send threads for RDMA migration,
> nothing inside yet.
>
> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
> ---
>  migration/multifd.c   | 33 +++++++++++++---
>  migration/multifd.h   |  2 +
>  migration/qemu-file.c |  5 +++
>  migration/qemu-file.h |  1 +
>  migration/rdma.c      | 88 ++++++++++++++++++++++++++++++++++++++++++-
>  migration/rdma.h      |  3 ++
>  6 files changed, 125 insertions(+), 7 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b3e8ae9bcc..63678d7fdd 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f)
>  {
>      int i;
>  
> -    if (!migrate_use_multifd()) {
> +    if (!migrate_use_multifd() || migrate_use_rdma()) {

Don't you need to sync with the main channel on RDMA?

> +static void rdma_send_channel_create(MultiFDSendParams *p)
> +{
> +    Error *local_err = NULL;
> +
> +    if (p->quit) {
> +        error_setg(&local_err, "multifd: send id %d already quit", p->id);
> +        return ;
> +    }
> +    p->running = true;
> +
> +    qemu_thread_create(&p->thread, p->name, multifd_rdma_send_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +}
> +
>  static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>  {
>      MultiFDSendParams *p = opaque;
> @@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp)
>          p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>          p->packet->version = cpu_to_be32(MULTIFD_VERSION);
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        socket_send_channel_create(multifd_new_send_channel_async, p);
> +        if (!migrate_use_rdma()) {
> +            socket_send_channel_create(multifd_new_send_channel_async, p);
> +        } else {
> +            rdma_send_channel_create(p);
> +        }

This is what we are trying to avoid.  Just create a struct ops, where we
have a

ops->create_channel(new_channel_async, p)

or whatever, and fill it differently for rdma and for tcp.


>      }
>      return 0;
>  }
> @@ -720,7 +738,7 @@ void multifd_recv_sync_main(void)
>  {
>      int i;
>  
> -    if (!migrate_use_multifd()) {
> +    if (!migrate_use_multifd() || migrate_use_rdma()) {
>          return;
>      }

OK. You can just put an empty function here for RDMA.

>      for (i = 0; i < migrate_multifd_channels(); i++) {
> @@ -890,8 +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>      p->num_packets = 1;
>  
>      p->running = true;
> -    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> -                       QEMU_THREAD_JOINABLE);
> +    if (!migrate_use_rdma()) {
> +        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +    } else {
> +        qemu_thread_create(&p->thread, p->name, multifd_rdma_recv_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +    }

This should be a new_recv_channel() member function.

>      atomic_inc(&multifd_recv_state->count);
>      return atomic_read(&multifd_recv_state->count) ==
>             migrate_multifd_channels();
> diff --git a/migration/multifd.h b/migration/multifd.h
> index d8b0205977..c9c11ad140 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +#include "migration/rdma.h"
> +
>  int multifd_save_setup(Error **errp);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);

You are not exporting anything rdma related from here, are you?

> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 1c3a358a14..f0ed8f1381 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f)
>      f->iovcnt = 0;
>  }
>  
> +void *getQIOChannel(QEMUFile *f)
> +{
> +    return f->opaque;
> +}
> +

Do we really want this to return void *, and not a better type?
> +static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
> +{
> +    MigrationIncomingState *mis = migration_incoming_get_current();
> +    Error *local_err = NULL;
> +    QIOChannel *ioc = NULL;
> +    bool start_migration;
> +
> +    if (!mis->from_src_file) {
> +        mis->from_src_file = f;
> +        qemu_file_set_blocking(f, false);
> +
> +        start_migration = migrate_use_multifd();
> +    } else {
> +        ioc = QIO_CHANNEL(getQIOChannel(f));
> +        /* Multiple connections */
> +        assert(migrate_use_multifd());

I am not sure that you can make this incompatible change.
You need to support *both* the old method and the new multifd one.

I would have been happy to remove the old precopy TCP method, but we
*guarantee* backwards compatibility.

> @@ -4003,8 +4032,12 @@ static void rdma_accept_incoming_migration(void *opaque)
>          return;
>      }
>  
> -    rdma->migration_started_on_destination = 1;
> -    migration_fd_process_incoming(f, errp);
> +    if (migrate_use_multifd()) {
> +        migration_rdma_process_incoming(f, errp);
> +    } else {
> +        rdma->migration_started_on_destination = 1;
> +        migration_fd_process_incoming(f, errp);
> +    }

But here you allow multifd to be disabled?




> +
> +void *multifd_rdma_recv_thread(void *opaque)
> +{

Why can't you use multifd_recv_thread() directly, just creating
different ops when you need them?

Later, Juan.
fengzhimin Feb. 14, 2020, 9:51 a.m. UTC | #2
Thanks for your review. I will fix these issues in the next version (v3).

Because the migration data is transferred with RDMA WRITE operations, the destination does not need to receive any data.
It only needs to poll the CQEs (completion queue entries), so multifd_recv_thread() cannot be used directly.

-----Original Message-----
From: Juan Quintela [mailto:quintela@redhat.com] 
Sent: Thursday, February 13, 2020 6:13 PM
To: fengzhimin <fengzhimin1@huawei.com>
Cc: dgilbert@redhat.com; armbru@redhat.com; eblake@redhat.com; qemu-devel@nongnu.org; Zhanghailiang <zhang.zhanghailiang@huawei.com>; jemmy858585@gmail.com
Subject: Re: [PATCH RFC 03/14] migration/rdma: Create multiFd migration threads

Zhimin Feng <fengzhimin1@huawei.com> wrote:
> Creation of the multifd send threads for RDMA migration, nothing 
> inside yet.
>
> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
> ---
>  migration/multifd.c   | 33 +++++++++++++---
>  migration/multifd.h   |  2 +
>  migration/qemu-file.c |  5 +++
>  migration/qemu-file.h |  1 +
>  migration/rdma.c      | 88 ++++++++++++++++++++++++++++++++++++++++++-
>  migration/rdma.h      |  3 ++
>  6 files changed, 125 insertions(+), 7 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b3e8ae9bcc..63678d7fdd 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f)
>  {
>      int i;
>  
> -    if (!migrate_use_multifd()) {
> +    if (!migrate_use_multifd() || migrate_use_rdma()) {

You don't need sync with main channel on rdma?

> +static void rdma_send_channel_create(MultiFDSendParams *p)
> +{
> +    Error *local_err = NULL;
> +
> +    if (p->quit) {
> +        error_setg(&local_err, "multifd: send id %d already quit", p->id);
> +        return ;
> +    }
> +    p->running = true;
> +
> +    qemu_thread_create(&p->thread, p->name, multifd_rdma_send_thread, p,
> +                       QEMU_THREAD_JOINABLE);
> +}
> +
>  static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>  {
>      MultiFDSendParams *p = opaque;
> @@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp)
>          p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>          p->packet->version = cpu_to_be32(MULTIFD_VERSION);
>          p->name = g_strdup_printf("multifdsend_%d", i);
> -        socket_send_channel_create(multifd_new_send_channel_async, p);
> +        if (!migrate_use_rdma()) {
> +            socket_send_channel_create(multifd_new_send_channel_async, p);
> +        } else {
> +            rdma_send_channel_create(p);
> +        }

This is what we are trying to avoid.  Just create a struct ops, where we have a

ops->create_channel(new_channel_async, p)

or whatever, and fill it differently for rdma and for tcp.


>      }
>      return 0;
>  }
> @@ -720,7 +738,7 @@ void multifd_recv_sync_main(void)
>  {
>      int i;
>  
> -    if (!migrate_use_multifd()) {
> +    if (!migrate_use_multifd() || migrate_use_rdma()) {
>          return;
>      }

Ok. you can just put an empty function for you here.

>      for (i = 0; i < migrate_multifd_channels(); i++) {
> @@ -890,8 +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>      p->num_packets = 1;
>  
>      p->running = true;
> -    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> -                       QEMU_THREAD_JOINABLE);
> +    if (!migrate_use_rdma()) {
> +        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +    } else {
> +        qemu_thread_create(&p->thread, p->name, multifd_rdma_recv_thread, p,
> +                           QEMU_THREAD_JOINABLE);
> +    }

new_recv_chanel() member function.

>      atomic_inc(&multifd_recv_state->count);
>      return atomic_read(&multifd_recv_state->count) ==
>             migrate_multifd_channels();
> diff --git a/migration/multifd.h b/migration/multifd.h
> index d8b0205977..c9c11ad140 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +#include "migration/rdma.h"
> +
>  int multifd_save_setup(Error **errp);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);

You are not exporting anything rdma related from here, are you?

> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 1c3a358a14..f0ed8f1381 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f)
>      f->iovcnt = 0;
>  }
>  
> +void *getQIOChannel(QEMUFile *f)
> +{
> +    return f->opaque;
> +}
> +

We really want this to return a void?  and not a better type?
> +static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
> +{
> +    MigrationIncomingState *mis = migration_incoming_get_current();
> +    Error *local_err = NULL;
> +    QIOChannel *ioc = NULL;
> +    bool start_migration;
> +
> +    if (!mis->from_src_file) {
> +        mis->from_src_file = f;
> +        qemu_file_set_blocking(f, false);
> +
> +        start_migration = migrate_use_multifd();
> +    } else {
> +        ioc = QIO_CHANNEL(getQIOChannel(f));
> +        /* Multiple connections */
> +        assert(migrate_use_multifd());

I am not sure that you can make this incompatible change.
You need to have *both*, old method and new multifd one.

I would have been happy to remove old precopy tcp method, but we
*assure* backwards compatibility.

> @@ -4003,8 +4032,12 @@ static void rdma_accept_incoming_migration(void *opaque)
>          return;
>      }
>  
> -    rdma->migration_started_on_destination = 1;
> -    migration_fd_process_incoming(f, errp);
> +    if (migrate_use_multifd()) {
> +        migration_rdma_process_incoming(f, errp);
> +    } else {
> +        rdma->migration_started_on_destination = 1;
> +        migration_fd_process_incoming(f, errp);
> +    }

But here you allow that multifd is not defined?




> +
> +void *multifd_rdma_recv_thread(void *opaque)
> +{

Why can't you use the multifd_recv_thread() directly, just creating different ops when you need them?

Later, Juan.
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index b3e8ae9bcc..63678d7fdd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -424,7 +424,7 @@  void multifd_send_sync_main(QEMUFile *f)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_use_multifd() || migrate_use_rdma()) {
         return;
     }
     if (multifd_send_state->pages->used) {
@@ -562,6 +562,20 @@  out:
     return NULL;
 }
 
+static void rdma_send_channel_create(MultiFDSendParams *p)
+{
+    /* Nothing to start if this channel has already been told to quit. */
+    if (p->quit) {
+        /* There is no errp here, so report instead of leaking an Error. */
+        error_report("multifd: send id %d already quit", p->id);
+        return;
+    }
+    p->running = true;
+
+    qemu_thread_create(&p->thread, p->name, multifd_rdma_send_thread, p,
+                       QEMU_THREAD_JOINABLE);
+}
+
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
     MultiFDSendParams *p = opaque;
@@ -621,7 +635,11 @@  int multifd_save_setup(Error **errp)
         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
         p->name = g_strdup_printf("multifdsend_%d", i);
-        socket_send_channel_create(multifd_new_send_channel_async, p);
+        if (!migrate_use_rdma()) {
+            socket_send_channel_create(multifd_new_send_channel_async, p);
+        } else {
+            rdma_send_channel_create(p);
+        }
     }
     return 0;
 }
@@ -720,7 +738,7 @@  void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_use_multifd() || migrate_use_rdma()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -890,8 +908,13 @@  bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
     p->num_packets = 1;
 
     p->running = true;
-    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
-                       QEMU_THREAD_JOINABLE);
+    if (!migrate_use_rdma()) {
+        qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
+                           QEMU_THREAD_JOINABLE);
+    } else {
+        qemu_thread_create(&p->thread, p->name, multifd_rdma_recv_thread, p,
+                           QEMU_THREAD_JOINABLE);
+    }
     atomic_inc(&multifd_recv_state->count);
     return atomic_read(&multifd_recv_state->count) ==
            migrate_multifd_channels();
diff --git a/migration/multifd.h b/migration/multifd.h
index d8b0205977..c9c11ad140 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,6 +13,8 @@ 
 #ifndef QEMU_MIGRATION_MULTIFD_H
 #define QEMU_MIGRATION_MULTIFD_H
 
+#include "migration/rdma.h"
+
 int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
 int multifd_load_setup(Error **errp);
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 1c3a358a14..f0ed8f1381 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -248,6 +248,11 @@  void qemu_fflush(QEMUFile *f)
     f->iovcnt = 0;
 }
 
+void *getQIOChannel(QEMUFile *f)
+{
+    return f->opaque; /* untyped: callers must cast, e.g. QIO_CHANNEL() */
+}
+
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
 {
     int ret = 0;
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6ccb7..fc656a3b72 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -161,6 +161,7 @@  int qemu_file_shutdown(QEMUFile *f);
 QEMUFile *qemu_file_get_return_path(QEMUFile *f);
 void qemu_fflush(QEMUFile *f);
 void qemu_file_set_blocking(QEMUFile *f, bool block);
+void *getQIOChannel(QEMUFile *f);
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
diff --git a/migration/rdma.c b/migration/rdma.c
index 2379b8345b..f086ab5a82 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -34,6 +34,7 @@ 
 #include <arpa/inet.h>
 #include <rdma/rdma_cma.h>
 #include "trace.h"
+#include "multifd.h"
 
 /*
  * Print and error on both the Monitor and the Log file.
@@ -3975,6 +3976,34 @@  static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
     return rioc->file;
 }
 
+static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
+{
+    /* The first connection is the main stream; later ones are multifd. */
+    MigrationIncomingState *mis = migration_incoming_get_current();
+    Error *local_err = NULL;
+    bool start_migration;
+
+    if (!mis->from_src_file) {
+        mis->from_src_file = f;
+        qemu_file_set_blocking(f, false);
+        /* Multifd waits; multifd_recv_new_channel() triggers the start */
+        start_migration = !migrate_use_multifd();
+    } else {
+        QIOChannel *ioc = QIO_CHANNEL(getQIOChannel(f));
+        /* Multiple connections: true once the last channel is connected */
+        assert(migrate_use_multifd());
+        start_migration = multifd_recv_new_channel(ioc, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            return;
+        }
+    }
+
+    if (start_migration) {
+        migration_incoming_process();
+    }
+}
+
 static void rdma_accept_incoming_migration(void *opaque)
 {
     RDMAContext *rdma = opaque;
@@ -4003,8 +4032,12 @@  static void rdma_accept_incoming_migration(void *opaque)
         return;
     }
 
-    rdma->migration_started_on_destination = 1;
-    migration_fd_process_incoming(f, errp);
+    if (migrate_use_multifd()) {
+        migration_rdma_process_incoming(f, errp);
+    } else {
+        rdma->migration_started_on_destination = 1;
+        migration_fd_process_incoming(f, errp);
+    }
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
@@ -4048,6 +4081,15 @@  void rdma_start_incoming_migration(const char *host_port, Error **errp)
         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
     }
 
+    if (multifd_load_setup(&local_err) != 0) {
+        /*
+         * We haven't been able to create multifd threads
+         * nothing better to do
+         */
+        error_report_err(local_err);
+        goto err;
+    }
+
     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                         NULL, (void *)(intptr_t)rdma);
     return;
@@ -4118,3 +4160,45 @@  err:
     g_free(rdma);
     g_free(rdma_return_path);
 }
+
+void *multifd_rdma_recv_thread(void *opaque)
+{
+    MultiFDRecvParams *params = opaque;
+
+    /*
+     * Placeholder thread: no RDMA receive work is done yet.  Sleep on
+     * sem_sync, re-checking quit under the mutex after every wake-up.
+     */
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit) {
+        qemu_mutex_unlock(&params->mutex);
+        qemu_sem_wait(&params->sem_sync);
+        qemu_mutex_lock(&params->mutex);
+    }
+    /* Still holding the mutex taken for the last quit check. */
+    params->running = false;
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
+
+void *multifd_rdma_send_thread(void *opaque)
+{
+    MultiFDSendParams *params = opaque;
+
+    /*
+     * Placeholder thread: no RDMA send work is done yet.  Sleep on sem,
+     * re-checking quit under the mutex after every wake-up.
+     */
+    qemu_mutex_lock(&params->mutex);
+    while (!params->quit) {
+        qemu_mutex_unlock(&params->mutex);
+        qemu_sem_wait(&params->sem);
+        qemu_mutex_lock(&params->mutex);
+    }
+    /* Still holding the mutex taken for the last quit check. */
+    params->running = false;
+    qemu_mutex_unlock(&params->mutex);
+
+    return NULL;
+}
diff --git a/migration/rdma.h b/migration/rdma.h
index de2ba09dc5..3a00573083 100644
--- a/migration/rdma.h
+++ b/migration/rdma.h
@@ -17,6 +17,9 @@ 
 #ifndef QEMU_MIGRATION_RDMA_H
 #define QEMU_MIGRATION_RDMA_H
 
+void *multifd_rdma_recv_thread(void *opaque);
+void *multifd_rdma_send_thread(void *opaque);
+
 void rdma_start_outgoing_migration(void *opaque, const char *host_port,
                                    Error **errp);