diff mbox series

[RFC,3/6] migration: Add multi-thread compress ops

Message ID 20201109090850.2424-4-jinzeyu@huawei.com (mailing list archive)
State New, archived
Headers show
Series migration: Multi-thread compression with zstd method | expand

Commit Message

Zeyu Jin Nov. 9, 2020, 9:08 a.m. UTC
Add the MigrationCompressOps and MigrationDecompressOps structures to make
the compression method configurable for multi-thread compression migration.

Signed-off-by: Zeyu Jin <jinzeyu@huawei.com>
Signed-off-by: Ying Fang <fangying1@huawei.com
---
 migration/migration.c |   9 ++
 migration/migration.h |   1 +
 migration/ram.c       | 273 +++++++++++++++++++++++++++++-------------
 3 files changed, 203 insertions(+), 80 deletions(-)
diff mbox series

Patch

diff --git a/migration/migration.c b/migration/migration.c
index d0da95fc0d..2c68012029 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -2356,6 +2356,15 @@  int migrate_decompress_threads(void)
     return s->parameters.decompress_threads;
 }
 
+CompressMethod migrate_compress_method(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters.compress_method;
+}
+
 bool migrate_dirty_bitmaps(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index d096b77f74..e22b2ef840 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -339,6 +339,7 @@  int migrate_compress_level(void);
 int migrate_compress_threads(void);
 int migrate_compress_wait_thread(void);
 int migrate_decompress_threads(void);
+CompressMethod migrate_compress_method(void);
 bool migrate_use_events(void);
 bool migrate_postcopy_blocktime(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 75504540c9..94a7422204 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -419,8 +419,11 @@  struct CompressParam {
     ram_addr_t offset;
 
     /* internally used fields */
-    z_stream stream;
     uint8_t *originbuf;
+
+    /* for zlib compression */
+    z_stream stream;
+
 };
 typedef struct CompressParam CompressParam;
 
@@ -432,12 +435,29 @@  struct DecompressParam {
     void *des;
     uint8_t *compbuf;
     int len;
+
+    /* for zlib compression */
     z_stream stream;
 };
 typedef struct DecompressParam DecompressParam;
 
+typedef struct {
+    int (*save_setup)(CompressParam *param);
+    void (*save_cleanup)(CompressParam *param);
+    ssize_t (*compress_data)(CompressParam *param, size_t size);
+} MigrationCompressOps;
+
+typedef struct {
+    int (*load_setup)(DecompressParam *param);
+    void (*load_cleanup)(DecompressParam *param);
+    int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size);
+    int (*check_len)(int len);
+} MigrationDecompressOps;
+
 static CompressParam *comp_param;
 static QemuThread *compress_threads;
+static MigrationCompressOps *compress_ops;
+static MigrationDecompressOps *decompress_ops;
 /* comp_done_cond is used to wake up the migration thread when
  * one of the compression threads has finished the compression.
  * comp_done_lock is used to co-work with comp_done_cond.
@@ -455,6 +475,157 @@  static QemuCond decomp_done_cond;
 
 static bool do_compress_ram_page(CompressParam *param, RAMBlock *block);
 
+static int zlib_save_setup(CompressParam *param)
+{
+    if (deflateInit(&param->stream,
+                    migrate_compress_level()) != Z_OK) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static ssize_t zlib_compress_data(CompressParam *param, size_t size)
+{
+    int err;
+    uint8_t *dest = NULL;
+    z_stream *stream = &param->stream;
+    uint8_t *p = param->originbuf;
+    QEMUFile *f = f = param->file;
+    ssize_t blen = qemu_put_compress_start(f, &dest);
+
+    if (blen < compressBound(size)) {
+        return -1;
+    }
+
+    err = deflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = size;
+    stream->next_in = p;
+    stream->avail_out = blen;
+    stream->next_out = dest;
+
+    err = deflate(stream, Z_FINISH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    blen = stream->next_out - dest;
+    if (blen < 0) {
+        return -1;
+    }
+
+    qemu_put_compress_end(f, blen);
+    return blen + sizeof(int32_t);
+}
+
+static void zlib_save_cleanup(CompressParam *param)
+{
+    deflateEnd(&param->stream);
+}
+
+static int zlib_load_setup(DecompressParam *param)
+{
+    if (inflateInit(&param->stream) != Z_OK) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static int
+zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size)
+{
+    int err;
+
+    z_stream *stream = &param->stream;
+
+    err = inflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = param->len;
+    stream->next_in = param->compbuf;
+    stream->avail_out = size;
+    stream->next_out = dest;
+
+    err = inflate(stream, Z_NO_FLUSH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->total_out;
+}
+
+static void zlib_load_cleanup(DecompressParam *param)
+{
+    inflateEnd(&param->stream);
+}
+
+static int zlib_check_len(int len)
+{
+    return len < 0 || len > compressBound(TARGET_PAGE_SIZE);
+}
+
+static int set_compress_ops(void)
+{
+   compress_ops = g_new0(MigrationCompressOps, 1);
+
+    switch (migrate_compress_method()) {
+    case COMPRESS_METHOD_ZLIB:
+        compress_ops->save_setup = zlib_save_setup;
+        compress_ops->save_cleanup = zlib_save_cleanup;
+        compress_ops->compress_data = zlib_compress_data;
+        break;
+    default:
+        return -1;
+    }
+
+    return 0;
+}
+
+static int set_decompress_ops(void)
+{
+   decompress_ops = g_new0(MigrationDecompressOps, 1);
+
+    switch (migrate_compress_method()) {
+    case COMPRESS_METHOD_ZLIB:
+        decompress_ops->load_setup = zlib_load_setup;
+        decompress_ops->load_cleanup = zlib_load_cleanup;
+        decompress_ops->decompress_data = zlib_decompress_data;
+        decompress_ops->check_len = zlib_check_len;
+        break;
+    default:
+        return -1;
+   }
+
+   return 0;
+}
+
+static void clean_compress_ops(void)
+{
+    compress_ops->save_setup = NULL;
+    compress_ops->save_cleanup = NULL;
+    compress_ops->compress_data = NULL;
+
+    g_free(compress_ops);
+    compress_ops = NULL;
+}
+
+static void clean_decompress_ops(void)
+{
+    decompress_ops->load_setup = NULL;
+    decompress_ops->load_cleanup = NULL;
+    decompress_ops->decompress_data = NULL;
+
+    g_free(decompress_ops);
+    decompress_ops = NULL;
+}
+
 static void *do_data_compress(void *opaque)
 {
     CompressParam *param = opaque;
@@ -511,7 +682,7 @@  static void compress_threads_save_cleanup(void)
         qemu_thread_join(compress_threads + i);
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
+        compress_ops->save_cleanup(&comp_param[i]);
         g_free(comp_param[i].originbuf);
         qemu_fclose(comp_param[i].file);
         comp_param[i].file = NULL;
@@ -522,6 +693,7 @@  static void compress_threads_save_cleanup(void)
     g_free(comp_param);
     compress_threads = NULL;
     comp_param = NULL;
+    clean_compress_ops();
 }
 
 static int compress_threads_save_setup(void)
@@ -531,6 +703,12 @@  static int compress_threads_save_setup(void)
     if (!migrate_use_compression()) {
         return 0;
     }
+
+    if (set_compress_ops() < 0) {
+        clean_compress_ops();
+        return -1;
+    }
+
     thread_count = migrate_compress_threads();
     compress_threads = g_new0(QemuThread, thread_count);
     comp_param = g_new0(CompressParam, thread_count);
@@ -542,8 +720,7 @@  static int compress_threads_save_setup(void)
             goto exit;
         }
 
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
+        if (compress_ops->save_setup(&comp_param[i]) < 0) {
             g_free(comp_param[i].originbuf);
             goto exit;
         }
@@ -1209,50 +1386,6 @@  static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
     return 1;
 }
 
-/*
- * Compress size bytes of data start at p and store the compressed
- * data to the buffer of f.
- *
- * Since the file is dummy file with empty_ops, return -1 if f has no space to
- * save the compressed data.
- */
-static ssize_t qemu_put_compression_data(CompressParam *param, size_t size)
-{
-    int err;
-    uint8_t *dest = NULL;
-    z_stream *stream = &param->stream;
-    uint8_t *p = param->originbuf;
-    QEMUFile *f = f = param->file;
-    ssize_t blen = qemu_put_compress_start(f, &dest);
-
-    if (blen < compressBound(size)) {
-        return -1;
-    }
-
-    err = deflateReset(stream);
-    if (err != Z_OK) {
-        return -1;
-    }
-
-    stream->avail_in = size;
-    stream->next_in = p;
-    stream->avail_out = blen;
-    stream->next_out = dest;
-
-    err = deflate(stream, Z_FINISH);
-    if (err != Z_STREAM_END) {
-        return -1;
-    }
-
-    blen = stream->next_out - dest;
-    if (blen < 0) {
-        return -1;
-    }
-
-    qemu_put_compress_end(f, blen);
-    return blen + sizeof(int32_t);
-}
-
 static bool do_compress_ram_page(CompressParam *param, RAMBlock *block)
 {
     RAMState *rs = ram_state;
@@ -1275,7 +1408,7 @@  static bool do_compress_ram_page(CompressParam *param, RAMBlock *block)
      * decompression
      */
     memcpy(param->originbuf, p, TARGET_PAGE_SIZE);
-    ret = qemu_put_compression_data(param, TARGET_PAGE_SIZE);
+    ret = compress_ops->compress_data(param, TARGET_PAGE_SIZE);
     if (ret < 0) {
         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
         error_report("compressed data failed!");
@@ -2864,32 +2997,6 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
-/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize)
-{
-    int err;
-
-    z_stream *stream = &param->stream;
-
-    err = inflateReset(stream);
-    if (err != Z_OK) {
-        return -1;
-    }
-
-    stream->avail_in = param->len;
-    stream->next_in = param->compbuf;
-    stream->avail_out = pagesize;
-    stream->next_out = dest;
-
-    err = inflate(stream, Z_NO_FLUSH);
-    if (err != Z_STREAM_END) {
-        return -1;
-    }
-
-    return stream->total_out;
-}
-
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
@@ -2903,7 +3010,7 @@  static void *do_data_decompress(void *opaque)
             param->des = 0;
             qemu_mutex_unlock(&param->mutex);
 
-            ret = qemu_uncompress_data(param, des, TARGET_PAGE_SIZE);
+            ret = decompress_ops->decompress_data(param, des, TARGET_PAGE_SIZE);
             if (ret < 0 && migrate_get_current()->decompress_error_check) {
                 error_report("decompress data failed");
                 qemu_file_set_error(decomp_file, ret);
@@ -2973,7 +3080,7 @@  static void compress_threads_load_cleanup(void)
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
-        inflateEnd(&decomp_param[i].stream);
+        decompress_ops->load_cleanup(&decomp_param[i]);
         g_free(decomp_param[i].compbuf);
         decomp_param[i].compbuf = NULL;
     }
@@ -2982,6 +3089,7 @@  static void compress_threads_load_cleanup(void)
     decompress_threads = NULL;
     decomp_param = NULL;
     decomp_file = NULL;
+    clean_decompress_ops();
 }
 
 static int compress_threads_load_setup(QEMUFile *f)
@@ -2992,6 +3100,11 @@  static int compress_threads_load_setup(QEMUFile *f)
         return 0;
     }
 
