diff mbox

[2/8] migration: stop allocating and freeing memory frequently

Message ID 20180313075739.11194-3-xiaoguangrong@tencent.com (mailing list archive)
State New, archived
Headers show

Commit Message

Xiao Guangrong March 13, 2018, 7:57 a.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

Current code uses compress2()/uncompress() to compress/decompress
memory, these two function manager memory allocation and release
internally, that causes huge memory is allocated and freed very
frequently

More worse, frequently returning memory to kernel will flush TLBs
and trigger invalidation callbacks on mmu-notification which
interacts with KVM MMU, that dramatically reduce the performance
of VM

So, we maintain the memory by ourselves and reuse it for each
compression and decompression

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/qemu-file.c |  34 ++++++++++--
 migration/qemu-file.h |   6 ++-
 migration/ram.c       | 142 +++++++++++++++++++++++++++++++++++++-------------
 3 files changed, 140 insertions(+), 42 deletions(-)

Comments

Dr. David Alan Gilbert March 15, 2018, 11:03 a.m. UTC | #1
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Current code uses compress2()/uncompress() to compress/decompress
> memory, these two function manager memory allocation and release
> internally, that causes huge memory is allocated and freed very
> frequently
> 
> More worse, frequently returning memory to kernel will flush TLBs
> and trigger invalidation callbacks on mmu-notification which
> interacts with KVM MMU, that dramatically reduce the performance
> of VM
> 
> So, we maintain the memory by ourselves and reuse it for each
> compression and decompression

