diff mbox series

[v2,8/8] multifd: rest of zlib compression (WIP)

Message ID 20190403114958.3705-9-quintela@redhat.com (mailing list archive)
State New, archived
Headers show
Series WIP: Multifd compression support | expand

Commit Message

Juan Quintela April 3, 2019, 11:49 a.m. UTC
This is still a work in progress, but get everything sent as expected
and it is faster than the code that is already there.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 88 insertions(+), 5 deletions(-)
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 06b25ac66d..1b3b88d711 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1118,7 +1118,41 @@  static void *multifd_send_thread(void *opaque)
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
 
-            p->next_packet_size = used * qemu_target_page_size();
+            if (used) {
+                if (migrate_use_multifd_zlib()) {
+                    struct iovec *iov = p->pages->iov;
+                    z_stream *zs = &p->zs;
+                    uint32_t out_size = 0;
+                    int i;
+
+                    for (i = 0; i < used; i++ ) {
+                        uint32_t available = p->zbuff_len - out_size;
+                        int flush = Z_NO_FLUSH;
+
+                        if (i == used  - 1) {
+                            flush = Z_SYNC_FLUSH;
+                        }
+
+                        zs->avail_in = iov[i].iov_len;
+                        zs->next_in = iov[i].iov_base;
+
+                        zs->avail_out = available;
+                        zs->next_out = p->zbuff + out_size;
+
+                        ret = deflate(zs, flush);
+                        if (ret != Z_OK) {
+                            printf("problem with deflate? %d\n", ret);
+                            qemu_mutex_unlock(&p->mutex);
+                            break;
+                        }
+                        out_size += available - zs->avail_out;
+                    }
+                    p->next_packet_size = out_size;
+                } else {
+                    p->next_packet_size = used * qemu_target_page_size();
+                }
+            }
+
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
@@ -1136,8 +1170,13 @@  static void *multifd_send_thread(void *opaque)
             }
 
             if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
+                if (migrate_use_multifd_zlib()) {
+                    ret = qio_channel_write_all(p->c, (void *)p->zbuff,
+                                               p->next_packet_size, &local_err);
+                } else {
+                    ret = qio_channel_writev_all(p->c, p->pages->iov,
+                                                 used, &local_err);
+                }
                 if (ret != 0) {
                     break;
                 }
@@ -1384,8 +1423,52 @@  static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
 
         if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
+            uint32_t in_size = p->next_packet_size;
+            uint32_t out_size = 0;
+            uint32_t expected_size = used * qemu_target_page_size();
+            int i;
+
+            if (migrate_use_multifd_zlib()) {
+                z_stream *zs = &p->zs;
+
+                ret = qio_channel_read_all(p->c, (void *)p->zbuff,
+                                           in_size, &local_err);
+
+                if (ret != 0) {
+                    break;
+                }
+
+                zs->avail_in = in_size;
+                zs->next_in = p->zbuff;
+
+                for (i = 0; i < used; i++ ) {
+                    struct iovec *iov = &p->pages->iov[i];
+                    int flush = Z_NO_FLUSH;
+
+                    if (i == used  - 1) {
+                        flush = Z_SYNC_FLUSH;
+                    }
+
+                    zs->avail_out = iov->iov_len;
+                    zs->next_out = iov->iov_base;
+
+                    ret = inflate(zs, flush);
+                    if (ret != Z_OK) {
+                        printf("%d: problem with inflate? %d\n", p->id, ret);
+                        qemu_mutex_unlock(&p->mutex);
+                        break;
+                    }
+                    out_size += iov->iov_len;
+                }
+                if (out_size != expected_size) {
+                    printf("out size %d expected size %d\n",
+                           out_size, expected_size);
+                    break;
+                }
+            } else {
+                ret = qio_channel_readv_all(p->c, p->pages->iov,
+                                            used, &local_err);
+            }
             if (ret != 0) {
                 break;
             }