From patchwork Sun Apr 2 17:56:28 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Lukas Straub X-Patchwork-Id: 13197509 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from lists.gnu.org (lists.gnu.org [209.51.188.17]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by smtp.lore.kernel.org (Postfix) with ESMTPS id 2BFDAC7619A for ; Sun, 2 Apr 2023 17:57:41 +0000 (UTC) Received: from localhost ([::1] helo=lists1p.gnu.org) by lists.gnu.org with esmtp (Exim 4.90_1) (envelope-from ) id 1pj1wW-0006nS-Cu; Sun, 02 Apr 2023 13:56:40 -0400 Received: from eggs.gnu.org ([2001:470:142:3::10]) by lists.gnu.org with esmtps (TLS1.2:ECDHE_RSA_AES_256_GCM_SHA384:256) (Exim 4.90_1) (envelope-from ) id 1pj1wS-0006mh-ML for qemu-devel@nongnu.org; Sun, 02 Apr 2023 13:56:37 -0400 Received: from mout.web.de ([212.227.17.11]) by eggs.gnu.org with esmtps (TLS1.2:ECDHE_RSA_AES_256_GCM_SHA384:256) (Exim 4.90_1) (envelope-from ) id 1pj1wQ-00043o-0W for qemu-devel@nongnu.org; Sun, 02 Apr 2023 13:56:36 -0400 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=web.de; s=s29768273; t=1680458190; i=lukasstraub2@web.de; bh=XKpglTA6owmd537ugJcEj92M/s2rh7Eom5QGAWKbQ6o=; h=X-UI-Sender-Class:Date:From:To:Cc:Subject:In-Reply-To:References; b=Ktt7/GDehJXbd9v08QBdxo5546p0QqDcQuvFyB7O8OTHUH5UdoizQ0rGaM61RoMei VFvRSVA2d7VtK7Qw5Ls/s/MOGRPB5xmF8/OXwikxympYcr72l23ccFBS+SrJFltkJn y9KA/+6V9Ea8W663QP/hRgV0LnAMxYK7V599bPRnayicbbUba6lGKKqCtZa+CEUv1o WCUQH12wwurQ4/fg7G9UkK9o7rw1wdmzAucCkbVU4hrhUWUy9UPpm00BHNBFlQQVpg RpE4JRQT8EwkWAQthx/pazyADsjliP1OdUpEOyqxWNKhe9vr6uE3nD65fjTXIGHSTv F7ZLulQfB37oA== X-UI-Sender-Class: 814a7b36-bfc1-4dae-8640-3722d8ec6cd6 Received: from gecko.fritz.box ([82.207.254.111]) by smtp.web.de (mrweb106 [213.165.67.124]) with ESMTPSA (Nemesis) id 1MKuKF-1pzCHA3u6H-00KueF; Sun, 02 Apr 2023 19:56:30 +0200 Date: Sun, 2 Apr 2023 17:56:28 +0000 From: Lukas Straub To: qemu-devel Cc: "Dr. David Alan Gilbert" , Juan Quintela , Peter Xu Subject: [PATCH 08/14] ram.c: Move core compression code into its own file Message-ID: <15e26fb46da7826061fd47b0d58943f3279c149b.1680457764.git.lukasstraub2@web.de> In-Reply-To: References: MIME-Version: 1.0 X-Provags-ID: V03:K1:NSFHLn3l3BwlqSvzuU5igoVunj6T+a9dA9+bD5f0Q6kyTHQjybJ V1FuWM1KEMikFjLRZ7nPQkmldTIZxGnQJ3SWOpRFwv6kCvJBlI5Z3o4zVPZWKyPShAMnTK9 h5RPPdRK6St9KRB+HbgWX87s20w38DZPMUvy86ZQZ7PF9iYOvqapkXxj7CU33gmiCC092QK LvXe8rnLNH60Be3+ideTg== UI-OutboundReport: notjunk:1;M01:P0:V4lC6v6sKMM=;oQHthH925q7pzzI8LoaYWmPFUIg bUTfnciLHDoiFwJeL+hll524BCcVNCmIPlIcdb5Y+v9BI7tnrkRfmBmAWZ1puI+m3I472fStl ALyEJ06TVlmYJWBcPnE/L8uj4hhqfwLKrm2nUp+nNVt/WuGclPU1UnJ0WqMrHFnNhr1XoyHHI cbV/IS0Wv28zP6vgUL4DTpZUb1JGzBwOMfzvytL8Yw6jpjfheyVt8OaWPFftYcJtvuS1VYp58 BWw1yNs9zm50Bfzk9zUX3unwZWghsr0fCgy9PkBKxZF7deeaJmG8jFwwz+Fv7zUgXKQzQG3jF phOU3xgqtqOqssTX1NPd/5/TygPn0VX1iaJmvuM7aqnR/HfFTYShUEgFje51mHMhb5KLUTZW9 szp5ixZzn2lhYVTBN8kusv+mYLiDwzLVdjK0akyFtLjm5cX2gZJUNwIVxOoc4Jj62WEk5iKLi 4sRmiqsYeslFuiEL67Lxgr0iPIZz499R5KxWbov3hsScjQfMRhely96Vd2HaSx8WrFoJVvwhX qZwHevCmSLpJlDBqOHh9ABm6Ii2G8p1UI1+mHWzldiY2/Dlmy0RUamJtrWWeA7dIoXUzJ2Vj1 jNQLa572c55qt/WsrYyLBxHFZcmkTCE0jnOiI4VuEVMlLUraBR6hshx4ZJTAXhQLg3IjABTtr exdwff1p5+6wTP8vBhOfSUI6sivnP/M93wZOkkEMKAp6/HFRmIbRFFvpjzaJ2G9AuNsd0Zlzp zXJG7y9sgC7x2sVKpXDQuU6QHiBGD0OxKDrCg3gomQ9NTBPY70Kv3uQcPJ6lA2ff+rxJOfm1B ksiZj4+oIAOoQHM0aY625Zs2b9N92MBjYPWgmKlQGwqpCOsK5feTs++5jVthpPyFvHpaU3yyj U1Ad6MRSTQvnYCo1Ke0vTTDSq6PLp+uEF6Q4UJdAiABYJWvXs08GMAs9ifEsr65wFOElB/tR7 C6wytUt/PuKBOvedxZH/K3s63wo= Received-SPF: pass client-ip=212.227.17.11; envelope-from=lukasstraub2@web.de; helo=mout.web.de X-Spam_score_int: -18 X-Spam_score: -1.9 X-Spam_bar: - X-Spam_report: (-1.9 / 5.0 requ) BAYES_00=-1.9, DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, FREEMAIL_ENVFROM_END_DIGIT=0.25, FREEMAIL_FROM=0.001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_HELO_NONE=0.001, SPF_PASS=-0.001 autolearn=ham autolearn_force=no X-Spam_action: no action X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+qemu-devel=archiver.kernel.org@nongnu.org Sender: qemu-devel-bounces+qemu-devel=archiver.kernel.org@nongnu.org No functional changes intended. Signed-off-by: Lukas Straub --- migration/meson.build | 5 +- migration/ram-compress.c | 273 +++++++++++++++++++++++++++++++++++++++ migration/ram-compress.h | 65 ++++++++++ migration/ram.c | 255 +----------------------------------- 4 files changed, 343 insertions(+), 255 deletions(-) create mode 100644 migration/ram-compress.c create mode 100644 migration/ram-compress.h -- 2.30.2 diff --git a/migration/meson.build b/migration/meson.build index 0d1bb9f96e..262e3c9754 100644 --- a/migration/meson.build +++ b/migration/meson.build @@ -36,4 +36,7 @@ endif softmmu_ss.add(when: zstd, if_true: files('multifd-zstd.c')) specific_ss.add(when: 'CONFIG_SOFTMMU', - if_true: files('dirtyrate.c', 'ram.c', 'target.c')) + if_true: files('dirtyrate.c', + 'ram.c', + 'ram-compress.c', + 'target.c')) diff --git a/migration/ram-compress.c b/migration/ram-compress.c new file mode 100644 index 0000000000..77902a1d65 --- /dev/null +++ b/migration/ram-compress.c @@ -0,0 +1,273 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2011-2015 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include "qemu/osdep.h" +#include "qemu/cutils.h" + +#include "ram-compress.h" + +#include "qemu/error-report.h" +#include "migration.h" +#include "io/channel-null.h" +#include "exec/ram_addr.h" + +CompressionStats compression_counters; + +static CompressParam *comp_param; +static QemuThread *compress_threads; +/* comp_done_cond is used to wake up the migration thread when + * one of the compression threads has finished the compression. + * comp_done_lock is used to co-work with comp_done_cond. + */ +static QemuMutex comp_done_lock; +static QemuCond comp_done_cond; + +static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, + RAMBlock *block, ram_addr_t offset, + uint8_t *source_buf); + +static void *do_data_compress(void *opaque) +{ + CompressParam *param = opaque; + RAMBlock *block; + ram_addr_t offset; + CompressResult result; + + qemu_mutex_lock(¶m->mutex); + while (!param->quit) { + if (param->trigger) { + block = param->block; + offset = param->offset; + param->trigger = false; + qemu_mutex_unlock(¶m->mutex); + + result = do_compress_ram_page(param->file, ¶m->stream, + block, offset, param->originbuf); + + qemu_mutex_lock(&comp_done_lock); + param->done = true; + param->result = result; + qemu_cond_signal(&comp_done_cond); + qemu_mutex_unlock(&comp_done_lock); + + qemu_mutex_lock(¶m->mutex); + } else { + qemu_cond_wait(¶m->cond, ¶m->mutex); + } + } + qemu_mutex_unlock(¶m->mutex); + + return NULL; +} + +void compress_threads_save_cleanup(void) +{ + int i, thread_count; + + if (!migrate_use_compression() || !comp_param) { + return; + } + + thread_count = migrate_compress_threads(); + for (i = 0; i < thread_count; i++) { + /* + * we use it as a indicator which shows if the thread is + * properly init'd or not + */ + if (!comp_param[i].file) { + break; + } + + qemu_mutex_lock(&comp_param[i].mutex); + comp_param[i].quit = true; + qemu_cond_signal(&comp_param[i].cond); + qemu_mutex_unlock(&comp_param[i].mutex); + + qemu_thread_join(compress_threads + i); + qemu_mutex_destroy(&comp_param[i].mutex); + qemu_cond_destroy(&comp_param[i].cond); + deflateEnd(&comp_param[i].stream); + g_free(comp_param[i].originbuf); + qemu_fclose(comp_param[i].file); + comp_param[i].file = NULL; + } + qemu_mutex_destroy(&comp_done_lock); + qemu_cond_destroy(&comp_done_cond); + g_free(compress_threads); + g_free(comp_param); + compress_threads = NULL; + comp_param = NULL; +} + +int compress_threads_save_setup(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return 0; + } + thread_count = migrate_compress_threads(); + compress_threads = g_new0(QemuThread, thread_count); + comp_param = g_new0(CompressParam, thread_count); + qemu_cond_init(&comp_done_cond); + qemu_mutex_init(&comp_done_lock); + for (i = 0; i < thread_count; i++) { + comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!comp_param[i].originbuf) { + goto exit; + } + + if (deflateInit(&comp_param[i].stream, + migrate_compress_level()) != Z_OK) { + g_free(comp_param[i].originbuf); + goto exit; + } + + /* comp_param[i].file is just used as a dummy buffer to save data, + * set its ops to empty. + */ + comp_param[i].file = qemu_file_new_output( + QIO_CHANNEL(qio_channel_null_new())); + comp_param[i].done = true; + comp_param[i].quit = false; + qemu_mutex_init(&comp_param[i].mutex); + qemu_cond_init(&comp_param[i].cond); + qemu_thread_create(compress_threads + i, "compress", + do_data_compress, comp_param + i, + QEMU_THREAD_JOINABLE); + } + return 0; + +exit: + compress_threads_save_cleanup(); + return -1; +} + +static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, + RAMBlock *block, ram_addr_t offset, + uint8_t *source_buf) +{ + uint8_t *p = block->host + offset; + int ret; + + if (buffer_is_zero(p, TARGET_PAGE_SIZE)) { + return RES_ZEROPAGE; + } + + /* + * copy it to a internal buffer to avoid it being modified by VM + * so that we can catch up the error during compression and + * decompression + */ + memcpy(source_buf, p, TARGET_PAGE_SIZE); + ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); + if (ret < 0) { + qemu_file_set_error(migrate_get_current()->to_dst_file, ret); + error_report("compressed data failed!"); + return RES_NONE; + } + return RES_COMPRESS; +} + +static inline void compress_reset_result(CompressParam *param) +{ + param->result = RES_NONE; + param->block = NULL; + param->offset = 0; +} + +void flush_compressed_data(int (send_queued_data(CompressParam *))) +{ + int idx, thread_count; + + thread_count = migrate_compress_threads(); + + qemu_mutex_lock(&comp_done_lock); + for (idx = 0; idx < thread_count; idx++) { + while (!comp_param[idx].done) { + qemu_cond_wait(&comp_done_cond, &comp_done_lock); + } + } + qemu_mutex_unlock(&comp_done_lock); + + for (idx = 0; idx < thread_count; idx++) { + qemu_mutex_lock(&comp_param[idx].mutex); + if (!comp_param[idx].quit) { + CompressParam *param = &comp_param[idx]; + send_queued_data(param); + compress_reset_result(param); + } + qemu_mutex_unlock(&comp_param[idx].mutex); + } +} + +static inline void set_compress_params(CompressParam *param, RAMBlock *block, + ram_addr_t offset) +{ + param->block = block; + param->offset = offset; + param->trigger = true; +} + +int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, + int (send_queued_data(CompressParam *))) +{ + int idx, thread_count, pages = -1; + bool wait = migrate_compress_wait_thread(); + + thread_count = migrate_compress_threads(); + qemu_mutex_lock(&comp_done_lock); +retry: + for (idx = 0; idx < thread_count; idx++) { + if (comp_param[idx].done) { + CompressParam *param = &comp_param[idx]; + qemu_mutex_lock(¶m->mutex); + param->done = false; + send_queued_data(param); + compress_reset_result(param); + set_compress_params(param, block, offset); + + qemu_cond_signal(¶m->cond); + qemu_mutex_unlock(¶m->mutex); + pages = 1; + break; + } + } + + /* + * wait for the free thread if the user specifies 'compress-wait-thread', + * otherwise we will post the page out in the main thread as normal page. + */ + if (pages < 0 && wait) { + qemu_cond_wait(&comp_done_cond, &comp_done_lock); + goto retry; + } + qemu_mutex_unlock(&comp_done_lock); + + return pages; +} diff --git a/migration/ram-compress.h b/migration/ram-compress.h new file mode 100644 index 0000000000..06570a799c --- /dev/null +++ b/migration/ram-compress.h @@ -0,0 +1,65 @@ +/* + * QEMU System Emulator + * + * Copyright (c) 2003-2008 Fabrice Bellard + * Copyright (c) 2011-2015 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#ifndef QEMU_MIGRATION_COMPRESS_H +#define QEMU_MIGRATION_COMPRESS_H + +#include "qemu-file.h" + +enum CompressResult { + RES_NONE = 0, + RES_ZEROPAGE = 1, + RES_COMPRESS = 2 +}; +typedef enum CompressResult CompressResult; + +struct CompressParam { + bool done; + bool quit; + bool trigger; + CompressResult result; + QEMUFile *file; + QemuMutex mutex; + QemuCond cond; + RAMBlock *block; + ram_addr_t offset; + + /* internally used fields */ + z_stream stream; + uint8_t *originbuf; +}; +typedef struct CompressParam CompressParam; + +void compress_threads_save_cleanup(void); +int compress_threads_save_setup(void); + +void flush_compressed_data(int (send_queued_data(CompressParam *))); +int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, + int (send_queued_data(CompressParam *))); + +#endif diff --git a/migration/ram.c b/migration/ram.c index 475c04a18b..114901241e 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -34,6 +34,7 @@ #include "qemu/main-loop.h" #include "io/channel-null.h" #include "xbzrle.h" +#include "ram-compress.h" #include "ram.h" #include "migration.h" #include "migration/register.h" @@ -491,32 +492,6 @@ typedef struct MigrationOps MigrationOps; MigrationOps *migration_ops; -CompressionStats compression_counters; - -enum CompressResult { - RES_NONE = 0, - RES_ZEROPAGE = 1, - RES_COMPRESS = 2 -}; -typedef enum CompressResult CompressResult; - -struct CompressParam { - bool done; - bool quit; - bool trigger; - CompressResult result; - QEMUFile *file; - QemuMutex mutex; - QemuCond cond; - RAMBlock *block; - ram_addr_t offset; - - /* internally used fields */ - z_stream stream; - uint8_t *originbuf; -}; -typedef struct CompressParam CompressParam; - struct DecompressParam { bool done; bool quit; @@ -529,15 +504,6 @@ struct DecompressParam { }; typedef struct DecompressParam DecompressParam; -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. - */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; - static QEMUFile *decomp_file; static DecompressParam *decomp_param; static QemuThread *decompress_threads; @@ -546,10 +512,6 @@ static QemuCond decomp_done_cond; static int ram_save_host_page_urgent(PageSearchStatus *pss); -static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, - RAMBlock *block, ram_addr_t offset, - uint8_t *source_buf); - /* NOTE: page is the PFN not real ram_addr_t. */ static void pss_init(PageSearchStatus *pss, RAMBlock *rb, ram_addr_t page) { @@ -568,39 +530,7 @@ static bool pss_overlap(PageSearchStatus *pss1, PageSearchStatus *pss2) (pss1->host_page_start == pss2->host_page_start); } -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - CompressResult result; - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->trigger) { - block = param->block; - offset = param->offset; - param->trigger = false; - qemu_mutex_unlock(¶m->mutex); - - result = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->result = result; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} @@ -608,44 +538,7 @@ static void *do_data_compress(void *opaque) -static void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_use_compression() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} @@ -653,49 +546,7 @@ static void compress_threads_save_cleanup(void) -static int compress_threads_save_setup(void) -{ - int i, thread_count; - if (!migrate_use_compression()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. - */ - comp_param[i].file = qemu_file_new_output( - QIO_CHANNEL(qio_channel_null_new())); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} /** * save_page_header: write page header to wire @@ -1484,32 +1335,6 @@ static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block, return 1; } -static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, - RAMBlock *block, ram_addr_t offset, - uint8_t *source_buf) -{ - uint8_t *p = block->host + offset; - int ret; - - if (buffer_is_zero(p, TARGET_PAGE_SIZE)) { - return RES_ZEROPAGE; - } - - /* - * copy it to a internal buffer to avoid it being modified by VM - * so that we can catch up the error during compression and - * decompression - */ - memcpy(source_buf, p, TARGET_PAGE_SIZE); - ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE); - if (ret < 0) { - qemu_file_set_error(migrate_get_current()->to_dst_file, ret); - error_report("compressed data failed!"); - return RES_NONE; - } - return RES_COMPRESS; -} - static void update_compress_thread_counts(const CompressParam *param, int bytes_xmit) { @@ -1527,13 +1352,6 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) static bool save_page_use_compression(RAMState *rs); -static inline void compress_reset_result(CompressParam *param) -{ - param->result = RES_NONE; - param->block = NULL; - param->offset = 0; -} - static int send_queued_data(CompressParam *param) { PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY]; @@ -1568,31 +1386,6 @@ static int send_queued_data(CompressParam *param) return len; } -static void flush_compressed_data(int (send_queued_data(CompressParam *))) -{ - int idx, thread_count; - - thread_count = migrate_compress_threads(); - - qemu_mutex_lock(&comp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!comp_param[idx].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); - - for (idx = 0; idx < thread_count; idx++) { - qemu_mutex_lock(&comp_param[idx].mutex); - if (!comp_param[idx].quit) { - CompressParam *param = &comp_param[idx]; - send_queued_data(param); - compress_reset_result(param); - } - qemu_mutex_unlock(&comp_param[idx].mutex); - } -} - static void ram_flush_compressed_data(RAMState *rs) { if (!save_page_use_compression(rs)) { @@ -1602,52 +1395,6 @@ static void ram_flush_compressed_data(RAMState *rs) flush_compressed_data(send_queued_data); } -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) -{ - param->block = block; - param->offset = offset; - param->trigger = true; -} - -static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, - int (send_queued_data(CompressParam *))) -{ - int idx, thread_count, pages = -1; - bool wait = migrate_compress_wait_thread(); - - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); -retry: - for (idx = 0; idx < thread_count; idx++) { - if (comp_param[idx].done) { - CompressParam *param = &comp_param[idx]; - qemu_mutex_lock(¶m->mutex); - param->done = false; - send_queued_data(param); - compress_reset_result(param); - set_compress_params(param, block, offset); - - qemu_cond_signal(¶m->cond); - qemu_mutex_unlock(¶m->mutex); - pages = 1; - break; - } - } - - /* - * wait for the free thread if the user specifies 'compress-wait-thread', - * otherwise we will post the page out in the main thread as normal page. - */ - if (pages < 0 && wait) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - goto retry; - } - qemu_mutex_unlock(&comp_done_lock); - - return pages; -} - #define PAGE_ALL_CLEAN 0 #define PAGE_TRY_AGAIN 1 #define PAGE_DIRTY_FOUND 2