I think
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  migration/qemu-file.c |  34 ++++++++++--
>  migration/qemu-file.h |   6 ++-
>  migration/ram.c       | 142 +++++++++++++++++++++++++++++++++++++-------------
>  3 files changed, 140 insertions(+), 42 deletions(-)
> 
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 2ab2bf362d..1ff33a1ffb 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -658,6 +658,30 @@ uint64_t qemu_get_be64(QEMUFile *f)
>      return v;
>  }
>  
> +/* return the size after compression, or negative value on error */
> +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
> +                              const uint8_t *source, size_t source_len)
> +{
> +    int err;
> +
> +    err = deflateReset(stream);
> +    if (err != Z_OK) {
> +        return -1;
> +    }
> +
> +    stream->avail_in = source_len;
> +    stream->next_in = (uint8_t *)source;
> +    stream->avail_out = dest_len;
> +    stream->next_out = dest;
> +
> +    err = deflate(stream, Z_FINISH);
> +    if (err != Z_STREAM_END) {
> +        return -1;
> +    }
> +
> +    return stream->next_out - dest;
> +}
> +
>  /* Compress size bytes of data start at p with specific compression
>   * level and store the compressed data to the buffer of f.
>   *
> @@ -668,8 +692,8 @@ uint64_t qemu_get_be64(QEMUFile *f)
>   * data, return -1.
>   */
>  
> -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
> -                                  int level)
> +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
> +                                  const uint8_t *p, size_t size)
>  {
>      ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
>  
> @@ -683,8 +707,10 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
>              return -1;
>          }
>      }
> -    if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen,
> -                  (Bytef *)p, size, level) != Z_OK) {
> +
> +    blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
> +                              blen, p, size);
> +    if (blen < 0) {
>          error_report("Compress Failed!");
>          return 0;
>      }
> diff --git a/migration/qemu-file.h b/migration/qemu-file.h
> index aae4e5ed36..d123b21ca8 100644
> --- a/migration/qemu-file.h
> +++ b/migration/qemu-file.h
> @@ -25,6 +25,8 @@
>  #ifndef MIGRATION_QEMU_FILE_H
>  #define MIGRATION_QEMU_FILE_H
>  
> +#include <zlib.h>
> +
>  /* Read a chunk of data from a file at the given position.  The pos argument
>   * can be ignored if the file is only be used for streaming.  The number of
>   * bytes actually read should be returned.
> @@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f);
>  
>  size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset);
>  size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
> -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
> -                                  int level);
> +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
> +                                  const uint8_t *p, size_t size);
>  int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
>  
>  /*
> diff --git a/migration/ram.c b/migration/ram.c
> index 615693f180..fff3f31e90 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -264,6 +264,7 @@ struct CompressParam {
>      QemuCond cond;
>      RAMBlock *block;
>      ram_addr_t offset;
> +    z_stream stream;
>  };
>  typedef struct CompressParam CompressParam;
>  
> @@ -275,6 +276,7 @@ struct DecompressParam {
>      void *des;
>      uint8_t *compbuf;
>      int len;
> +    z_stream stream;
>  };
>  typedef struct DecompressParam DecompressParam;
>  
> @@ -294,7 +296,7 @@ static QemuThread *decompress_threads;
>  static QemuMutex decomp_done_lock;
>  static QemuCond decomp_done_cond;
>  
> -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
> +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>                                  ram_addr_t offset);
>  
>  static void *do_data_compress(void *opaque)
> @@ -311,7 +313,7 @@ static void *do_data_compress(void *opaque)
>              param->block = NULL;
>              qemu_mutex_unlock(&param->mutex);
>  
> -            do_compress_ram_page(param->file, block, offset);
> +            do_compress_ram_page(param->file, &param->stream, block, offset);
>  
>              qemu_mutex_lock(&comp_done_lock);
>              param->done = true;
> @@ -352,10 +354,17 @@ static void compress_threads_save_cleanup(void)
>      terminate_compression_threads();
>      thread_count = migrate_compress_threads();
>      for (i = 0; i < thread_count; i++) {
> +        /* something in compress_threads_save_setup() is wrong. */
> +        if (!comp_param[i].stream.opaque) {
> +            break;
> +        }
> +
>          qemu_thread_join(compress_threads + i);
>          qemu_fclose(comp_param[i].file);
>          qemu_mutex_destroy(&comp_param[i].mutex);
>          qemu_cond_destroy(&comp_param[i].cond);
> +        deflateEnd(&comp_param[i].stream);
> +        comp_param[i].stream.opaque = NULL;
>      }
>      qemu_mutex_destroy(&comp_done_lock);
>      qemu_cond_destroy(&comp_done_cond);
> @@ -365,12 +374,12 @@ static void compress_threads_save_cleanup(void)
>      comp_param = NULL;
>  }
>  
> -static void compress_threads_save_setup(void)
> +static int compress_threads_save_setup(void)
>  {
>      int i, thread_count;
>  
>      if (!migrate_use_compression()) {
> -        return;
> +        return 0;
>      }
>      thread_count = migrate_compress_threads();
>      compress_threads = g_new0(QemuThread, thread_count);
> @@ -378,6 +387,12 @@ static void compress_threads_save_setup(void)
>      qemu_cond_init(&comp_done_cond);
>      qemu_mutex_init(&comp_done_lock);
>      for (i = 0; i < thread_count; i++) {
> +        if (deflateInit(&comp_param[i].stream,
> +                           migrate_compress_level()) != Z_OK) {
> +            goto exit;
> +        }
> +        comp_param[i].stream.opaque = &comp_param[i];
> +
>          /* comp_param[i].file is just used as a dummy buffer to save data,
>           * set its ops to empty.
>           */
> @@ -390,6 +405,11 @@ static void compress_threads_save_setup(void)
>                             do_data_compress, comp_param + i,
>                             QEMU_THREAD_JOINABLE);
>      }
> +    return 0;
> +
> +exit:
> +    compress_threads_save_cleanup();
> +    return -1;
>  }
>  
>  /* Multiple fd's */
> @@ -1026,7 +1046,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>      return pages;
>  }
>  
> -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
> +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
>      RAMState *rs = ram_state;
> @@ -1035,8 +1055,7 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>  
>      bytes_sent = save_page_header(rs, f, block, offset |
>                                    RAM_SAVE_FLAG_COMPRESS_PAGE);
> -    blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
> -                                     migrate_compress_level());
> +    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
>      if (blen < 0) {
>          bytes_sent = 0;
>          qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
> @@ -2209,9 +2228,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      RAMState **rsp = opaque;
>      RAMBlock *block;
>  
> +    if (compress_threads_save_setup()) {
> +        return -1;
> +    }
> +
>      /* migration has already setup the bitmap, reuse it. */
>      if (!migration_in_colo_state()) {
>          if (ram_init_all(rsp) != 0) {
> +            compress_threads_save_cleanup();
>              return -1;
>          }
>      }
> @@ -2231,7 +2255,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
>      }
>  
>      rcu_read_unlock();
> -    compress_threads_save_setup();
>  
>      ram_control_before_iterate(f, RAM_CONTROL_SETUP);
>      ram_control_after_iterate(f, RAM_CONTROL_SETUP);
> @@ -2495,6 +2518,30 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
>      }
>  }
>  
> +/* return the size after decompression, or negative value on error */
> +static int qemu_uncompress(z_stream *stream, uint8_t *dest, size_t dest_len,
> +                           uint8_t *source, size_t source_len)
> +{
> +    int err;
> +
> +    err = inflateReset(stream);
> +    if (err != Z_OK) {
> +        return -1;
> +    }
> +
> +    stream->avail_in = source_len;
> +    stream->next_in = source;
> +    stream->avail_out = dest_len;
> +    stream->next_out = dest;
> +
> +    err = inflate(stream, Z_NO_FLUSH);
> +    if (err != Z_STREAM_END) {
> +        return -1;
> +    }
> +
> +    return stream->total_out;
> +}
> +
>  static void *do_data_decompress(void *opaque)
>  {
>      DecompressParam *param = opaque;
> @@ -2511,13 +2558,13 @@ static void *do_data_decompress(void *opaque)
>              qemu_mutex_unlock(&param->mutex);
>  
>              pagesize = TARGET_PAGE_SIZE;
> -            /* uncompress() will return failed in some case, especially
> +            /* qemu_uncompress() will return failed in some case, especially
>               * when the page is dirted when doing the compression, it's
>               * not a problem because the dirty page will be retransferred
>               * and uncompress() won't break the data in other pages.
>               */
> -            uncompress((Bytef *)des, &pagesize,
> -                       (const Bytef *)param->compbuf, len);
> +            qemu_uncompress(&param->stream, des, pagesize,
> +                            param->compbuf, len);
>  
>              qemu_mutex_lock(&decomp_done_lock);
>              param->done = true;
> @@ -2552,30 +2599,6 @@ static void wait_for_decompress_done(void)
>      qemu_mutex_unlock(&decomp_done_lock);
>  }
>  
> -static void compress_threads_load_setup(void)
> -{
> -    int i, thread_count;
> -
> -    if (!migrate_use_compression()) {
> -        return;
> -    }
> -    thread_count = migrate_decompress_threads();
> -    decompress_threads = g_new0(QemuThread, thread_count);
> -    decomp_param = g_new0(DecompressParam, thread_count);
> -    qemu_mutex_init(&decomp_done_lock);
> -    qemu_cond_init(&decomp_done_cond);
> -    for (i = 0; i < thread_count; i++) {
> -        qemu_mutex_init(&decomp_param[i].mutex);
> -        qemu_cond_init(&decomp_param[i].cond);
> -        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> -        decomp_param[i].done = true;
> -        decomp_param[i].quit = false;
> -        qemu_thread_create(decompress_threads + i, "decompress",
> -                           do_data_decompress, decomp_param + i,
> -                           QEMU_THREAD_JOINABLE);
> -    }
> -}
> -
>  static void compress_threads_load_cleanup(void)
>  {
>      int i, thread_count;
> @@ -2585,16 +2608,26 @@ static void compress_threads_load_cleanup(void)
>      }
>      thread_count = migrate_decompress_threads();
>      for (i = 0; i < thread_count; i++) {
> +        if (!decomp_param[i].stream.opaque) {
> +            break;
> +        }
> +
>          qemu_mutex_lock(&decomp_param[i].mutex);
>          decomp_param[i].quit = true;
>          qemu_cond_signal(&decomp_param[i].cond);
>          qemu_mutex_unlock(&decomp_param[i].mutex);
>      }
>      for (i = 0; i < thread_count; i++) {
> +        if (!decomp_param[i].stream.opaque) {
> +            break;
> +        }
> +
>          qemu_thread_join(decompress_threads + i);
>          qemu_mutex_destroy(&decomp_param[i].mutex);
>          qemu_cond_destroy(&decomp_param[i].cond);
>          g_free(decomp_param[i].compbuf);
> +        inflateEnd(&decomp_param[i].stream);
> +        decomp_param[i].stream.opaque = NULL;
>      }
>      g_free(decompress_threads);
>      g_free(decomp_param);
> @@ -2602,6 +2635,40 @@ static void compress_threads_load_cleanup(void)
>      decomp_param = NULL;
>  }
>  
> +static int compress_threads_load_setup(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return 0;
> +    }
> +
> +    thread_count = migrate_decompress_threads();
> +    decompress_threads = g_new0(QemuThread, thread_count);
> +    decomp_param = g_new0(DecompressParam, thread_count);
> +    qemu_mutex_init(&decomp_done_lock);
> +    qemu_cond_init(&decomp_done_cond);
> +    for (i = 0; i < thread_count; i++) {
> +        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
> +            goto exit;
> +        }
> +        decomp_param[i].stream.opaque = &decomp_param[i];
> +
> +        qemu_mutex_init(&decomp_param[i].mutex);
> +        qemu_cond_init(&decomp_param[i].cond);
> +        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> +        decomp_param[i].done = true;
> +        decomp_param[i].quit = false;
> +        qemu_thread_create(decompress_threads + i, "decompress",
> +                           do_data_decompress, decomp_param + i,
> +                           QEMU_THREAD_JOINABLE);
> +    }
> +    return 0;
> +exit:
> +    compress_threads_load_cleanup();

