diff mbox series

[RFC,v1,16/26] migration/multifd: Allow multifd without packets

Message ID 20230330180336.2791-17-farosas@suse.de (mailing list archive)
State New, archived
Headers show
Series migration: File based migration with multifd and fixed-ram | expand

Commit Message

Fabiano Rosas March 30, 2023, 6:03 p.m. UTC
For the upcoming support to the new 'fixed-ram' migration stream
format, we cannot use multifd packets because each write into the
ramblock section in the migration file is expected to contain only the
guest pages. They are written at their respective offsets relative to
the ramblock section header.

There is no space for the packet information and the expected gains
from the new approach come partly from being able to write the pages
sequentially without extraneous data in between.

The new format also doesn't need the packets and all necessary
information can be taken from the standard migration headers with some
(future) changes to multifd code.

Use the presence of the fixed-ram capability to decide whether to send
packets. For now this has no effect as fixed-ram cannot yet be enabled
with multifd.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/migration.c |   5 ++
 migration/migration.h |   2 +-
 migration/multifd.c   | 119 ++++++++++++++++++++++++++----------------
 3 files changed, 80 insertions(+), 46 deletions(-)
diff mbox series

Patch

diff --git a/migration/migration.c b/migration/migration.c
index 17b26c1808..c647fbffa6 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2764,6 +2764,11 @@  int migrate_fixed_ram(void)
     return migrate_get_current()->enabled_capabilities[MIGRATION_CAPABILITY_FIXED_RAM];
 }
 
+bool migrate_multifd_use_packets(void)
+{
+    return !migrate_fixed_ram();
+}
+
 int migrate_multifd_zlib_level(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index 01c8201cfa..d7a014ce57 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -421,7 +421,7 @@  bool migrate_dirty_bitmaps(void);
 bool migrate_ignore_shared(void);
 bool migrate_validate_uuid(void);
 int migrate_fixed_ram(void);
-
+bool migrate_multifd_use_packets(void);
 bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
 bool migrate_pause_before_switchover(void);
diff --git a/migration/multifd.c b/migration/multifd.c
index e613d85e24..9f6b2787ed 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -659,18 +659,22 @@  static void *multifd_send_thread(void *opaque)
     Error *local_err = NULL;
     int ret = 0;
     bool use_zero_copy_send = migrate_use_zero_copy_send();
+    bool use_packets = migrate_multifd_use_packets();
 
     thread = MigrationThreadAdd(p->name, qemu_get_thread_id());
 
     trace_multifd_send_thread_start(p->id);
     rcu_register_thread();
 
-    if (multifd_send_initial_packet(p, &local_err) < 0) {
-        ret = -1;
-        goto out;
+    if (use_packets) {
+        if (multifd_send_initial_packet(p, &local_err) < 0) {
+            ret = -1;
+            goto out;
+        }
+
+        /* initial packet */
+        p->num_packets = 1;
     }
-    /* initial packet */
-    p->num_packets = 1;
 
     while (true) {
         qemu_sem_wait(&p->sem);
@@ -681,11 +685,10 @@  static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            uint64_t packet_num = p->packet_num;
             uint32_t flags;
             p->normal_num = 0;
 
-            if (use_zero_copy_send) {
+            if (!use_packets || use_zero_copy_send) {
                 p->iovs_num = 0;
             } else {
                 p->iovs_num = 1;
@@ -703,16 +706,20 @@  static void *multifd_send_thread(void *opaque)
                     break;
                 }
             }
-            multifd_send_fill_packet(p);
+
+            if (use_packets) {
+                multifd_send_fill_packet(p);
+                p->num_packets++;
+            }
+
             flags = p->flags;
             p->flags = 0;
-            p->num_packets++;
             p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+            trace_multifd_send(p->id, p->packet_num, p->normal_num, flags,
                                p->next_packet_size);
 
             if (use_zero_copy_send) {
@@ -722,7 +729,7 @@  static void *multifd_send_thread(void *opaque)
                 if (ret != 0) {
                     break;
                 }
-            } else {
+            } else if (use_packets) {
                 /* Send header using the same writev call */
                 p->iov[0].iov_len = p->packet_len;
                 p->iov[0].iov_base = p->packet;
@@ -919,6 +926,7 @@  int multifd_save_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    bool use_packets = migrate_multifd_use_packets();
     uint8_t i;
 
     if (!migrate_use_multifd()) {
@@ -943,14 +951,20 @@  int multifd_save_setup(Error **errp)
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+        if (use_packets) {
+            p->packet_len = sizeof(MultiFDPacket_t)
+                          + sizeof(uint64_t) * page_count;
+            p->packet = g_malloc0(p->packet_len);
+            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+            /* We need one extra place for the packet header */
+            p->iov = g_new0(struct iovec, page_count + 1);
+        } else {
+            p->iov = g_new0(struct iovec, page_count);
+        }
         p->name = g_strdup_printf("multifdsend_%d", i);
-        /* We need one extra place for the packet header */
-        p->iov = g_new0(struct iovec, page_count + 1);
         p->normal = g_new0(ram_addr_t, page_count);
         p->page_size = qemu_target_page_size();
         p->page_count = page_count;
@@ -1082,7 +1096,7 @@  void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_use_multifd() || !migrate_multifd_use_packets()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -1109,6 +1123,7 @@  static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
     Error *local_err = NULL;
+    bool use_packets = migrate_multifd_use_packets();
     int ret;
 
     trace_multifd_recv_thread_start(p->id);
@@ -1121,17 +1136,20 @@  static void *multifd_recv_thread(void *opaque)
             break;
         }
 
-        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                       p->packet_len, &local_err);
-        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
-            break;
-        }
+        if (use_packets) {
+            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+                                           p->packet_len, &local_err);
+            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
+                break;
+            }
 
-        qemu_mutex_lock(&p->mutex);
-        ret = multifd_recv_unfill_packet(p, &local_err);
-        if (ret) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
+            qemu_mutex_lock(&p->mutex);
+            ret = multifd_recv_unfill_packet(p, &local_err);
+            if (ret) {
+                qemu_mutex_unlock(&p->mutex);
+                break;
+            }
+            p->num_packets++;
         }
 
         flags = p->flags;
