
[RFC,2/4] migration/multifd/zero-copy: Merge header & pages send in a single write

Message ID 20221025044730.319941-3-leobras@redhat.com (mailing list archive)
State New, archived
Series MultiFD zero-copy improvements

Commit Message

Leonardo Bras Oct. 25, 2022, 4:47 a.m. UTC
When zero-copy-send is enabled, each loop iteration of multifd_send_thread
calls qio_channel_write_*() twice: the first call sends the header without
the zero-copy flag, and the second sends the memory pages with the
zero-copy flag enabled.

This ends up making two syscalls per loop iteration, where one should be
enough.

Also, since non-zero-copy writes are synchronous while zero-copy writes are
asynchronous, this ends up interleaving synchronous and asynchronous writes,
giving up performance that could otherwise be gained.

The header was sent without the zero-copy flag in a separate write because
the header memory area is reused in the next write, so it was almost certain
to have changed before the kernel could send the packet.

To allow sending the header with zero-copy as well, create an array of
header areas instead of a single one, and use a different header area after
each write. Also, flush the sending queue once all the headers have been
used.

To avoid adding a zero-copy conditional in multifd_send_fill_packet(), add
a packet parameter (the packet that should be filled). This way it's simpler
to pick which element of the array will be used as the header.

Suggested-by: Juan Quintela <quintela@redhat.com>
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 migration/multifd.h |  5 ++++-
 migration/multifd.c | 52 ++++++++++++++++++++++++---------------------
 2 files changed, 32 insertions(+), 25 deletions(-)
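
Below is a minimal standalone sketch of the header-ring idea from the commit
message: rotate through an array of variable-sized header areas and flush once
the ring wraps, so no header area is reused while a zero-copy send may still be
reading it. The names (Header, HEADER_RING_SZ, header_len) are hypothetical,
plain libc stands in for the QEMU/glib helpers, and the zero-copy write and
flush themselves are only stubbed out.

/*
 * Sketch only: rotate through a ring of variable-sized header areas,
 * flushing once every area has been handed to the kernel.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define HEADER_RING_SZ 4   /* the patch uses HEADER_ARR_SZ == 1024 */

typedef struct {
    uint32_t magic;
    uint32_t version;
    uint64_t offset[];     /* flexible array member, like MultiFDPacket_t */
} Header;

int main(void)
{
    uint32_t page_count = 128;
    size_t header_len = sizeof(Header) + sizeof(uint64_t) * page_count;

    /* One contiguous allocation holding HEADER_RING_SZ variable-sized headers. */
    Header *ring = calloc(HEADER_RING_SZ, header_len);
    uint32_t idx = 0;

    for (int iteration = 0; iteration < 10; iteration++) {
        /*
         * Pick the current header by byte offset, not by array indexing,
         * because each element is header_len bytes, not sizeof(Header).
         */
        Header *header = (Header *)((char *)ring + idx * header_len);
        header->magic = 0x11223344u;
        header->version = 1;

        /* ... fill header->offset[] and submit header + pages in a single
         * (asynchronous, zero-copy) write here ... */

        idx = (idx + 1) % HEADER_RING_SZ;
        if (idx == 0) {
            /*
             * Every header area has been handed to the kernel once: wait
             * for the pending zero-copy sends to complete before reusing
             * them (multifd_zero_copy_flush() in the patch).
             */
            printf("flush after iteration %d\n", iteration);
        }
    }

    free(ring);
    return 0;
}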

Comments

Juan Quintela Oct. 25, 2022, 9:51 a.m. UTC | #1
Leonardo Bras <leobras@redhat.com> wrote:
> When zero-copy-send is enabled, each loop iteration of multifd_send_thread
> calls qio_channel_write_*() twice: the first call sends the header without
> the zero-copy flag, and the second sends the memory pages with the
> zero-copy flag enabled.
>
> This ends up making two syscalls per loop iteration, where one should be
> enough.
>
> Also, since non-zero-copy writes are synchronous while zero-copy writes are
> asynchronous, this ends up interleaving synchronous and asynchronous writes,
> giving up performance that could otherwise be gained.
>
> The header was sent without the zero-copy flag in a separate write because
> the header memory area is reused in the next write, so it was almost certain
> to have changed before the kernel could send the packet.
>
> To allow sending the header with zero-copy as well, create an array of
> header areas instead of a single one, and use a different header area after
> each write. Also, flush the sending queue once all the headers have been
> used.
>
> To avoid adding a zero-copy conditional in multifd_send_fill_packet(), add
> a packet parameter (the packet that should be filled). This way it's simpler
> to pick which element of the array will be used as the header.
>
> Suggested-by: Juan Quintela <quintela@redhat.com>
> Signed-off-by: Leonardo Bras <leobras@redhat.com>

>  
> +            if (use_zero_copy_send) {
> +                p->packet_idx = (p->packet_idx + 1) % HEADER_ARR_SZ;
> +
> +                if (!p->packet_idx && (multifd_zero_copy_flush(p->c) < 0)) {
> +                    break;
> +                }
> +                header = (void *)p->packet + p->packet_idx * p->packet_len;

Isn't this equivalent to?

      header = &(p->packet[p->packet_idx]);

>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> +        int j;

For new code you can:

>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem, 0);
> @@ -940,9 +940,13 @@ int multifd_save_setup(Error **errp)
>          p->pages = multifd_pages_init(page_count);
>          p->packet_len = sizeof(MultiFDPacket_t)
>                        + sizeof(uint64_t) * page_count;
> -        p->packet = g_malloc0(p->packet_len);
> -        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> -        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> +        p->packet = g_malloc0_n(HEADER_ARR_SZ, p->packet_len);
> +        for (j = 0; j < HEADER_ARR_SZ ; j++) {

           for (int j = 0; j < HEADER_ARR_SZ ; j++) {

> +            MultiFDPacket_t *packet = (void *)p->packet + j * p->packet_len;
> +            packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> +            packet->version = cpu_to_be32(MULTIFD_VERSION);

Can't you use here:

            packet[j].magic = cpu_to_be32(MULTIFD_MAGIC);
            packet[j].version = cpu_to_be32(MULTIFD_VERSION);

And call it a day?

The rest is fine for me.  Thanks for the effort.

Later, Juan.
Leonardo Bras Oct. 25, 2022, 1:28 p.m. UTC | #2
On Tue, 2022-10-25 at 11:51 +0200, Juan Quintela wrote:
> Leonardo Bras <leobras@redhat.com> wrote:
> > When zero-copy-send is enabled, each loop iteration of multifd_send_thread
> > calls qio_channel_write_*() twice: the first call sends the header without
> > the zero-copy flag, and the second sends the memory pages with the
> > zero-copy flag enabled.
> >
> > This ends up making two syscalls per loop iteration, where one should be
> > enough.
> >
> > Also, since non-zero-copy writes are synchronous while zero-copy writes are
> > asynchronous, this ends up interleaving synchronous and asynchronous writes,
> > giving up performance that could otherwise be gained.
> >
> > The header was sent without the zero-copy flag in a separate write because
> > the header memory area is reused in the next write, so it was almost certain
> > to have changed before the kernel could send the packet.
> >
> > To allow sending the header with zero-copy as well, create an array of
> > header areas instead of a single one, and use a different header area after
> > each write. Also, flush the sending queue once all the headers have been
> > used.
> >
> > To avoid adding a zero-copy conditional in multifd_send_fill_packet(), add
> > a packet parameter (the packet that should be filled). This way it's simpler
> > to pick which element of the array will be used as the header.
> > 
> > Suggested-by: Juan Quintela <quintela@redhat.com>
> > Signed-off-by: Leonardo Bras <leobras@redhat.com>
> 
> >  
> > +            if (use_zero_copy_send) {
> > +                p->packet_idx = (p->packet_idx + 1) % HEADER_ARR_SZ;
> > +
> > +                if (!p->packet_idx && (multifd_zero_copy_flush(p->c) < 0)) {
> > +                    break;
> > +                }
> > +                header = (void *)p->packet + p->packet_idx * p->packet_len;
> 
> Isn't this equivalent to?
> 
>       header = &(p->packet[p->packet_idx]);


So, that's the thing:
MultiFDPacket_t contains a flexible array member (offset[]) at the end of
the struct, which is used together with dynamic allocation to get the effect
of a variable-size struct.

This is used to make sure the packet can carry the offset of each memory
page being sent, regardless of the target page size:

multifd_save_setup() {
	uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
	[...]
	p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
	p->packet = g_malloc0(p->packet_len);
	[...]
}

So the size of each header is actually p->packet_len, not
sizeof(MultiFDPacket_t). This means plain pointer arithmetic does not work
here, and

header = &(p->packet[p->packet_idx])

will actually point to the wrong place in memory, since it does not take
the actual size of the offset[] array into account.
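
For illustration only, a small standalone example of that size mismatch
(hypothetical struct and variable names, plain libc, not QEMU code):

/*
 * Why plain array indexing gives the wrong address: the real element
 * stride is the runtime packet_len, not sizeof(Packet), because of the
 * flexible array member at the end of the struct.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    uint32_t magic;
    uint32_t version;
    uint64_t offset[];   /* flexible array member, like MultiFDPacket_t */
} Packet;

int main(void)
{
    uint32_t page_count = 128;
    size_t packet_len = sizeof(Packet) + sizeof(uint64_t) * page_count;
    Packet *arr = calloc(4, packet_len);

    /* Correct: step by the runtime size of one header area. */
    Packet *right = (Packet *)((char *)arr + 2 * packet_len);

    /* Wrong: steps by sizeof(Packet) only, landing inside element 0's
     * offset[] array instead of at the start of element 2. */
    Packet *wrong = &arr[2];

    printf("stride used by &arr[2]: %zu, actual stride: %zu\n",
           sizeof(Packet), packet_len);
    printf("byte offsets: wrong=%td right=%td\n",
           (char *)wrong - (char *)arr, (char *)right - (char *)arr);

    free(arr);
    return 0;
}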

> 
> >      for (i = 0; i < thread_count; i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> > +        int j;
> 
> For new code you can:
> 
> >          qemu_mutex_init(&p->mutex);
> >          qemu_sem_init(&p->sem, 0);
> > @@ -940,9 +940,13 @@ int multifd_save_setup(Error **errp)
> >          p->pages = multifd_pages_init(page_count);
> >          p->packet_len = sizeof(MultiFDPacket_t)
> >                        + sizeof(uint64_t) * page_count;
> > -        p->packet = g_malloc0(p->packet_len);
> > -        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> > -        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
> > +        p->packet = g_malloc0_n(HEADER_ARR_SZ, p->packet_len);
> > +        for (j = 0; j < HEADER_ARR_SZ ; j++) {
> 
>            for (int j = 0; j < HEADER_ARR_SZ ; j++) {

Oh, sweet. I am very fond of C99, just not used to it getting accepted upstream.
Thanks for the tip :)

> 
> > +            MultiFDPacket_t *packet = (void *)p->packet + j * p->packet_len;
> > +            packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> > +            packet->version = cpu_to_be32(MULTIFD_VERSION);
> 
> Can't you use here:
> 
>             packet[j].magic = cpu_to_be32(MULTIFD_MAGIC);
>             packet[j].version = cpu_to_be32(MULTIFD_VERSION);
> 
> And call it a day?

Not really: each element's actual size is p->packet_len, not
sizeof(MultiFDPacket_t), so that indexing does not work. See above.

> 
> The rest is fine for me.  Thanks for the effort.

Thanks for reviewing Juan!

Leo

> 
> Later, Juan.
>

Patch

diff --git a/migration/multifd.h b/migration/multifd.h
index 519f498643..7f200c0286 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -90,6 +90,7 @@  typedef struct {
 
     /* this mutex protects the following parameters */
     QemuMutex mutex;
+
     /* is this channel thread running */
     bool running;
     /* should this thread finish */
@@ -109,8 +110,10 @@  typedef struct {
 
     /* thread local variables. No locking required */
 
-    /* pointer to the packet */
+    /* pointer to the packet array */
     MultiFDPacket_t *packet;
+    /* index of the packet array */
+    uint32_t packet_idx;
     /* size of the next packet that contains pages */
     uint32_t next_packet_size;
     /* packets sent through this channel */
diff --git a/migration/multifd.c b/migration/multifd.c
index 509bbbe3bf..aa18c47141 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -34,6 +34,8 @@ 
 #define MULTIFD_MAGIC 0x11223344U
 #define MULTIFD_VERSION 1
 
+#define HEADER_ARR_SZ 1024
+
 typedef struct {
     uint32_t magic;
     uint32_t version;
@@ -255,9 +257,9 @@  static void multifd_pages_clear(MultiFDPages_t *pages)
     g_free(pages);
 }
 
-static void multifd_send_fill_packet(MultiFDSendParams *p)
+static void multifd_send_fill_packet(MultiFDSendParams *p,
+                                     MultiFDPacket_t *packet)
 {
-    MultiFDPacket_t *packet = p->packet;
     int i;
 
     packet->flags = cpu_to_be32(p->flags);
@@ -547,6 +549,7 @@  void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        p->packet_idx = 0;
         g_free(p->iov);
         p->iov = NULL;
         g_free(p->normal);
@@ -651,6 +654,7 @@  int multifd_send_sync_main(QEMUFile *f)
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
+    MultiFDPacket_t *header = p->packet;
     Error *local_err = NULL;
     int ret = 0;
     bool use_zero_copy_send = migrate_use_zero_copy_send();
@@ -676,13 +680,9 @@  static void *multifd_send_thread(void *opaque)
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
-            p->normal_num = 0;
 
-            if (use_zero_copy_send) {
-                p->iovs_num = 0;
-            } else {
-                p->iovs_num = 1;
-            }
+            p->normal_num = 0;
+            p->iovs_num = 1;
 
             for (int i = 0; i < p->pages->num; i++) {
                 p->normal[p->normal_num] = p->pages->offset[i];
@@ -696,7 +696,8 @@  static void *multifd_send_thread(void *opaque)
                     break;
                 }
             }
-            multifd_send_fill_packet(p);
+
+            multifd_send_fill_packet(p, header);
             p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += p->normal_num;
@@ -707,18 +708,8 @@  static void *multifd_send_thread(void *opaque)
             trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                                p->next_packet_size);
 
-            if (use_zero_copy_send) {
-                /* Send header first, without zerocopy */
-                ret = qio_channel_write_all(p->c, (void *)p->packet,
-                                            p->packet_len, &local_err);
-                if (ret != 0) {
-                    break;
-                }
-            } else {
-                /* Send header using the same writev call */
-                p->iov[0].iov_len = p->packet_len;
-                p->iov[0].iov_base = p->packet;
-            }
+            p->iov[0].iov_len = p->packet_len;
+            p->iov[0].iov_base = header;
 
             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                               0, p->write_flags, &local_err);
@@ -726,6 +717,14 @@  static void *multifd_send_thread(void *opaque)
                 break;
             }
 
+            if (use_zero_copy_send) {
+                p->packet_idx = (p->packet_idx + 1) % HEADER_ARR_SZ;
+
+                if (!p->packet_idx && (multifd_zero_copy_flush(p->c) < 0)) {
+                    break;
+                }
+                header = (void *)p->packet + p->packet_idx * p->packet_len;
+            }
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
@@ -930,6 +929,7 @@  int multifd_save_setup(Error **errp)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
+        int j;
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
@@ -940,9 +940,13 @@  int multifd_save_setup(Error **errp)
         p->pages = multifd_pages_init(page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;
-        p->packet = g_malloc0(p->packet_len);
-        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+        p->packet = g_malloc0_n(HEADER_ARR_SZ, p->packet_len);
+        for (j = 0; j < HEADER_ARR_SZ ; j++) {
+            MultiFDPacket_t *packet = (void *)p->packet + j * p->packet_len;
+            packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+            packet->version = cpu_to_be32(MULTIFD_VERSION);
+        }
+        p->packet_idx = 0;
         p->name = g_strdup_printf("multifdsend_%d", i);
         /* We need one extra place for the packet header */
         p->iov = g_new0(struct iovec, page_count + 1);