I don't think this is safe; if inflateInit(..) fails in not-the-last
thread, compress_threads_load_cleanup() will try and destroy all the
mutex's and condition variables, even though they've not yet all been
_init'd.

However, other than that I think the patch is OK; a chat with Dan
Berrange has convinced me this probably doesn't affect the stream
format, so that's OK.

One thing I would like is a comment as to how the 'opaque' field is
being used; I don't think I quite understand what you're doing there.

Dave

> +    return -1;
> +}
> +
>  static void decompress_data_with_multi_threads(QEMUFile *f,
>                                                 void *host, int len)
>  {
> @@ -2641,8 +2708,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>   */
>  static int ram_load_setup(QEMUFile *f, void *opaque)
>  {
> +    if (compress_threads_load_setup()) {
> +        return -1;
> +    }
> +
>      xbzrle_load_setup();
> -    compress_threads_load_setup();
>      ramblock_recv_map_init();
>      return 0;
>  }
> -- 
> 2.14.3
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Xiao Guangrong March 16, 2018, 8:19 a.m. UTC | #2
On 03/15/2018 07:03 PM, Dr. David Alan Gilbert wrote:

>> +static int compress_threads_load_setup(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_use_compression()) {
>> +        return 0;
>> +    }
>> +
>> +    thread_count = migrate_decompress_threads();
>> +    decompress_threads = g_new0(QemuThread, thread_count);
>> +    decomp_param = g_new0(DecompressParam, thread_count);
>> +    qemu_mutex_init(&decomp_done_lock);
>> +    qemu_cond_init(&decomp_done_cond);
>> +    for (i = 0; i < thread_count; i++) {
>> +        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
>> +            goto exit;
>> +        }
>> +        decomp_param[i].stream.opaque = &decomp_param[i];
>> +
>> +        qemu_mutex_init(&decomp_param[i].mutex);
>> +        qemu_cond_init(&decomp_param[i].cond);
>> +        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
>> +        decomp_param[i].done = true;
>> +        decomp_param[i].quit = false;
>> +        qemu_thread_create(decompress_threads + i, "decompress",
>> +                           do_data_decompress, decomp_param + i,
>> +                           QEMU_THREAD_JOINABLE);
>> +    }
>> +    return 0;
>> +exit:
>> +    compress_threads_load_cleanup();
> 
> I don't think this is safe; if inflateInit(..) fails in not-the-last
> thread, compress_threads_load_cleanup() will try and destroy all the
> mutex's and condition variables, even though they've not yet all been
> _init'd.
> 