@@ -1139,7 +1157,7 @@  static void *multifd_recv_thread(void *opaque)
         p->flags &= ~MULTIFD_FLAG_SYNC;
         trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
                            p->next_packet_size);
-        p->num_packets++;
+
         p->total_normal_pages += p->normal_num;
         qemu_mutex_unlock(&p->mutex);
 
@@ -1174,6 +1192,7 @@  int multifd_load_setup(Error **errp)
 {
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+    bool use_packets = migrate_multifd_use_packets();
     uint8_t i;
 
     /*
@@ -1198,9 +1217,12 @@  int multifd_load_setup(Error **errp)
         qemu_sem_init(&p->sem_sync, 0);
         p->quit = false;
         p->id = i;
-        p->packet_len = sizeof(MultiFDPacket_t)
-                      + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
+
+        if (use_packets) {
+            p->packet_len = sizeof(MultiFDPacket_t)
+                + sizeof(uint64_t) * page_count;
+            p->packet = g_malloc0(p->packet_len);
+        }
         p->name = g_strdup_printf("multifdrecv_%d", i);
         p->iov = g_new0(struct iovec, page_count);
         p->normal = g_new0(ram_addr_t, page_count);
@@ -1246,18 +1268,26 @@  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
-    int id;
+    bool use_packets = migrate_multifd_use_packets();
+    int id, num_packets = 0;
 
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                qatomic_read(&multifd_recv_state->count));
-        return;
+    if (use_packets) {
+        id = multifd_recv_initial_packet(ioc, &local_err);
+        if (id < 0) {
+            multifd_recv_terminate_threads(local_err);
+            error_propagate_prepend(errp, local_err,
+                                    "failed to receive packet"
+                                    " via multifd channel %d: ",
+                                    qatomic_read(&multifd_recv_state->count));
+            return;
+        }
+        trace_multifd_recv_new_channel(id);
+
+        /* initial packet */
+        num_packets = 1;
+    } else {
+        id = 0;
     }
-    trace_multifd_recv_new_channel(id);
 
     p = &multifd_recv_state->params[id];
     if (p->c != NULL) {
@@ -1268,9 +1298,8 @@  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         return;
     }
     p->c = ioc;
+    p->num_packets = num_packets;
     object_ref(OBJECT(ioc));
-    /* initial packet */
-    p->num_packets = 1;
 
     p->running = true;
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,