+    if (set_decompress_ops() < 0) {
+        clean_decompress_ops();
+        return -1;
+    }
+
     thread_count = migrate_decompress_threads();
     decompress_threads = g_new0(QemuThread, thread_count);
     decomp_param = g_new0(DecompressParam, thread_count);
@@ -2999,7 +3112,7 @@  static int compress_threads_load_setup(QEMUFile *f)
     qemu_cond_init(&decomp_done_cond);
     decomp_file = f;
     for (i = 0; i < thread_count; i++) {
-        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+        if (decompress_ops->load_setup(&decomp_param[i]) < 0) {
             goto exit;
         }
 
@@ -3323,7 +3436,7 @@  static int ram_load_postcopy(QEMUFile *f)
         case RAM_SAVE_FLAG_COMPRESS_PAGE:
             all_zero = false;
             len = qemu_get_be32(f);
-            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+            if (decompress_ops->check_len(len)) {
                 error_report("Invalid compressed data length: %d", len);
                 ret = -EINVAL;
                 break;
@@ -3590,7 +3703,7 @@  static int ram_load_precopy(QEMUFile *f)
 
         case RAM_SAVE_FLAG_COMPRESS_PAGE:
             len = qemu_get_be32(f);
-            if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+            if (decompress_ops->check_len(len)) {
                 error_report("Invalid compressed data length: %d", len);
                 ret = -EINVAL;
                 break;