That is exactly why we used 'opaque', please see more below...

> However, other than that I think the patch is OK; a chat with Dan
> Berrange has convinced me this probably doesn't affect the stream
> format, so that's OK.
> 
> One thing I would like is a comment as to how the 'opaque' field is
> being used; I don't think I quite understand what you're doing there.

The zlib.h file says that:
"     The opaque value provided by the application will be passed as the first
    parameter for calls of zalloc and zfree.  This can be useful for custom
    memory management.  The compression library attaches no meaning to the
    opaque value."
So we can use it to store our private data.

Here, we use it as a indicator which shows if the thread is properly init'd
or not. If inflateInit() is successful we will set it to non-NULL, otherwise
it is NULL, so that the cleanup path can figure out the first thread failed
to do inflateInit().
Dr. David Alan Gilbert March 19, 2018, 10:54 a.m. UTC | #3
* Xiao Guangrong (guangrong.xiao@gmail.com) wrote:
> 
> 
> On 03/15/2018 07:03 PM, Dr. David Alan Gilbert wrote:
> 
> > > +static int compress_threads_load_setup(void)
> > > +{
> > > +    int i, thread_count;
> > > +
> > > +    if (!migrate_use_compression()) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    thread_count = migrate_decompress_threads();
> > > +    decompress_threads = g_new0(QemuThread, thread_count);
> > > +    decomp_param = g_new0(DecompressParam, thread_count);
> > > +    qemu_mutex_init(&decomp_done_lock);
> > > +    qemu_cond_init(&decomp_done_cond);
> > > +    for (i = 0; i < thread_count; i++) {
> > > +        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
> > > +            goto exit;
> > > +        }
> > > +        decomp_param[i].stream.opaque = &decomp_param[i];
> > > +
> > > +        qemu_mutex_init(&decomp_param[i].mutex);
> > > +        qemu_cond_init(&decomp_param[i].cond);
> > > +        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> > > +        decomp_param[i].done = true;
> > > +        decomp_param[i].quit = false;
> > > +        qemu_thread_create(decompress_threads + i, "decompress",
> > > +                           do_data_decompress, decomp_param + i,
> > > +                           QEMU_THREAD_JOINABLE);
> > > +    }
> > > +    return 0;
> > > +exit:
> > > +    compress_threads_load_cleanup();
> > 
> > I don't think this is safe; if inflateInit(..) fails in not-the-last
> > thread, compress_threads_load_cleanup() will try and destroy all the
> > mutex's and condition variables, even though they've not yet all been
> > _init'd.
> > 
> 
> That is exactly why we used 'opaque', please see more below...
> 
> > However, other than that I think the patch is OK; a chat with Dan
> > Berrange has convinced me this probably doesn't affect the stream
> > format, so that's OK.
> > 
> > One thing I would like is a comment as to how the 'opaque' field is
> > being used; I don't think I quite understand what you're doing there.
> 
> The zlib.h file says that:
> "     The opaque value provided by the application will be passed as the first
>    parameter for calls of zalloc and zfree.  This can be useful for custom
>    memory management.  The compression library attaches no meaning to the
>    opaque value."
> So we can use it to store our private data.
> 
> Here, we use it as a indicator which shows if the thread is properly init'd
> or not. If inflateInit() is successful we will set it to non-NULL, otherwise
> it is NULL, so that the cleanup path can figure out the first thread failed
> to do inflateInit().

OK, so I think you just need to add a comment to explain that. Put it
above the 'if (!decomp_param[i].stream.opaque) {' in
compress_threads_load_cleanup  so it'll be easy to understand.

Dave

> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Xiao Guangrong March 19, 2018, 12:11 p.m. UTC | #4
On 03/19/2018 06:54 PM, Dr. David Alan Gilbert wrote:

>>>> +    return 0;
>>>> +exit:
>>>> +    compress_threads_load_cleanup();
>>>
>>> I don't think this is safe; if inflateInit(..) fails in not-the-last
>>> thread, compress_threads_load_cleanup() will try and destroy all the
>>> mutex's and condition variables, even though they've not yet all been
>>> _init'd.
>>>
>>
>> That is exactly why we used 'opaque', please see more below...
>>
>>> However, other than that I think the patch is OK; a chat with Dan
>>> Berrange has convinced me this probably doesn't affect the stream
>>> format, so that's OK.
>>>
>>> One thing I would like is a comment as to how the 'opaque' field is
>>> being used; I don't think I quite understand what you're doing there.
>>
>> The zlib.h file says that:
>> "     The opaque value provided by the application will be passed as the first
>>     parameter for calls of zalloc and zfree.  This can be useful for custom
>>     memory management.  The compression library attaches no meaning to the
>>     opaque value."
>> So we can use it to store our private data.
>>
>> Here, we use it as a indicator which shows if the thread is properly init'd
>> or not. If inflateInit() is successful we will set it to non-NULL, otherwise
>> it is NULL, so that the cleanup path can figure out the first thread failed
>> to do inflateInit().
> 
> OK, so I think you just need to add a comment to explain that. Put it
> above the 'if (!decomp_param[i].stream.opaque) {' in
> compress_threads_load_cleanup  so it'll be easy to understand.

