
[v3,3/5] migration: use threaded workqueue for compression

Message ID 20181122072028.22819-4-xiaoguangrong@tencent.com (mailing list archive)
State New, archived
Series migration: improve multithreads

Commit Message

Xiao Guangrong Nov. 22, 2018, 7:20 a.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

Adapt the compression code to the threaded workqueue

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 308 ++++++++++++++++++++------------------------------------
 1 file changed, 110 insertions(+), 198 deletions(-)

Comments

Dr. David Alan Gilbert Nov. 23, 2018, 6:17 p.m. UTC | #1
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Adapt the compression code to the threaded workqueue
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  migration/ram.c | 308 ++++++++++++++++++++------------------------------------
>  1 file changed, 110 insertions(+), 198 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 7e7deec4d8..254c08f27b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -57,6 +57,7 @@
>  #include "qemu/uuid.h"
>  #include "savevm.h"
>  #include "qemu/iov.h"
> +#include "qemu/threaded-workqueue.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
>  
>  CompressionStats compression_counters;
>  
> -struct CompressParam {
> -    bool done;
> -    bool quit;
> -    bool zero_page;
> -    QEMUFile *file;
> -    QemuMutex mutex;
> -    QemuCond cond;
> -    RAMBlock *block;
> -    ram_addr_t offset;
> -
> -    /* internally used fields */
> -    z_stream stream;
> -    uint8_t *originbuf;
> -};
> -typedef struct CompressParam CompressParam;
> -
>  struct DecompressParam {
>      bool done;
>      bool quit;
> @@ -377,15 +362,6 @@ struct DecompressParam {
>  };
>  typedef struct DecompressParam DecompressParam;
>  
> -static CompressParam *comp_param;
> -static QemuThread *compress_threads;
> -/* comp_done_cond is used to wake up the migration thread when
> - * one of the compression threads has finished the compression.
> - * comp_done_lock is used to co-work with comp_done_cond.
> - */
> -static QemuMutex comp_done_lock;
> -static QemuCond comp_done_cond;
> -/* The empty QEMUFileOps will be used by file in CompressParam */
>  static const QEMUFileOps empty_ops = { };
>  
>  static QEMUFile *decomp_file;
> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
>  static QemuMutex decomp_done_lock;
>  static QemuCond decomp_done_cond;
>  
> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
> -                                 ram_addr_t offset, uint8_t *source_buf);
> -
> -static void *do_data_compress(void *opaque)
> -{
> -    CompressParam *param = opaque;
> -    RAMBlock *block;
> -    ram_addr_t offset;
> -    bool zero_page;
> -
> -    qemu_mutex_lock(&param->mutex);
> -    while (!param->quit) {
> -        if (param->block) {
> -            block = param->block;
> -            offset = param->offset;
> -            param->block = NULL;
> -            qemu_mutex_unlock(&param->mutex);
> -
> -            zero_page = do_compress_ram_page(param->file, &param->stream,
> -                                             block, offset, param->originbuf);
> -
> -            qemu_mutex_lock(&comp_done_lock);
> -            param->done = true;
> -            param->zero_page = zero_page;
> -            qemu_cond_signal(&comp_done_cond);
> -            qemu_mutex_unlock(&comp_done_lock);
> -
> -            qemu_mutex_lock(&param->mutex);
> -        } else {
> -            qemu_cond_wait(&param->cond, &param->mutex);
> -        }
> -    }
> -    qemu_mutex_unlock(&param->mutex);
> -
> -    return NULL;
> -}
> -
> -static void compress_threads_save_cleanup(void)
> -{
> -    int i, thread_count;
> -
> -    if (!migrate_use_compression() || !comp_param) {
> -        return;
> -    }
> -
> -    thread_count = migrate_compress_threads();
> -    for (i = 0; i < thread_count; i++) {
> -        /*
> -         * we use it as a indicator which shows if the thread is
> -         * properly init'd or not
> -         */
> -        if (!comp_param[i].file) {
> -            break;
> -        }
> -
> -        qemu_mutex_lock(&comp_param[i].mutex);
> -        comp_param[i].quit = true;
> -        qemu_cond_signal(&comp_param[i].cond);
> -        qemu_mutex_unlock(&comp_param[i].mutex);
> -
> -        qemu_thread_join(compress_threads + i);
> -        qemu_mutex_destroy(&comp_param[i].mutex);
> -        qemu_cond_destroy(&comp_param[i].cond);
> -        deflateEnd(&comp_param[i].stream);
> -        g_free(comp_param[i].originbuf);
> -        qemu_fclose(comp_param[i].file);
> -        comp_param[i].file = NULL;
> -    }
> -    qemu_mutex_destroy(&comp_done_lock);
> -    qemu_cond_destroy(&comp_done_cond);
> -    g_free(compress_threads);
> -    g_free(comp_param);
> -    compress_threads = NULL;
> -    comp_param = NULL;
> -}
> -
> -static int compress_threads_save_setup(void)
> -{
> -    int i, thread_count;
> -
> -    if (!migrate_use_compression()) {
> -        return 0;
> -    }
> -    thread_count = migrate_compress_threads();
> -    compress_threads = g_new0(QemuThread, thread_count);
> -    comp_param = g_new0(CompressParam, thread_count);
> -    qemu_cond_init(&comp_done_cond);
> -    qemu_mutex_init(&comp_done_lock);
> -    for (i = 0; i < thread_count; i++) {
> -        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
> -        if (!comp_param[i].originbuf) {
> -            goto exit;
> -        }
> -
> -        if (deflateInit(&comp_param[i].stream,
> -                        migrate_compress_level()) != Z_OK) {
> -            g_free(comp_param[i].originbuf);
> -            goto exit;
> -        }
> -
> -        /* comp_param[i].file is just used as a dummy buffer to save data,
> -         * set its ops to empty.
> -         */
> -        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> -        comp_param[i].done = true;
> -        comp_param[i].quit = false;
> -        qemu_mutex_init(&comp_param[i].mutex);
> -        qemu_cond_init(&comp_param[i].cond);
> -        qemu_thread_create(compress_threads + i, "compress",
> -                           do_data_compress, comp_param + i,
> -                           QEMU_THREAD_JOINABLE);
> -    }
> -    return 0;
> -
> -exit:
> -    compress_threads_save_cleanup();
> -    return -1;
> -}
> -
>  /* Multiple fd's */
>  
>  #define MULTIFD_MAGIC 0x11223344U
> @@ -1909,12 +1766,25 @@ exit:
>      return zero_page;
>  }
>  
> +struct CompressData {
> +    /* filled by migration thread.*/
> +    RAMBlock *block;
> +    ram_addr_t offset;
> +
> +    /* filled by compress thread. */
> +    QEMUFile *file;
> +    z_stream stream;
> +    uint8_t *originbuf;
> +    bool zero_page;
> +};
> +typedef struct CompressData CompressData;
> +
>  static void
> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)

Keep the const?
>  {
>      ram_counters.transferred += bytes_xmit;
>  
> -    if (param->zero_page) {
> +    if (cd->zero_page) {
>          ram_counters.duplicate++;
>          return;
>      }
> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>      compression_counters.pages++;
>  }
>  
> +static int compress_thread_data_init(void *request)
> +{
> +    CompressData *cd = request;
> +
> +    cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
> +    if (!cd->originbuf) {
> +        return -1;
> +    }
> +
> +    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
> +        g_free(cd->originbuf);
> +        return -1;
> +    }

Please print errors if you fail in any case so we can easily tell what
happened.

> +    cd->file = qemu_fopen_ops(NULL, &empty_ops);
> +    return 0;
> +}
> +
> +static void compress_thread_data_fini(void *request)
> +{
> +    CompressData *cd = request;
> +
> +    qemu_fclose(cd->file);
> +    deflateEnd(&cd->stream);
> +    g_free(cd->originbuf);
> +}
> +
> +static void compress_thread_data_handler(void *request)
> +{
> +    CompressData *cd = request;
> +
> +    /*
> +     * if compression fails, it will be indicated by
> +     * migrate_get_current()->to_dst_file.
> +     */
> +    cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
> +                                         cd->offset, cd->originbuf);
> +}
> +
> +static void compress_thread_data_done(void *request)
> +{
> +    CompressData *cd = request;
> +    RAMState *rs = ram_state;
> +    int bytes_xmit;
> +
> +    bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
> +    update_compress_thread_counts(cd, bytes_xmit);
> +}
> +
> +static const ThreadedWorkqueueOps compress_ops = {
> +    .thread_request_init = compress_thread_data_init,
> +    .thread_request_uninit = compress_thread_data_fini,
> +    .thread_request_handler = compress_thread_data_handler,
> +    .thread_request_done = compress_thread_data_done,
> +    .request_size = sizeof(CompressData),
> +};
> +
> +static Threads *compress_threads;
> +
>  static bool save_page_use_compression(RAMState *rs);
>  
>  static void flush_compressed_data(RAMState *rs)
>  {
> -    int idx, len, thread_count;
> -
>      if (!save_page_use_compression(rs)) {
>          return;
>      }
> -    thread_count = migrate_compress_threads();
>  
> -    qemu_mutex_lock(&comp_done_lock);
> -    for (idx = 0; idx < thread_count; idx++) {
> -        while (!comp_param[idx].done) {
> -            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
> -        }
> -    }
> -    qemu_mutex_unlock(&comp_done_lock);
> +    threaded_workqueue_wait_for_requests(compress_threads);
> +}
>  
> -    for (idx = 0; idx < thread_count; idx++) {
> -        qemu_mutex_lock(&comp_param[idx].mutex);
> -        if (!comp_param[idx].quit) {
> -            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
> -            /*
> -             * it's safe to fetch zero_page without holding comp_done_lock
> -             * as there is no further request submitted to the thread,
> -             * i.e, the thread should be waiting for a request at this point.
> -             */
> -            update_compress_thread_counts(&comp_param[idx], len);
> -        }
> -        qemu_mutex_unlock(&comp_param[idx].mutex);
> +static void compress_threads_save_cleanup(void)
> +{
> +    if (!compress_threads) {
> +        return;
>      }
> +
> +    threaded_workqueue_destroy(compress_threads);
> +    compress_threads = NULL;
>  }
>  
> -static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> -                                       ram_addr_t offset)
> +static int compress_threads_save_setup(void)
>  {
> -    param->block = block;
> -    param->offset = offset;
> +    if (!migrate_use_compression()) {
> +        return 0;
> +    }
> +
> +    compress_threads = threaded_workqueue_create("compress",
> +                                migrate_compress_threads(),
> +                                DEFAULT_THREAD_REQUEST_NR, &compress_ops);
> +    return compress_threads ? 0 : -1;
>  }
>  
>  static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
>                                             ram_addr_t offset)
>  {
> -    int idx, thread_count, bytes_xmit = -1, pages = -1;
> +    CompressData *cd;
>      bool wait = migrate_compress_wait_thread();
>  
> -    thread_count = migrate_compress_threads();
> -    qemu_mutex_lock(&comp_done_lock);
>  retry:
> -    for (idx = 0; idx < thread_count; idx++) {
> -        if (comp_param[idx].done) {
> -            comp_param[idx].done = false;
> -            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
> -            qemu_mutex_lock(&comp_param[idx].mutex);
> -            set_compress_params(&comp_param[idx], block, offset);
> -            qemu_cond_signal(&comp_param[idx].cond);
> -            qemu_mutex_unlock(&comp_param[idx].mutex);
> -            pages = 1;
> -            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
> -            break;
> +    cd = threaded_workqueue_get_request(compress_threads);
> +    if (!cd) {
> +        /*
> +         * wait for the free thread if the user specifies
> +         * 'compress-wait-thread', otherwise we will post
> +         *  the page out in the main thread as normal page.
> +         */
> +        if (wait) {
> +            cpu_relax();
> +            goto retry;

Is there nothing better we can use to wait without eating CPU time?

Dave

>          }
> -    }
>  
> -    /*
> -     * wait for the free thread if the user specifies 'compress-wait-thread',
> -     * otherwise we will post the page out in the main thread as normal page.
> -     */
> -    if (pages < 0 && wait) {
> -        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
> -        goto retry;
> -    }
> -    qemu_mutex_unlock(&comp_done_lock);
> -
> -    return pages;
> +        return -1;
> +     }
> +    cd->block = block;
> +    cd->offset = offset;
> +    threaded_workqueue_submit_request(compress_threads, cd);
> +    return 1;
>  }
>  
>  /**
> -- 
> 2.14.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Paolo Bonzini Nov. 23, 2018, 6:22 p.m. UTC | #2
On 23/11/18 19:17, Dr. David Alan Gilbert wrote:
> * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>> [...]
>> +    cd = threaded_workqueue_get_request(compress_threads);
>> +    if (!cd) {
>> +        /*
>> +         * wait for the free thread if the user specifies
>> +         * 'compress-wait-thread', otherwise we will post
>> +         *  the page out in the main thread as normal page.
>> +         */
>> +        if (wait) {
>> +            cpu_relax();
>> +            goto retry;
> 
> Is there nothing better we can use to wait without eating CPU time?

There is a mechanism to wait without eating CPU time in the data
structure, but it makes sense to busy wait.  There are 4 threads in the
workqueue, so you have to compare 1/4th of the time spent compressing a
page, with the trip into the kernel to wake you up.  You're adding 20%
CPU usage, but I'm not surprised it's worthwhile.
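(As a rough illustration only, assuming on the order of 20us to deflate one
4KiB page: with 4 workers a request slot frees up every ~5us on average,
which is comparable to the cost of a futex wake-up and reschedule, so
sleeping buys little here.)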

Paolo
Dr. David Alan Gilbert Nov. 23, 2018, 6:29 p.m. UTC | #3
* Paolo Bonzini (pbonzini@redhat.com) wrote:
> On 23/11/18 19:17, Dr. David Alan Gilbert wrote:
> > * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> >> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> >> [...]
> >> +    cd = threaded_workqueue_get_request(compress_threads);
> >> +    if (!cd) {
> >> +        /*
> >> +         * wait for the free thread if the user specifies
> >> +         * 'compress-wait-thread', otherwise we will post
> >> +         *  the page out in the main thread as normal page.
> >> +         */
> >> +        if (wait) {
> >> +            cpu_relax();
> >> +            goto retry;
> > 
> > Is there nothing better we can use to wait without eating CPU time?
> 
> There is a mechanism to wait without eating CPU time in the data
> structure, but it makes sense to busy wait.  There are 4 threads in the
> workqueue, so you have to compare 1/4th of the time spent compressing a
> page, with the trip into the kernel to wake you up.  You're adding 20%
> CPU usage, but I'm not surprised it's worthwhile.

Hmm OK; in that case it does at least need a comment because it's a bit
odd, and we should watch out for how that scales - I guess it's less of
an overhead the more threads you use.

Dave

> Paolo
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Xiao Guangrong Nov. 26, 2018, 8 a.m. UTC | #4
On 11/24/18 2:29 AM, Dr. David Alan Gilbert wrote:

>>>>   static void
>>>> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>>>> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)
>>>
>>> Keep the const?

Yes, indeed. Will correct it in the next version.
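
A minimal sketch of the corrected declaration (only the qualifier changes,
the body stays as in this patch):

static void
update_compress_thread_counts(const CompressData *cd, int bytes_xmit)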

>>>> +    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
>>>> +        g_free(cd->originbuf);
>>>> +        return -1;
>>>> +    }
>>>
>>> Please print errors if you fail in any case so we can easily tell what
>>> happened.

Sure, will do.
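
Roughly like this (untested sketch; it assumes error_report() from
"qemu/error-report.h" is usable here):

    cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
    if (!cd->originbuf) {
        error_report("compress thread: failed to allocate the page buffer");
        return -1;
    }

    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
        error_report("compress thread: deflateInit() failed");
        g_free(cd->originbuf);
        return -1;
    }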

>>>> +        if (wait) {
>>>> +            cpu_relax();
>>>> +            goto retry;
>>>
>>> Is there nothing better we can use to wait without eating CPU time?
>>
>> There is a mechanism to wait without eating CPU time in the data
>> structure, but it makes sense to busy wait.  There are 4 threads in the
>> workqueue, so you have to compare 1/4th of the time spent compressing a
>> page, with the trip into the kernel to wake you up.  You're adding 20%
>> CPU usage, but I'm not surprised it's worthwhile.
> 
> Hmm OK; in that case it does at least need a comment because it's a bit
> odd, and we should watch out for how that scales - I guess it's less of
> an overhead the more threads you use.
> 

Sure, will add some comments to explain the purpose.
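
For example (sketch based on the reasoning above; wording to be refined):

        if (wait) {
            /*
             * Busy-wait for a free request instead of sleeping on a
             * condition variable: with several compression threads a
             * slot is expected to become free after a fraction of the
             * time needed to compress one page, which is cheaper than
             * a trip into the kernel to be woken up.
             */
            cpu_relax();
            goto retry;
        }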

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 7e7deec4d8..254c08f27b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -57,6 +57,7 @@ 
 #include "qemu/uuid.h"
 #include "savevm.h"
 #include "qemu/iov.h"
+#include "qemu/threaded-workqueue.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -349,22 +350,6 @@  typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct CompressParam {
-    bool done;
-    bool quit;
-    bool zero_page;
-    QEMUFile *file;
-    QemuMutex mutex;
-    QemuCond cond;
-    RAMBlock *block;
-    ram_addr_t offset;
-
-    /* internally used fields */
-    z_stream stream;
-    uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
 struct DecompressParam {
     bool done;
     bool quit;
@@ -377,15 +362,6 @@  struct DecompressParam {
 };
 typedef struct DecompressParam DecompressParam;
 
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-/* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
@@ -394,125 +370,6 @@  static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf);
-
-static void *do_data_compress(void *opaque)
-{
-    CompressParam *param = opaque;
-    RAMBlock *block;
-    ram_addr_t offset;
-    bool zero_page;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->block) {
-            block = param->block;
-            offset = param->offset;
-            param->block = NULL;
-            qemu_mutex_unlock(&param->mutex);
-
-            zero_page = do_compress_ram_page(param->file, &param->stream,
-                                             block, offset, param->originbuf);
-
-            qemu_mutex_lock(&comp_done_lock);
-            param->done = true;
-            param->zero_page = zero_page;
-            qemu_cond_signal(&comp_done_cond);
-            qemu_mutex_unlock(&comp_done_lock);
-
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-    }
-    qemu_mutex_unlock(&param->mutex);
-
-    return NULL;
-}
-
-static void compress_threads_save_cleanup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression() || !comp_param) {
-        return;
-    }
-
-    thread_count = migrate_compress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!comp_param[i].file) {
-            break;
-        }
-
-        qemu_mutex_lock(&comp_param[i].mutex);
-        comp_param[i].quit = true;
-        qemu_cond_signal(&comp_param[i].cond);
-        qemu_mutex_unlock(&comp_param[i].mutex);
-
-        qemu_thread_join(compress_threads + i);
-        qemu_mutex_destroy(&comp_param[i].mutex);
-        qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
-        g_free(comp_param[i].originbuf);
-        qemu_fclose(comp_param[i].file);
-        comp_param[i].file = NULL;
-    }
-    qemu_mutex_destroy(&comp_done_lock);
-    qemu_cond_destroy(&comp_done_cond);
-    g_free(compress_threads);
-    g_free(comp_param);
-    compress_threads = NULL;
-    comp_param = NULL;
-}
-
-static int compress_threads_save_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-    thread_count = migrate_compress_threads();
-    compress_threads = g_new0(QemuThread, thread_count);
-    comp_param = g_new0(CompressParam, thread_count);
-    qemu_cond_init(&comp_done_cond);
-    qemu_mutex_init(&comp_done_lock);
-    for (i = 0; i < thread_count; i++) {
-        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
-        if (!comp_param[i].originbuf) {
-            goto exit;
-        }
-
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
-            g_free(comp_param[i].originbuf);
-            goto exit;
-        }
-
-        /* comp_param[i].file is just used as a dummy buffer to save data,
-         * set its ops to empty.
-         */
-        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
-        comp_param[i].done = true;
-        comp_param[i].quit = false;
-        qemu_mutex_init(&comp_param[i].mutex);
-        qemu_cond_init(&comp_param[i].cond);
-        qemu_thread_create(compress_threads + i, "compress",
-                           do_data_compress, comp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-
-exit:
-    compress_threads_save_cleanup();
-    return -1;
-}
-
 /* Multiple fd's */
 
 #define MULTIFD_MAGIC 0x11223344U
@@ -1909,12 +1766,25 @@  exit:
     return zero_page;
 }
 
+struct CompressData {
+    /* filled by migration thread.*/
+    RAMBlock *block;
+    ram_addr_t offset;
+
+    /* filled by compress thread. */
+    QEMUFile *file;
+    z_stream stream;
+    uint8_t *originbuf;
+    bool zero_page;
+};
+typedef struct CompressData CompressData;
+
 static void
-update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+update_compress_thread_counts(CompressData *cd, int bytes_xmit)
 {
     ram_counters.transferred += bytes_xmit;
 
-    if (param->zero_page) {
+    if (cd->zero_page) {
         ram_counters.duplicate++;
         return;
     }
@@ -1924,81 +1794,123 @@  update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
     compression_counters.pages++;
 }
 
+static int compress_thread_data_init(void *request)
+{
+    CompressData *cd = request;
+
+    cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+    if (!cd->originbuf) {
+        return -1;
+    }
+
+    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
+        g_free(cd->originbuf);
+        return -1;
+    }
+
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_thread_data_fini(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+    g_free(cd->originbuf);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+
+    /*
+     * if compression fails, it will be indicated by
+     * migrate_get_current()->to_dst_file.
+     */
+    cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
+                                         cd->offset, cd->originbuf);
+}
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    RAMState *rs = ram_state;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
+    update_compress_thread_counts(cd, bytes_xmit);
+}
+
+static const ThreadedWorkqueueOps compress_ops = {
+    .thread_request_init = compress_thread_data_init,
+    .thread_request_uninit = compress_thread_data_fini,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+    .request_size = sizeof(CompressData),
+};
+
+static Threads *compress_threads;
+
 static bool save_page_use_compression(RAMState *rs);
 
 static void flush_compressed_data(RAMState *rs)
 {
-    int idx, len, thread_count;
-
     if (!save_page_use_compression(rs)) {
         return;
     }
-    thread_count = migrate_compress_threads();
 
-    qemu_mutex_lock(&comp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!comp_param[idx].done) {
-            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        }
-    }
-    qemu_mutex_unlock(&comp_done_lock);
+    threaded_workqueue_wait_for_requests(compress_threads);
+}
 
-    for (idx = 0; idx < thread_count; idx++) {
-        qemu_mutex_lock(&comp_param[idx].mutex);
-        if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            /*
-             * it's safe to fetch zero_page without holding comp_done_lock
-             * as there is no further request submitted to the thread,
-             * i.e, the thread should be waiting for a request at this point.
-             */
-            update_compress_thread_counts(&comp_param[idx], len);
-        }
-        qemu_mutex_unlock(&comp_param[idx].mutex);
+static void compress_threads_save_cleanup(void)
+{
+    if (!compress_threads) {
+        return;
     }
+
+    threaded_workqueue_destroy(compress_threads);
+    compress_threads = NULL;
 }
 
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
-                                       ram_addr_t offset)
+static int compress_threads_save_setup(void)
 {
-    param->block = block;
-    param->offset = offset;
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    compress_threads = threaded_workqueue_create("compress",
+                                migrate_compress_threads(),
+                                DEFAULT_THREAD_REQUEST_NR, &compress_ops);
+    return compress_threads ? 0 : -1;
 }
 
 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                            ram_addr_t offset)
 {
-    int idx, thread_count, bytes_xmit = -1, pages = -1;
+    CompressData *cd;
     bool wait = migrate_compress_wait_thread();
 
-    thread_count = migrate_compress_threads();
-    qemu_mutex_lock(&comp_done_lock);
 retry:
-    for (idx = 0; idx < thread_count; idx++) {
-        if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
-            pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
-            break;
+    cd = threaded_workqueue_get_request(compress_threads);
+    if (!cd) {
+        /*
+         * wait for the free thread if the user specifies
+         * 'compress-wait-thread', otherwise we will post
+         *  the page out in the main thread as normal page.
+         */
+        if (wait) {
+            cpu_relax();
+            goto retry;
         }
-    }
 
-    /*
-     * wait for the free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as normal page.
-     */
-    if (pages < 0 && wait) {
-        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
-    }
-    qemu_mutex_unlock(&comp_done_lock);
-
-    return pages;
+        return -1;
+     }
+    cd->block = block;
+    cd->offset = offset;
+    threaded_workqueue_submit_request(compress_threads, cd);
+    return 1;
 }
 
 /**