Yes, indeed, i will do.

Thanks!
Peter Xu March 21, 2018, 9:06 a.m. UTC | #5
On Tue, Mar 13, 2018 at 03:57:33PM +0800, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Current code uses compress2()/uncompress() to compress/decompress
> memory, these two function manager memory allocation and release
> internally, that causes huge memory is allocated and freed very
> frequently
> 
> More worse, frequently returning memory to kernel will flush TLBs
> and trigger invalidation callbacks on mmu-notification which
> interacts with KVM MMU, that dramatically reduce the performance
> of VM
> 
> So, we maintain the memory by ourselves and reuse it for each
> compression and decompression
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  migration/qemu-file.c |  34 ++++++++++--
>  migration/qemu-file.h |   6 ++-
>  migration/ram.c       | 142 +++++++++++++++++++++++++++++++++++++-------------
>  3 files changed, 140 insertions(+), 42 deletions(-)
> 
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 2ab2bf362d..1ff33a1ffb 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -658,6 +658,30 @@ uint64_t qemu_get_be64(QEMUFile *f)
>      return v;
>  }
>  
> +/* return the size after compression, or negative value on error */
> +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
> +                              const uint8_t *source, size_t source_len)
> +{
> +    int err;
> +
> +    err = deflateReset(stream);

I'm not familiar with zlib, but I saw this in manual:

 https://www.zlib.net/manual.html

 This function is equivalent to deflateEnd followed by deflateInit,
 but does not free and reallocate the internal compression state. The
 stream will leave the compression level and any other attributes that
 may have been set unchanged.

I thought it was deflateInit() who is slow?  Can we avoid the reset as
long as we make sure to deflateInit() before doing anything else?

Meanwhile, is there any performance number for this single patch?
Since I thought the old code is calling compress2() which contains
deflateInit() and deflateEnd() too, just like what current patch do?

It would be nice too if we can split the patch into two (decode,
encode) if you want, but that's optional.

Thanks,
Xiao Guangrong March 22, 2018, 11:57 a.m. UTC | #6
On 03/21/2018 05:06 PM, Peter Xu wrote:
> On Tue, Mar 13, 2018 at 03:57:33PM +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> Current code uses compress2()/uncompress() to compress/decompress
>> memory, these two function manager memory allocation and release
>> internally, that causes huge memory is allocated and freed very
>> frequently
>>
>> More worse, frequently returning memory to kernel will flush TLBs
>> and trigger invalidation callbacks on mmu-notification which
>> interacts with KVM MMU, that dramatically reduce the performance
>> of VM
>>
>> So, we maintain the memory by ourselves and reuse it for each
>> compression and decompression
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
>> ---
>>   migration/qemu-file.c |  34 ++++++++++--
>>   migration/qemu-file.h |   6 ++-
>>   migration/ram.c       | 142 +++++++++++++++++++++++++++++++++++++-------------
>>   3 files changed, 140 insertions(+), 42 deletions(-)
>>
>> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
>> index 2ab2bf362d..1ff33a1ffb 100644
>> --- a/migration/qemu-file.c
>> +++ b/migration/qemu-file.c
>> @@ -658,6 +658,30 @@ uint64_t qemu_get_be64(QEMUFile *f)
>>       return v;
>>   }
>>   
>> +/* return the size after compression, or negative value on error */
>> +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
>> +                              const uint8_t *source, size_t source_len)
>> +{
>> +    int err;
>> +
>> +    err = deflateReset(stream);
> 
> I'm not familiar with zlib, but I saw this in manual:
> 
>   https://www.zlib.net/manual.html
> 
>   This function is equivalent to deflateEnd followed by deflateInit,
>   but does not free and reallocate the internal compression state. The
>   stream will leave the compression level and any other attributes that
>   may have been set unchanged.
> 
> I thought it was deflateInit() who is slow?  Can we avoid the reset as

deflateEnd() is worse as it frees memory to kernel which triggers
TLB flush and mmu-notifier.

> long as we make sure to deflateInit() before doing anything else?

Actually, deflateReset() is cheap... :)

> 
> Meanwhile, is there any performance number for this single patch?
> Since I thought the old code is calling compress2() which contains
> deflateInit() and deflateEnd() too, just like what current patch do?

No, after the patch, we just call deflateInit() / deflateEnd() one
time (in _setup() handler and _cleanup handler).

Yes. This is the perf data from our production,
after revert this patch:
+  57.88%  kqemu  [kernel.kallsyms]        [k] queued_spin_lock_slowpath
+  10.55%  kqemu  [kernel.kallsyms]        [k] __lock_acquire
+   4.83%  kqemu  [kernel.kallsyms]        [k] flush_tlb_func_common

-   1.16%  kqemu  [kernel.kallsyms]        [k] lock_acquire                                       ▒
    - lock_acquire                                                                                 ▒
       - 15.68% _raw_spin_lock                                                                     ▒
          + 29.42% __schedule                                                                      ▒
          + 29.14% perf_event_context_sched_out                                                    ▒
          + 23.60% tdp_page_fault                                                                  ▒
          + 10.54% do_anonymous_page                                                               ▒
          + 2.07% kvm_mmu_notifier_invalidate_range_start                                          ▒
          + 1.83% zap_pte_range                                                                    ▒
          + 1.44% kvm_mmu_notifier_invalidate_range_end


apply our work:
+  51.92%  kqemu  [kernel.kallsyms]        [k] queued_spin_lock_slowpath
+  14.82%  kqemu  [kernel.kallsyms]        [k] __lock_acquire
+   1.47%  kqemu  [kernel.kallsyms]        [k] mark_lock.clone.0
+   1.46%  kqemu  [kernel.kallsyms]        [k] native_sched_clock
+   1.31%  kqemu  [kernel.kallsyms]        [k] lock_acquire
+   1.24%  kqemu  libc-2.12.so             [.] __memset_sse2

-  14.82%  kqemu  [kernel.kallsyms]        [k] __lock_acquire                                     ▒
    - __lock_acquire                                                                               ▒
       - 99.75% lock_acquire                                                                       ▒
          - 18.38% _raw_spin_lock                                                                  ▒
             + 39.62% tdp_page_fault                                                               ▒
             + 31.32% __schedule                                                                   ▒
             + 27.53% perf_event_context_sched_out                                                 ▒
             + 0.58% hrtimer_interrupt


You can see the TLB flush and mmu-lock contention have gone after this patch.

> 
> It would be nice too if we can split the patch into two (decode,
> encode) if you want, but that's optional.

That's good to me, thank you, Peter.
Peter Xu March 27, 2018, 7:07 a.m. UTC | #7
On Thu, Mar 22, 2018 at 07:57:54PM +0800, Xiao Guangrong wrote:
> 
> 
> On 03/21/2018 05:06 PM, Peter Xu wrote:
> > On Tue, Mar 13, 2018 at 03:57:33PM +0800, guangrong.xiao@gmail.com wrote:
> > > From: Xiao Guangrong <xiaoguangrong@tencent.com>
> > > 
> > > Current code uses compress2()/uncompress() to compress/decompress
> > > memory, these two function manager memory allocation and release
> > > internally, that causes huge memory is allocated and freed very
> > > frequently
> > > 
> > > More worse, frequently returning memory to kernel will flush TLBs
> > > and trigger invalidation callbacks on mmu-notification which
> > > interacts with KVM MMU, that dramatically reduce the performance
> > > of VM
> > > 
> > > So, we maintain the memory by ourselves and reuse it for each
> > > compression and decompression
> > > 
> > > Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> > > ---
> > >   migration/qemu-file.c |  34 ++++++++++--
> > >   migration/qemu-file.h |   6 ++-
> > >   migration/ram.c       | 142 +++++++++++++++++++++++++++++++++++++-------------
> > >   3 files changed, 140 insertions(+), 42 deletions(-)
> > > 
> > > diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> > > index 2ab2bf362d..1ff33a1ffb 100644
> > > --- a/migration/qemu-file.c
> > > +++ b/migration/qemu-file.c
> > > @@ -658,6 +658,30 @@ uint64_t qemu_get_be64(QEMUFile *f)
> > >       return v;
> > >   }
> > > +/* return the size after compression, or negative value on error */
> > > +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
> > > +                              const uint8_t *source, size_t source_len)
> > > +{
> > > +    int err;
> > > +
> > > +    err = deflateReset(stream);
> > 
> > I'm not familiar with zlib, but I saw this in manual:
> > 
> >   https://www.zlib.net/manual.html
> > 
> >   This function is equivalent to deflateEnd followed by deflateInit,
> >   but does not free and reallocate the internal compression state. The
> >   stream will leave the compression level and any other attributes that
> >   may have been set unchanged.
> > 
> > I thought it was deflateInit() who is slow?  Can we avoid the reset as
> 
> deflateEnd() is worse as it frees memory to kernel which triggers
> TLB flush and mmu-notifier.
> 
> > long as we make sure to deflateInit() before doing anything else?
> 
> Actually, deflateReset() is cheap... :)
> 
> > 
> > Meanwhile, is there any performance number for this single patch?
> > Since I thought the old code is calling compress2() which contains
> > deflateInit() and deflateEnd() too, just like what current patch do?
> 
> No, after the patch, we just call deflateInit() / deflateEnd() one
> time (in _setup() handler and _cleanup handler).
> 
> Yes. This is the perf data from our production,
> after revert this patch:
> +  57.88%  kqemu  [kernel.kallsyms]        [k] queued_spin_lock_slowpath
> +  10.55%  kqemu  [kernel.kallsyms]        [k] __lock_acquire
> +   4.83%  kqemu  [kernel.kallsyms]        [k] flush_tlb_func_common
> 
> -   1.16%  kqemu  [kernel.kallsyms]        [k] lock_acquire                                       ▒
>    - lock_acquire                                                                                 ▒
>       - 15.68% _raw_spin_lock                                                                     ▒
>          + 29.42% __schedule                                                                      ▒
>          + 29.14% perf_event_context_sched_out                                                    ▒
>          + 23.60% tdp_page_fault                                                                  ▒
>          + 10.54% do_anonymous_page                                                               ▒
>          + 2.07% kvm_mmu_notifier_invalidate_range_start                                          ▒
>          + 1.83% zap_pte_range                                                                    ▒
>          + 1.44% kvm_mmu_notifier_invalidate_range_end
> 
> 
> apply our work:
> +  51.92%  kqemu  [kernel.kallsyms]        [k] queued_spin_lock_slowpath
> +  14.82%  kqemu  [kernel.kallsyms]        [k] __lock_acquire
> +   1.47%  kqemu  [kernel.kallsyms]        [k] mark_lock.clone.0
> +   1.46%  kqemu  [kernel.kallsyms]        [k] native_sched_clock
> +   1.31%  kqemu  [kernel.kallsyms]        [k] lock_acquire
> +   1.24%  kqemu  libc-2.12.so             [.] __memset_sse2
> 
> -  14.82%  kqemu  [kernel.kallsyms]        [k] __lock_acquire                                     ▒
>    - __lock_acquire                                                                               ▒
>       - 99.75% lock_acquire                                                                       ▒
>          - 18.38% _raw_spin_lock                                                                  ▒
>             + 39.62% tdp_page_fault                                                               ▒
>             + 31.32% __schedule                                                                   ▒
>             + 27.53% perf_event_context_sched_out                                                 ▒
>             + 0.58% hrtimer_interrupt
> 
> 
> You can see the TLB flush and mmu-lock contention have gone after this patch.

Yes.  Obviously I misunderstood the documentation for deflateReset().
It's not really a combined "End+Init", a quick glance in zlib code
shows that deflateInit() will do the mallocs, then call deflateReset()
at last.  So the buffers should be kept for reset(), as you explained.

> 
> > 
> > It would be nice too if we can split the patch into two (decode,
> > encode) if you want, but that's optional.
> 
> That's good to me, thank you, Peter.

Thanks for explaining.
diff mbox

Patch

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 2ab2bf362d..1ff33a1ffb 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -658,6 +658,30 @@  uint64_t qemu_get_be64(QEMUFile *f)
     return v;
 }
 
+/* return the size after compression, or negative value on error */
+static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+                              const uint8_t *source, size_t source_len)
+{
+    int err;
+
+    err = deflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = source_len;
+    stream->next_in = (uint8_t *)source;
+    stream->avail_out = dest_len;
+    stream->next_out = dest;
+
+    err = deflate(stream, Z_FINISH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->next_out - dest;
+}
+
 /* Compress size bytes of data start at p with specific compression
  * level and store the compressed data to the buffer of f.
  *
@@ -668,8 +692,8 @@  uint64_t qemu_get_be64(QEMUFile *f)
  * data, return -1.
  */
 
-ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
-                                  int level)
+ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+                                  const uint8_t *p, size_t size)
 {
     ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
 
@@ -683,8 +707,10 @@  ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
             return -1;
         }
     }
-    if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen,
-                  (Bytef *)p, size, level) != Z_OK) {
+
+    blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
+                              blen, p, size);
+    if (blen < 0) {
         error_report("Compress Failed!");
         return 0;
     }
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index aae4e5ed36..d123b21ca8 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -25,6 +25,8 @@ 
 #ifndef MIGRATION_QEMU_FILE_H
 #define MIGRATION_QEMU_FILE_H
 
+#include <zlib.h>
+
 /* Read a chunk of data from a file at the given position.  The pos argument
  * can be ignored if the file is only be used for streaming.  The number of
  * bytes actually read should be returned.
@@ -132,8 +134,8 @@  bool qemu_file_is_writable(QEMUFile *f);
 
 size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset);
 size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
-ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
-                                  int level);
+ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+                                  const uint8_t *p, size_t size);
 int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
 
 /*
diff --git a/migration/ram.c b/migration/ram.c
index 615693f180..fff3f31e90 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -264,6 +264,7 @@  struct CompressParam {
     QemuCond cond;
     RAMBlock *block;
     ram_addr_t offset;
+    z_stream stream;
 };
 typedef struct CompressParam CompressParam;
 
@@ -275,6 +276,7 @@  struct DecompressParam {
     void *des;
     uint8_t *compbuf;
     int len;
+    z_stream stream;
 };
 typedef struct DecompressParam DecompressParam;
 
@@ -294,7 +296,7 @@  static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset);
 
 static void *do_data_compress(void *opaque)
@@ -311,7 +313,7 @@  static void *do_data_compress(void *opaque)
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);
 
-            do_compress_ram_page(param->file, block, offset);
+            do_compress_ram_page(param->file, &param->stream, block, offset);
 
             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
@@ -352,10 +354,17 @@  static void compress_threads_save_cleanup(void)
     terminate_compression_threads();
     thread_count = migrate_compress_threads();
     for (i = 0; i < thread_count; i++) {
+        /* something in compress_threads_save_setup() is wrong. */
+        if (!comp_param[i].stream.opaque) {
+            break;
+        }
+
         qemu_thread_join(compress_threads + i);
         qemu_fclose(comp_param[i].file);
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
+        deflateEnd(&comp_param[i].stream);
+        comp_param[i].stream.opaque = NULL;
     }
     qemu_mutex_destroy(&comp_done_lock);
     qemu_cond_destroy(&comp_done_cond);
@@ -365,12 +374,12 @@  static void compress_threads_save_cleanup(void)
     comp_param = NULL;
 }
 
-static void compress_threads_save_setup(void)
+static int compress_threads_save_setup(void)
 {
     int i, thread_count;
 
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
     thread_count = migrate_compress_threads();
     compress_threads = g_new0(QemuThread, thread_count);
@@ -378,6 +387,12 @@  static void compress_threads_save_setup(void)
     qemu_cond_init(&comp_done_cond);
     qemu_mutex_init(&comp_done_lock);
     for (i = 0; i < thread_count; i++) {
+        if (deflateInit(&comp_param[i].stream,
+                           migrate_compress_level()) != Z_OK) {
+            goto exit;
+        }
+        comp_param[i].stream.opaque = &comp_param[i];
+
         /* comp_param[i].file is just used as a dummy buffer to save data,
          * set its ops to empty.
          */
@@ -390,6 +405,11 @@  static void compress_threads_save_setup(void)
                            do_data_compress, comp_param + i,
                            QEMU_THREAD_JOINABLE);
     }
+    return 0;
+
+exit:
+    compress_threads_save_cleanup();
+    return -1;
 }
 
 /* Multiple fd's */
@@ -1026,7 +1046,7 @@  static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
-static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
+static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
                                 ram_addr_t offset)
 {
     RAMState *rs = ram_state;
@@ -1035,8 +1055,7 @@  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
 
     bytes_sent = save_page_header(rs, f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
-                                     migrate_compress_level());
+    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
     if (blen < 0) {
         bytes_sent = 0;
         qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -2209,9 +2228,14 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
     RAMState **rsp = opaque;
     RAMBlock *block;
 
+    if (compress_threads_save_setup()) {
+        return -1;
+    }
+
     /* migration has already setup the bitmap, reuse it. */
     if (!migration_in_colo_state()) {
         if (ram_init_all(rsp) != 0) {
+            compress_threads_save_cleanup();
             return -1;
         }
     }
@@ -2231,7 +2255,6 @@  static int ram_save_setup(QEMUFile *f, void *opaque)
     }
 
     rcu_read_unlock();
-    compress_threads_save_setup();
 
     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@@ -2495,6 +2518,30 @@  void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+/* return the size after decompression, or negative value on error */
+static int qemu_uncompress(z_stream *stream, uint8_t *dest, size_t dest_len,
+                           uint8_t *source, size_t source_len)
+{
+    int err;
+
+    err = inflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = source_len;
+    stream->next_in = source;
+    stream->avail_out = dest_len;
+    stream->next_out = dest;
+
+    err = inflate(stream, Z_NO_FLUSH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->total_out;
+}
+
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
@@ -2511,13 +2558,13 @@  static void *do_data_decompress(void *opaque)
             qemu_mutex_unlock(&param->mutex);
 
             pagesize = TARGET_PAGE_SIZE;
-            /* uncompress() will return failed in some case, especially
+            /* qemu_uncompress() will return failed in some case, especially
              * when the page is dirted when doing the compression, it's
              * not a problem because the dirty page will be retransferred
              * and uncompress() won't break the data in other pages.
              */
-            uncompress((Bytef *)des, &pagesize,
-                       (const Bytef *)param->compbuf, len);
+            qemu_uncompress(&param->stream, des, pagesize,
+                            param->compbuf, len);
 
             qemu_mutex_lock(&decomp_done_lock);
             param->done = true;
@@ -2552,30 +2599,6 @@  static void wait_for_decompress_done(void)
     qemu_mutex_unlock(&decomp_done_lock);
 }
 
-static void compress_threads_load_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return;
-    }
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    for (i = 0; i < thread_count; i++) {
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-}
-
 static void compress_threads_load_cleanup(void)
 {
     int i, thread_count;
@@ -2585,16 +2608,26 @@  static void compress_threads_load_cleanup(void)
     }
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        if (!decomp_param[i].stream.opaque) {
+            break;
+        }
+
         qemu_mutex_lock(&decomp_param[i].mutex);
         decomp_param[i].quit = true;
         qemu_cond_signal(&decomp_param[i].cond);
         qemu_mutex_unlock(&decomp_param[i].mutex);
     }
     for (i = 0; i < thread_count; i++) {
+        if (!decomp_param[i].stream.opaque) {
+            break;
+        }
+
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
         g_free(decomp_param[i].compbuf);
+        inflateEnd(&decomp_param[i].stream);
+        decomp_param[i].stream.opaque = NULL;
     }
     g_free(decompress_threads);
     g_free(decomp_param);
@@ -2602,6 +2635,40 @@  static void compress_threads_load_cleanup(void)
     decomp_param = NULL;
 }
 
+static int compress_threads_load_setup(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    qemu_mutex_init(&decomp_done_lock);
+    qemu_cond_init(&decomp_done_cond);
+    for (i = 0; i < thread_count; i++) {
+        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+            goto exit;
+        }
+        decomp_param[i].stream.opaque = &decomp_param[i];
+
+        qemu_mutex_init(&decomp_param[i].mutex);
+        qemu_cond_init(&decomp_param[i].cond);
+        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        decomp_param[i].done = true;
+        decomp_param[i].quit = false;
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+    return 0;
+exit:
+    compress_threads_load_cleanup();
+    return -1;
+}
+
 static void decompress_data_with_multi_threads(QEMUFile *f,
                                                void *host, int len)
 {
@@ -2641,8 +2708,11 @@  static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
+    if (compress_threads_load_setup()) {
+        return -1;
+    }
+
     xbzrle_load_setup();
-    compress_threads_load_setup();
     ramblock_recv_map_init();
     return 0;
 }