From patchwork Tue Nov 6 12:20:21 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10670237 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 95DAA13A4 for ; Tue, 6 Nov 2018 12:20:39 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 840FC2A3F6 for ; Tue, 6 Nov 2018 12:20:39 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id 786FD2A418; Tue, 6 Nov 2018 12:20:39 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 206D72A3F6 for ; Tue, 6 Nov 2018 12:20:39 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1730476AbeKFVpf (ORCPT ); Tue, 6 Nov 2018 16:45:35 -0500 Received: from mail-pf1-f194.google.com ([209.85.210.194]:37293 "EHLO mail-pf1-f194.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1730466AbeKFVpe (ORCPT ); Tue, 6 Nov 2018 16:45:34 -0500 Received: by mail-pf1-f194.google.com with SMTP id u13-v6so6044907pfm.4 for ; Tue, 06 Nov 2018 04:20:37 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=8yuKyyw8d4l95Odfc54RiKR7Oe6eGv/wZcUX85zjzVA=; b=Ed2frqWQ0WCOTbULrLvVYoXHc41bwO1VJwCbNB+rA5kQlcT9Wo9de6oxqBWmjoggQw K1zZsZsZiB+T9+jt8UKO+w7vqpRqK1c35f5DCNoH6B4fRGNKG6V+frn3gyvQb7rfA8GO 
FU1GEtlIK6yCoU4+QUcu7q4xTLHRaO/WUNMo4P2chDrMX+qIPg6ngcV4WLSoxF+hPg6g o8dZ8q5WTySeLkQtG10a3YX5H9FPhzlZFZUhTLxr2KtsrQ5lT/EXDBBJ/xZyiOz3E/3L +imawpKFwcF/8jrhZrYXaAXuGSIH0VoqUeVWKOCGtEnDj28LfS+zZdLJiSoo0tA4fTcO 1Ltg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=8yuKyyw8d4l95Odfc54RiKR7Oe6eGv/wZcUX85zjzVA=; b=OJ99zotCeIhtNgfbsIrQc62AwJrOKoiSb+zZxs/PDvReHkQCP6b9Bzr3OjbcnqA3tK siGel1wvlq0RaCCiAEB44qQdXQrQdS8EGH6U8iIwKuWjoinxaxaCWqz4TIcX5BzdhLNI 1PGJ5ftX0j1Nl4TJU3lHKz7NcnaIayOVTUghVCEqK1CVpsVy2kwouWsIXQ3PolPrQb6D Y6E2Mdz1scC8ZRjzIa1OQEVoQvOoKd44Ix8TY9aH9ZkERZTaozpCblkcCXvgGcCzIXdn yIqq1bvTq9Qy0xP89/eSv/wxnOtr2VyrOUHOYq5xVYyBg2yfOUF2vlONs23yBNxMi8L4 OJgA== X-Gm-Message-State: AGRZ1gKAlqp15dCt/cGr5xTUG9v+IJt/nHRCTCzV327pwktNjWcZiBFn edoMZSq2fHZplZ+gwlLcR2I= X-Google-Smtp-Source: AJdET5ezJWyALmDe6vfgs+Wdj/dGOO5zNbcGBQeByVDaueU5LxLPjp9ooyHLH83/Rw0OVJqm41ch6Q== X-Received: by 2002:a65:4049:: with SMTP id h9mr23246694pgp.304.1541506837191; Tue, 06 Nov 2018 04:20:37 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.33 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:36 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v2 1/5] bitops: introduce change_bit_atomic Date: Tue, 6 Nov 2018 20:20:21 +0800 Message-Id: <20181106122025.3487-2-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: 
<20181106122025.3487-1-xiaoguangrong@tencent.com> Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong It will be used by threaded workqueue Signed-off-by: Xiao Guangrong --- include/qemu/bitops.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/include/qemu/bitops.h b/include/qemu/bitops.h index 3f0926cf40..c522958852 100644 --- a/include/qemu/bitops.h +++ b/include/qemu/bitops.h @@ -79,6 +79,19 @@ static inline void change_bit(long nr, unsigned long *addr) *p ^= mask; } +/** + * change_bit_atomic - Toggle a bit in memory atomically + * @nr: Bit to change + * @addr: Address to start counting from + */ +static inline void change_bit_atomic(long nr, unsigned long *addr) +{ + unsigned long mask = BIT_MASK(nr); + unsigned long *p = addr + BIT_WORD(nr); + + atomic_xor(p, mask); +} + /** * test_and_set_bit - Set a bit and return its old value * @nr: Bit to set From patchwork Tue Nov 6 12:20:22 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10670239 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 6E8F213A4 for ; Tue, 6 Nov 2018 12:20:45 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 5C0C52A418 for ; Tue, 6 Nov 2018 12:20:45 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id 506F52A414; Tue, 6 Nov 2018 12:20:45 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from 
vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 0F6C92A415 for ; Tue, 6 Nov 2018 12:20:44 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1730466AbeKFVpk (ORCPT ); Tue, 6 Nov 2018 16:45:40 -0500 Received: from mail-pf1-f194.google.com ([209.85.210.194]:45253 "EHLO mail-pf1-f194.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1730479AbeKFVpj (ORCPT ); Tue, 6 Nov 2018 16:45:39 -0500 Received: by mail-pf1-f194.google.com with SMTP id p17-v6so5662381pfj.12 for ; Tue, 06 Nov 2018 04:20:41 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references :mime-version:content-transfer-encoding; bh=DCSTE3C9H/zKxT8TB1q3TndGZmpdq5RyqNFTw5hO3zg=; b=anuNOmWxfOy3f2DH/MY4mAxXWGYhrt1Tre7MniieHKC1wQ5EhlHTsT6TN6kR1Mdt8j J/HmVzUR5E0BVEiB88ffVnsGNeZWW304X6JtjWcy7F6Z7D4zQTCBG09mA6IfD2aTZYWY ayjM4ICdm/eM4xdP/jjv8TZdikjIw5rxv6pgneDaNdiOnEk4v9ZBRHlQqlj90LBnrUUS YuDxMrcPPqexNnq9CA4KDb1gxhLgc/Vil31Nh69SwkvpVfq7UnfWHkis7UwNI49EdAyU AdYzVtVArGslhsV2E/tWREeCqb4eVKhnGyV0WciM5nxbLBtmuYKixc8YcRzD89kKuLew zf7A== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references:mime-version:content-transfer-encoding; bh=DCSTE3C9H/zKxT8TB1q3TndGZmpdq5RyqNFTw5hO3zg=; b=Qb49MbzM/hpVbw4pxRBe2eKjaEoJ5U8dRxGCesLydX9Liz85DK52/xb8K4YyWI9LaO n7yz3FWAOxs+58fb9oaceZcRBf0w5jY4zp3CK+koasQFMFdc/ENFfe/aYU0Re7PwEX5+ gFcaZ0DTouch2iWirYXrMtjy22zDzQtJOJTf4nyEHul9256+6G5kfjN5vPqNy+ovLxlr mJYVxda6YvJgnHLgvS3ZWGSvLXaDIMBm9ESrfQn8NC6+kmnj6f245Jwleeqha4a4nsKV e2r+V2dE1Q4jJotZPfCV/vyCFOc3El8AK+vUvfajdlZChzee/e3w2V3zzKOJvtDovgFF t2cg== X-Gm-Message-State: AGRZ1gK8/LIQfleOlAtBDvtrAATgi9TJ2Kr08NKT3GPdLTfaVLXYvrYF dUJWAHLYfYYhPJtypqznYEc= X-Google-Smtp-Source: 
AJdET5f85IzNkvkkGdSPAhocuDKHr/YmwYSttORpWD/Pf9miGwRko+AA/0cyyfG0GbW2L9WHC8HK8g== X-Received: by 2002:a63:4d1d:: with SMTP id a29-v6mr20926506pgb.408.1541506841299; Tue, 06 Nov 2018 04:20:41 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.37 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:40 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v2 2/5] util: introduce threaded workqueue Date: Tue, 6 Nov 2018 20:20:22 +0800 Message-Id: <20181106122025.3487-3-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: <20181106122025.3487-1-xiaoguangrong@tencent.com> MIME-Version: 1.0 Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong This modules implements the lockless and efficient threaded workqueue. Three abstracted objects are used in this module: - Request. It not only contains the data that the workqueue fetches out to finish the request but also offers the space to save the result after the workqueue handles the request. It's flowed between user and workqueue. The user fills the request data into it when it is owned by user. After it is submitted to the workqueue, the workqueue fetched data out and save the result into it after the request is handled. All the requests are pre-allocated and carefully partitioned between threads so there is no contention on the request, that make threads be parallel as much as possible. 
- User, i.e., the submitter It is the one that fills the request and submits it to the workqueue; the result is collected after the workqueue has handled it. The user can submit requests consecutively without waiting for previous requests to be handled. Only one submitter is supported; if you want more, serialize submission yourself, e.g., use a lock on your side. - Workqueue, i.e., thread Each workqueue is represented by a running thread that fetches the request submitted by the user, does the specified work and saves the result to the request. Signed-off-by: Xiao Guangrong --- include/qemu/threaded-workqueue.h | 94 ++++++++ util/Makefile.objs | 1 + util/threaded-workqueue.c | 466 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 561 insertions(+) create mode 100644 include/qemu/threaded-workqueue.h create mode 100644 util/threaded-workqueue.c diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h new file mode 100644 index 0000000000..d7eb66c8d2 --- /dev/null +++ b/include/qemu/threaded-workqueue.h @@ -0,0 +1,94 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#ifndef QEMU_THREADED_WORKQUEUE_H +#define QEMU_THREADED_WORKQUEUE_H + +#include "qemu/queue.h" +#include "qemu/thread.h" + +/* + * This module implements a lockless and efficient threaded workqueue. + * + * Three abstracted objects are used in this module: + * - Request. + * It not only contains the data that the workqueue fetches out + * to finish the request but also offers the space to save the result + * after the workqueue handles the request. + * + * It flows between user and workqueue. The user fills the request + * data into it when it is owned by user. 
After it is submitted to the + * workqueue, the workqueue fetches the data and saves the result into + * it after the request is handled. + * + * All the requests are pre-allocated and carefully partitioned between + * threads so there is no contention on the requests, which lets threads + * run in parallel as much as possible. + * + * - User, i.e., the submitter + * It's the one that fills the request and submits it to the workqueue, + * the result will be collected after it is handled by the workqueue. + * + * The user can consecutively submit requests without waiting for the + * previous requests to be handled. + * It only supports one submitter; serialize submissions yourself if + * you want more, e.g., use a lock on your side. + * + * - Workqueue, i.e., thread + * Each workqueue is represented by a running thread that fetches + * the request submitted by the user, does the specified work and saves + * the result to the request. + */ + +typedef struct Threads Threads; + +struct ThreadedWorkqueueOps { + /* return the size of each request */ + int (*thread_get_request_size)(void); + + /* constructor of the request */ + int (*thread_request_init)(void *request); + /* destructor of the request */ + void (*thread_request_uninit)(void *request); + + /* the handler of the request that is called by the thread */ + void (*thread_request_handler)(void *request); + /* called by the user after the request has been handled */ + void (*thread_request_done)(void *request); +}; +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; + +/* the default number of requests that each thread needs to handle */ +#define DEFAULT_THREAD_REQUEST_NR 4 + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + int thread_request_nr, + ThreadedWorkqueueOps *ops); + +void threaded_workqueue_destroy(Threads *threads); + +/* + * find a free request where the user can store the data that is needed to + * finish the request + * + * If all requests are used up, return NULL + */ +void 
*threaded_workqueue_get_request(Threads *threads); +/* submit the request and notify the thread */ +void threaded_workqueue_submit_request(Threads *threads, void *request); + +/* + * wait for all threads to complete their requests to make sure that no + * previous request is still pending. + */ +void threaded_workqueue_wait_for_requests(Threads *threads); +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index 0820923c18..f26dfe5182 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -50,5 +50,6 @@ util-obj-y += range.o util-obj-y += stats64.o util-obj-y += systemd.o util-obj-y += iova-tree.o +util-obj-y += threaded-workqueue.o util-obj-$(CONFIG_LINUX) += vfio-helpers.o util-obj-$(CONFIG_OPENGL) += drm.o diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c new file mode 100644 index 0000000000..966479631a --- /dev/null +++ b/util/threaded-workqueue.c @@ -0,0 +1,466 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/bitmap.h" +#include "qemu/threaded-workqueue.h" + +#define SMP_CACHE_BYTES 64 +#define BITS_ALIGNED_TO_CACHE(_bits_) \ + QEMU_ALIGN_UP(_bits_, SMP_CACHE_BYTES * BITS_PER_BYTE) + +/* + * the request representation which contains the internally used metadata, + * it is the header of user-defined data. + * + * It should be aligned to the natural size of the CPU. + */ +struct ThreadRequest { + /* + * the request has been handled by the thread and needs the user + * to fetch the result out. + */ + bool done; + /* + * the index to Threads::requests. + * Save it to the padding space although it can be calculated at runtime. 
+ */ + int index; +}; +typedef struct ThreadRequest ThreadRequest; + +struct ThreadLocal { + struct Threads *threads; + + /* + * the request region in Threads::requests that the thread + * need handle + */ + int start_request_index; + int end_request_index; + + /* + * the interim bitmap used by the thread to avoid frequent + * memory allocation + */ + unsigned long *result_bitmap; + + /* the index of the thread */ + int self; + + /* thread is useless and needs to exit */ + bool quit; + + QemuThread thread; + + /* the event used to wake up the thread */ + QemuEvent ev; +}; +typedef struct ThreadLocal ThreadLocal; + +/* + * the main data struct represents multithreads which is shared by + * all threads + */ +struct Threads { + /* + * in order to avoid contention, the @requests is partitioned to + * @threads_nr pieces, each thread exclusively handles + * @thread_request_nr requests in the array. + */ + void *requests; + + /* + * the bit in these two bitmaps indicates the index of the @requests + * respectively. If it's the same, the corresponding request is free + * and owned by the user, i.e, where the user fills a request. Otherwise, + * it is valid and owned by the thread, i.e, where the thread fetches + * the request and write the result. + */ + + /* after the user fills the request, the bit is flipped. */ + unsigned long *request_fill_bitmap; + /* after handles the request, the thread flips the bit. 
*/ + unsigned long *request_done_bitmap; + + /* + * the interim bitmap used by the user to avoid frequent + * memory allocation + */ + unsigned long *result_bitmap; + + /* the request header, ThreadRequest, is contained */ + unsigned int request_size; + + /* the number of requests that each thread need handle */ + unsigned int thread_request_nr; + unsigned int total_requests; + + unsigned int threads_nr; + + /* the request is pushed to the thread with round-robin manner */ + unsigned int current_thread_index; + + ThreadedWorkqueueOps *ops; + + const char *name; + QemuEvent ev; + + ThreadLocal per_thread_data[0]; +}; +typedef struct Threads Threads; + +static ThreadRequest *index_to_request(Threads *threads, int request_index) +{ + ThreadRequest *request; + + request = threads->requests + request_index * threads->request_size; + + assert(request->index == request_index); + return request; +} + +static int request_to_index(ThreadRequest *request) +{ + return request->index; +} + +static int thread_to_first_request_index(Threads *threads, int thread_id) +{ + thread_id %= threads->threads_nr; + return thread_id * threads->thread_request_nr; +} + +static int request_index_to_thread(Threads *threads, int request_index) +{ + return request_index / threads->thread_request_nr; +} + +/* + * free request: the request is not used by any thread, however, it might + * contain a result that needs the user to call thread_request_done() + * + * valid request: the request contains the request data and it's committed + * to the thread, i.e. it's owned by thread. + */ +static unsigned long *get_free_request_bitmap(Threads *threads) +{ + bitmap_xor(threads->result_bitmap, threads->request_fill_bitmap, + threads->request_done_bitmap, threads->total_requests); + + /* + * paired with smp_wmb() in mark_request_free() to make sure that we + * read request_done_bitmap before fetching the result out. 
+ */ + smp_rmb(); + + return threads->result_bitmap; +} + +static int find_free_request_index(Threads *threads) +{ + unsigned long *result_bitmap = get_free_request_bitmap(threads); + int index, cur_index; + + cur_index = thread_to_first_request_index(threads, + threads->current_thread_index); + +retry: + index = find_next_zero_bit(result_bitmap, threads->total_requests, + cur_index); + if (index < threads->total_requests) { + return index; + } + + /* if we get nothing, start it over. */ + if (cur_index != 0) { + cur_index = 0; + goto retry; + } + + return -1; +} + +static void mark_request_valid(Threads *threads, int request_index) +{ + /* + * paired with smp_rmb() in thread_find_first_valid_request_index() to make + * sure the request has been filled before the bit is flipped that + * will make the request be visible to the thread + */ + smp_wmb(); + + change_bit(request_index, threads->request_fill_bitmap); +} + +static int thread_find_first_valid_request_index(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + int index; + + bitmap_xor(thread->result_bitmap, threads->request_fill_bitmap, + threads->request_done_bitmap, threads->total_requests); + /* + * paired with smp_wmb() in mark_request_valid() to make sure that + * we read request_fill_bitmap before fetching the request out. + */ + smp_rmb(); + + index = find_next_bit(thread->result_bitmap, threads->total_requests, + thread->start_request_index); + return index > thread->end_request_index ? -1 : index; +} + +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) +{ + int index = request_to_index(request); + + /* + * smp_wmb() is implied in change_bit_atomic() that is paired with + * smp_rmb() in get_free_request_bitmap() to make sure the result + * has been saved before the bit is flipped. + */ + change_bit_atomic(index, thread->threads->request_done_bitmap); +} + +/* retry to see if there is available request before actually go to wait. 
*/ +#define BUSY_WAIT_COUNT 1000 + +static ThreadRequest * +thread_busy_wait_for_request(ThreadLocal *thread) +{ + int index, count = 0; + + for (count = 0; count < BUSY_WAIT_COUNT; count++) { + index = thread_find_first_valid_request_index(thread); + if (index >= 0) { + assert(index >= thread->start_request_index && + index <= thread->end_request_index); + return index_to_request(thread->threads, index); + } + + cpu_relax(); + } + + return NULL; +} + +static void *thread_run(void *opaque) +{ + ThreadLocal *self_data = (ThreadLocal *)opaque; + Threads *threads = self_data->threads; + void (*handler)(void *request) = threads->ops->thread_request_handler; + ThreadRequest *request; + + for ( ; !atomic_read(&self_data->quit); ) { + qemu_event_reset(&self_data->ev); + + request = thread_busy_wait_for_request(self_data); + if (!request) { + qemu_event_wait(&self_data->ev); + continue; + } + + assert(!request->done); + + handler(request + 1); + request->done = true; + mark_request_free(self_data, request); + qemu_event_set(&threads->ev); + } + + return NULL; +} + +static void uninit_requests(Threads *threads, int free_nr) +{ + ThreadRequest *request; + int i; + + for (request = threads->requests, i = 0; i < free_nr; i++) { + threads->ops->thread_request_uninit(request + 1); + request = (void *)request + threads->request_size; + } + + g_free(threads->result_bitmap); + g_free(threads->request_fill_bitmap); + g_free(threads->request_done_bitmap); + g_free(threads->requests); +} + +static int init_requests(Threads *threads) +{ + ThreadRequest *request; + int aligned_requests, free_nr = 0, ret = -1; + + aligned_requests = BITS_ALIGNED_TO_CACHE(threads->total_requests); + threads->request_fill_bitmap = bitmap_new(aligned_requests); + threads->request_done_bitmap = bitmap_new(aligned_requests); + threads->result_bitmap = bitmap_new(threads->total_requests); + + QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); + + threads->request_size = 
threads->ops->thread_get_request_size(); + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); + threads->request_size += sizeof(ThreadRequest); + threads->requests = g_try_malloc0_n(threads->total_requests, + threads->request_size); + if (!threads->requests) { + goto exit; + } + + for (request = threads->requests; free_nr < threads->total_requests; + free_nr++) { + ret = threads->ops->thread_request_init(request + 1); + if (ret < 0) { + goto exit; + } + + request->index = free_nr; + request = (void *)request + threads->request_size; + } + + return 0; + +exit: + uninit_requests(threads, free_nr); + return ret; +} + +static void uninit_thread_data(Threads *threads) +{ + ThreadLocal *thread_local = threads->per_thread_data; + int i; + + for (i = 0; i < threads->threads_nr; i++) { + thread_local[i].quit = true; + qemu_event_set(&thread_local[i].ev); + qemu_thread_join(&thread_local[i].thread); + qemu_event_destroy(&thread_local[i].ev); + g_free(thread_local[i].result_bitmap); + } +} + +static void init_thread_data(Threads *threads) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int start_index, end_index, i; + + for (i = 0; i < threads->threads_nr; i++) { + thread_local[i].threads = threads; + thread_local[i].self = i; + + start_index = thread_to_first_request_index(threads, i); + end_index = start_index + threads->thread_request_nr - 1; + thread_local[i].start_request_index = start_index; + thread_local[i].end_request_index = end_index; + + thread_local[i].result_bitmap = bitmap_new(threads->total_requests); + + qemu_event_init(&thread_local[i].ev, false); + + name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } +} + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + int thread_request_nr, ThreadedWorkqueueOps *ops) +{ + Threads *threads; + + 
threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal)); + threads->name = name; + threads->ops = ops; + + threads->threads_nr = threads_nr; + threads->thread_request_nr = thread_request_nr; + + threads->total_requests = thread_request_nr * threads_nr; + if (init_requests(threads) < 0) { + g_free(threads); + return NULL; + } + + qemu_event_init(&threads->ev, false); + init_thread_data(threads); + return threads; +} + +void threaded_workqueue_destroy(Threads *threads) +{ + uninit_thread_data(threads); + uninit_requests(threads, threads->total_requests); + qemu_event_destroy(&threads->ev); + g_free(threads); +} + +static void request_done(Threads *threads, ThreadRequest *request) +{ + if (!request->done) { + return; + } + + threads->ops->thread_request_done(request + 1); + request->done = false; +} + +void *threaded_workqueue_get_request(Threads *threads) +{ + ThreadRequest *request; + int index; + + index = find_free_request_index(threads); + if (index < 0) { + return NULL; + } + + request = index_to_request(threads, index); + request_done(threads, request); + return request + 1; +} + +void threaded_workqueue_submit_request(Threads *threads, void *request) +{ + ThreadRequest *req = request - sizeof(ThreadRequest); + int request_index = request_to_index(req); + int thread_index = request_index_to_thread(threads, request_index); + ThreadLocal *thread_local = &threads->per_thread_data[thread_index]; + + assert(!req->done); + + mark_request_valid(threads, request_index); + + threads->current_thread_index = ++thread_index; + qemu_event_set(&thread_local->ev); +} + +void threaded_workqueue_wait_for_requests(Threads *threads) +{ + unsigned long *result_bitmap; + int index = 0; + +retry: + qemu_event_reset(&threads->ev); + result_bitmap = get_free_request_bitmap(threads); + for (; index < threads->total_requests; index++) { + if (test_bit(index, result_bitmap)) { + qemu_event_wait(&threads->ev); + goto retry; + }; + + request_done(threads, 
index_to_request(threads, index)); + } +} From patchwork Tue Nov 6 12:20:23 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10670241 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id B249A14BD for ; Tue, 6 Nov 2018 12:20:48 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id A0B142A3BB for ; Tue, 6 Nov 2018 12:20:48 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id 927992A41D; Tue, 6 Nov 2018 12:20:48 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id C1E872A3BB for ; Tue, 6 Nov 2018 12:20:47 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S2387515AbeKFVpo (ORCPT ); Tue, 6 Nov 2018 16:45:44 -0500 Received: from mail-pf1-f196.google.com ([209.85.210.196]:39453 "EHLO mail-pf1-f196.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1730480AbeKFVpn (ORCPT ); Tue, 6 Nov 2018 16:45:43 -0500 Received: by mail-pf1-f196.google.com with SMTP id n11-v6so6041275pfb.6 for ; Tue, 06 Nov 2018 04:20:45 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=h+93I4KEYaqBzpYCU5F/5aYLsBjGY/tiNdC2Huafomg=; b=O5xjg9+jM7XNdO3GsYeN5Q6ox2ForyZCFx47oAYMudMkGMUc/s2mJOMXq3QhsgvYlx v1AQJNNbjPVjnW6HwwjkhwRArRrzMmmL5qDNDUf0nJ3Fr+WRYh15LcOXS1y3Yf4itcwb 
NS/ehuKDIiW0tJalJO95zhih4oGHCv4Wg0FgjwynR3huyngjuHw9X9yVrgUfO+kiMR12 37zerEpx8X9s3/sgX8soxj0RBe5oIQbBalARnldc0BxRnzwGklKOOHyQOUtYlaejzrp/ dh9OIDRKde/tddfaMfHAj4t2gEg6qQcJ88WvKyQMM2R7UtZOQ7H5Vhb7Uk8P3LoTAB0R S00A== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=h+93I4KEYaqBzpYCU5F/5aYLsBjGY/tiNdC2Huafomg=; b=gyK+vp3RyKHjh63360j/rRk+cduUCjXcEy+3P04CxUIWzIITPQJ05Xc8Otn1yyR8Hj Y/eZnowLaQgwuqsd4oC2VSa4GvzAzlHWR0UBD/VmqQxkoLLvGyMoknPJoBwK5lQBlx6F zjWnE4i1pbIFxOSYlqMSaM7i7pTtvHBoizR7dyOONaiRvluUxWJYTTolY+0aIDr5wuec rQ68oBu22uV+UjR5DKcqAKzQ4mtNmj1u0GH3x3jcr7dQdlhZJDhKT+V6/vDN+lpDUks+ 4zXZ/AGAoIIVnRT2u7KAEDONkIt4cFvisLf57ESPPVZV3jVOARQi2XidRK8ugIOtWyZj /KVg== X-Gm-Message-State: AGRZ1gLxAKK9LECe8bgK3PohiaeQ2IoM6WlStZ3vf6h2b434/23hgm3V IqtC0IbRPBiaoYnetPW6GwA= X-Google-Smtp-Source: AJdET5f1VyAO7c9EHVXOgX8ycJBta0RCDvr9P+2BoyAAFo9l4flep9NsA5dSzMEh0zMqxgd3rhGAGw== X-Received: by 2002:a62:4896:: with SMTP id q22-v6mr26153401pfi.248.1541506845496; Tue, 06 Nov 2018 04:20:45 -0800 (PST) Received: from localhost.localdomain ([203.205.141.52]) by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.41 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Tue, 06 Nov 2018 04:20:44 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v2 3/5] migration: use threaded workqueue for compression Date: Tue, 6 Nov 2018 20:20:23 +0800 Message-Id: <20181106122025.3487-4-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com> References: 
<20181106122025.3487-1-xiaoguangrong@tencent.com> Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 313 +++++++++++++++++++++----------------------------------- 1 file changed, 115 insertions(+), 198 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 7e7deec4d8..acca842aff 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -57,6 +57,7 @@ #include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "qemu/threaded-workqueue.h" /***********************************************************/ /* ram save/restore */ @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct CompressParam { - bool done; - bool quit; - bool zero_page; - QEMUFile *file; - QemuMutex mutex; - QemuCond cond; - RAMBlock *block; - ram_addr_t offset; - - /* internally used fields */ - z_stream stream; - uint8_t *originbuf; -}; -typedef struct CompressParam CompressParam; - struct DecompressParam { bool done; bool quit; @@ -377,15 +362,6 @@ struct DecompressParam { }; typedef struct DecompressParam DecompressParam; -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. 
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-/* The empty QEMUFileOps will be used by file in CompressParam */
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
@@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
 static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                 ram_addr_t offset, uint8_t *source_buf);
-
-static void *do_data_compress(void *opaque)
-{
-    CompressParam *param = opaque;
-    RAMBlock *block;
-    ram_addr_t offset;
-    bool zero_page;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->block) {
-            block = param->block;
-            offset = param->offset;
-            param->block = NULL;
-            qemu_mutex_unlock(&param->mutex);
-
-            zero_page = do_compress_ram_page(param->file, &param->stream,
-                                             block, offset, param->originbuf);
-
-            qemu_mutex_lock(&comp_done_lock);
-            param->done = true;
-            param->zero_page = zero_page;
-            qemu_cond_signal(&comp_done_cond);
-            qemu_mutex_unlock(&comp_done_lock);
-
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
-    }
-    qemu_mutex_unlock(&param->mutex);
-
-    return NULL;
-}
-
-static void compress_threads_save_cleanup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression() || !comp_param) {
-        return;
-    }
-
-    thread_count = migrate_compress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!comp_param[i].file) {
-            break;
-        }
-
-        qemu_mutex_lock(&comp_param[i].mutex);
-        comp_param[i].quit = true;
-        qemu_cond_signal(&comp_param[i].cond);
-        qemu_mutex_unlock(&comp_param[i].mutex);
-
-        qemu_thread_join(compress_threads + i);
-        qemu_mutex_destroy(&comp_param[i].mutex);
-        qemu_cond_destroy(&comp_param[i].cond);
-        deflateEnd(&comp_param[i].stream);
-        g_free(comp_param[i].originbuf);
-        qemu_fclose(comp_param[i].file);
-        comp_param[i].file = NULL;
-    }
-    qemu_mutex_destroy(&comp_done_lock);
-    qemu_cond_destroy(&comp_done_cond);
-    g_free(compress_threads);
-    g_free(comp_param);
-    compress_threads = NULL;
-    comp_param = NULL;
-}
-
-static int compress_threads_save_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return 0;
-    }
-    thread_count = migrate_compress_threads();
-    compress_threads = g_new0(QemuThread, thread_count);
-    comp_param = g_new0(CompressParam, thread_count);
-    qemu_cond_init(&comp_done_cond);
-    qemu_mutex_init(&comp_done_lock);
-    for (i = 0; i < thread_count; i++) {
-        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
-        if (!comp_param[i].originbuf) {
-            goto exit;
-        }
-
-        if (deflateInit(&comp_param[i].stream,
-                        migrate_compress_level()) != Z_OK) {
-            g_free(comp_param[i].originbuf);
-            goto exit;
-        }
-
-        /* comp_param[i].file is just used as a dummy buffer to save data,
-         * set its ops to empty.
-         */
-        comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
-        comp_param[i].done = true;
-        comp_param[i].quit = false;
-        qemu_mutex_init(&comp_param[i].mutex);
-        qemu_cond_init(&comp_param[i].cond);
-        qemu_thread_create(compress_threads + i, "compress",
-                           do_data_compress, comp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-
-exit:
-    compress_threads_save_cleanup();
-    return -1;
-}
-
 /* Multiple fd's */
 
 #define MULTIFD_MAGIC 0x11223344U
@@ -1909,12 +1766,25 @@ exit:
     return zero_page;
 }
 
+struct CompressData {
+    /* filled by migration thread.*/
+    RAMBlock *block;
+    ram_addr_t offset;
+
+    /* filled by compress thread. */
+    QEMUFile *file;
+    z_stream stream;
+    uint8_t *originbuf;
+    bool zero_page;
+};
+typedef struct CompressData CompressData;
+
 static void
-update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+update_compress_thread_counts(CompressData *cd, int bytes_xmit)
 {
     ram_counters.transferred += bytes_xmit;
 
-    if (param->zero_page) {
+    if (cd->zero_page) {
         ram_counters.duplicate++;
         return;
     }
@@ -1924,81 +1794,128 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
     compression_counters.pages++;
 }
 
+static int compress_thread_get_data_size(void)
+{
+    return sizeof(CompressData);
+}
+
+static int compress_thread_data_init(void *request)
+{
+    CompressData *cd = request;
+
+    cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+    if (!cd->originbuf) {
+        return -1;
+    }
+
+    if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
+        g_free(cd->originbuf);
+        return -1;
+    }
+
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_thread_data_fini(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+    g_free(cd->originbuf);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+
+    /*
+     * if compression fails, it will be indicated by
+     * migrate_get_current()->to_dst_file.
+     */
+    cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
+                                         cd->offset, cd->originbuf);
+}
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    RAMState *rs = ram_state;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
+    update_compress_thread_counts(cd, bytes_xmit);
+}
+
+static ThreadedWorkqueueOps compress_ops = {
+    .thread_get_request_size = compress_thread_get_data_size,
+    .thread_request_init = compress_thread_data_init,
+    .thread_request_uninit = compress_thread_data_fini,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+};
+
+static Threads *compress_threads;
+
 static bool save_page_use_compression(RAMState *rs);
 
 static void flush_compressed_data(RAMState *rs)
 {
-    int idx, len, thread_count;
-
     if (!save_page_use_compression(rs)) {
         return;
     }
-    thread_count = migrate_compress_threads();
 
-    qemu_mutex_lock(&comp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!comp_param[idx].done) {
-            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        }
-    }
-    qemu_mutex_unlock(&comp_done_lock);
+    threaded_workqueue_wait_for_requests(compress_threads);
+}
 
-    for (idx = 0; idx < thread_count; idx++) {
-        qemu_mutex_lock(&comp_param[idx].mutex);
-        if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            /*
-             * it's safe to fetch zero_page without holding comp_done_lock
-             * as there is no further request submitted to the thread,
-             * i.e, the thread should be waiting for a request at this point.
-             */
-            update_compress_thread_counts(&comp_param[idx], len);
-        }
-        qemu_mutex_unlock(&comp_param[idx].mutex);
+static void compress_threads_save_cleanup(void)
+{
+    if (!compress_threads) {
+        return;
     }
+
+    threaded_workqueue_destroy(compress_threads);
+    compress_threads = NULL;
 }
 
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
-                                       ram_addr_t offset)
+static int compress_threads_save_setup(void)
 {
-    param->block = block;
-    param->offset = offset;
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    compress_threads = threaded_workqueue_create("compress",
+                                        migrate_compress_threads(),
+                                        DEFAULT_THREAD_REQUEST_NR, &compress_ops);
+    return compress_threads ? 0 : -1;
 }
 
 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                            ram_addr_t offset)
 {
-    int idx, thread_count, bytes_xmit = -1, pages = -1;
+    CompressData *cd;
     bool wait = migrate_compress_wait_thread();
 
-    thread_count = migrate_compress_threads();
-    qemu_mutex_lock(&comp_done_lock);
 retry:
-    for (idx = 0; idx < thread_count; idx++) {
-        if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
-            pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
-            break;
+    cd = threaded_workqueue_get_request(compress_threads);
+    if (!cd) {
+        /*
+         * wait for the free thread if the user specifies
+         * 'compress-wait-thread', otherwise we will post
+         * the page out in the main thread as normal page.
+         */
+        if (wait) {
+            cpu_relax();
+            goto retry;
         }
-    }
 
-    /*
-     * wait for the free thread if the user specifies 'compress-wait-thread',
-     * otherwise we will post the page out in the main thread as normal page.
-     */
-    if (pages < 0 && wait) {
-        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
-        goto retry;
-    }
-    qemu_mutex_unlock(&comp_done_lock);
-
-    return pages;
+        return -1;
+    }
+    cd->block = block;
+    cd->offset = offset;
+    threaded_workqueue_submit_request(compress_threads, cd);
+    return 1;
 }
 
 /**

From patchwork Tue Nov 6 12:20:24 2018
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10670243
Return-Path:
Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125])
        by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id A0E4313A4
        for ; Tue, 6 Nov 2018 12:20:52 +0000 (UTC)
Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1])
        by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 8D4422A418
        for ; Tue, 6 Nov 2018 12:20:52 +0000 (UTC)
Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486)
        id 80EF02A41D; Tue, 6 Nov 2018 12:20:52 +0000 (UTC)
X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on
        pdx-wl-mail.web.codeaurora.org
X-Spam-Level:
X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED,
        DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI
        autolearn=ham version=3.3.1
Received: from vger.kernel.org (vger.kernel.org [209.132.180.67])
        by mail.wl.linuxfoundation.org (Postfix) with ESMTP id CC7A22A41C
        for ; Tue, 6 Nov 2018 12:20:51 +0000 (UTC)
Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand
        id S2387791AbeKFVps (ORCPT ); Tue, 6 Nov 2018 16:45:48 -0500
Received: from mail-pf1-f195.google.com ([209.85.210.195]:41513 "EHLO
        mail-pf1-f195.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org
        with ESMTP id S2387604AbeKFVps (ORCPT );
        Tue, 6 Nov 2018 16:45:48 -0500
Received: by mail-pf1-f195.google.com with SMTP id e22-v6so6035231pfn.8
        for ; Tue, 06 Nov 2018 04:20:50 -0800 (PST)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com;
        s=20161025;
        h=from:to:cc:subject:date:message-id:in-reply-to:references;
        bh=bSrHkivhHJMtQkgfkJcIW0le1CciJhfpKRO9fbQYbd8=;
        b=H84aoVrlmJgc9brppb8J39vP52aoWpHArMya35+I82V6fUJLeG25JbeTE0MW+nhhp3
         SFt5WGNDz+KB5HQZoPnztqzwDvz34Amge50GpvTj/NZt1oad/VuWQlWHypILfZD1IXLw
         S1eNA5DshtZp6JxH2f/EM34GFlxR6xUd72r4o/WDkv+SFOJpdrHjj7d0KSTtT0TPU2gJ
         aS17bw3YN4FjTb3Ikghmg9A9DTkNyWO5vtPQwcRylvH+qF/zX4RMrCBwoJx8CMCMLv1v
         7HI4Xbv52ruRugG2sn7d23iNxcjjBubyABVsjTsMo/qat7AksJ1/wriFyQVPtzvEsFcB
         p/Hw==
X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
        d=1e100.net; s=20161025;
        h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to
         :references;
        bh=bSrHkivhHJMtQkgfkJcIW0le1CciJhfpKRO9fbQYbd8=;
        b=Jxn3hml6nNesvQz8lXYceK7GHFeDIekg1v4YLP7y0lQvIEFqOUD+UNY27giIl3JK7u
         8k5HCXfQ2ZMFbgJhBuWc0r0dYNT4uhjoU8UOUM4YPAc/TeeYbSeSN5LSUpPJnEiXSrxz
         tdvOL7nKMO31zFhuWFFGYcDwT7qUQEGGqFf9/Xtw/X3aZJpMka1hYaHn+MsyjDxS+DCY
         FfX2gxBS+pQrYfcjXjsy79PNjkZpsFCbles15j/2uPOoso0MIz+KTQyc2VLLvxm7hIKc
         46NrHrKrXLn4LSiyJcG2oP9rI1WNl9kZkjOaukLAnsxvRQ/bMztxhPf1nXPnSmcjfeDo
         +kjg==
X-Gm-Message-State: AGRZ1gJLP0bd3X+cuQfLQnB85E/CSoQhjLz0zNCHldMTKK3yqRLHmD0j
        cORhkawCABW2jc2f2YWsx5Y=
X-Google-Smtp-Source: AJdET5dJGXd5Wv1IOAUylvx06lh5T2hCdHmZ7njJQtdVcpiiIcLUhSTQJ8lwYCPr1Ww1L0jRWIkheQ==
X-Received: by 2002:a63:eb42:: with SMTP id b2-v6mr23221907pgk.348.1541506849676;
        Tue, 06 Nov 2018 04:20:49 -0800 (PST)
Received: from localhost.localdomain ([203.205.141.52])
        by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.45
        (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256);
        Tue, 06 Nov 2018 04:20:49 -0800 (PST)
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com,
        peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn,
        eblake@redhat.com, quintela@redhat.com, cota@braap.org,
        Xiao Guangrong
Subject: [PATCH v2 4/5] migration: use threaded
 workqueue for decompression
Date: Tue, 6 Nov 2018 20:20:24 +0800
Message-Id: <20181106122025.3487-5-xiaoguangrong@tencent.com>
X-Mailer: git-send-email 2.14.5
In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com>
References: <20181106122025.3487-1-xiaoguangrong@tencent.com>
Sender: kvm-owner@vger.kernel.org
Precedence: bulk
List-ID:
X-Mailing-List: kvm@vger.kernel.org
X-Virus-Scanned: ClamAV using ClamSMTP

From: Xiao Guangrong

Adapt the decompression code to the threaded workqueue

Signed-off-by: Xiao Guangrong
---
 migration/ram.c | 225 ++++++++++++++++++++------------------------------------
 1 file changed, 81 insertions(+), 144 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index acca842aff..834198f11c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus;
 
 CompressionStats compression_counters;
 
-struct DecompressParam {
-    bool done;
-    bool quit;
-    QemuMutex mutex;
-    QemuCond cond;
-    void *des;
-    uint8_t *compbuf;
-    int len;
-    z_stream stream;
-};
-typedef struct DecompressParam DecompressParam;
-
 static const QEMUFileOps empty_ops = { };
 
 static QEMUFile *decomp_file;
-static DecompressParam *decomp_param;
-static QemuThread *decompress_threads;
-static QemuMutex decomp_done_lock;
-static QemuCond decomp_done_cond;
 
 /* Multiple fd's */
 
@@ -3404,6 +3388,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+
 /* return the size after decompression, or negative value on error */
 static int
 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
@@ -3429,166 +3414,118 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
     return stream->total_out;
 }
 
-static void *do_data_decompress(void *opaque)
+struct DecompressData {
+    /* filled by migration thread.*/
+    void *des;
+    uint8_t *compbuf;
+    size_t len;
+
+    z_stream stream;
+};
+typedef struct DecompressData DecompressData;
+
+static Threads *decompress_threads;
+
+static int decompress_thread_get_data_size(void)
 {
-    DecompressParam *param = opaque;
-    unsigned long pagesize;
-    uint8_t *des;
-    int len, ret;
-
-    qemu_mutex_lock(&param->mutex);
-    while (!param->quit) {
-        if (param->des) {
-            des = param->des;
-            len = param->len;
-            param->des = 0;
-            qemu_mutex_unlock(&param->mutex);
-
-            pagesize = TARGET_PAGE_SIZE;
-
-            ret = qemu_uncompress_data(&param->stream, des, pagesize,
-                                       param->compbuf, len);
-            if (ret < 0 && migrate_get_current()->decompress_error_check) {
-                error_report("decompress data failed");
-                qemu_file_set_error(decomp_file, ret);
-            }
+    return sizeof(DecompressData);
+}
 
-            qemu_mutex_lock(&decomp_done_lock);
-            param->done = true;
-            qemu_cond_signal(&decomp_done_cond);
-            qemu_mutex_unlock(&decomp_done_lock);
+static int decompress_thread_data_init(void *request)
+{
+    DecompressData *dd = request;
 
-            qemu_mutex_lock(&param->mutex);
-        } else {
-            qemu_cond_wait(&param->cond, &param->mutex);
-        }
+    if (inflateInit(&dd->stream) != Z_OK) {
+        return -1;
     }
-    qemu_mutex_unlock(&param->mutex);
 
-    return NULL;
+    dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    return 0;
 }
 
-static int wait_for_decompress_done(void)
+static void decompress_thread_data_fini(void *request)
 {
-    int idx, thread_count;
+    DecompressData *dd = request;
 
-    if (!migrate_use_compression()) {
-        return 0;
-    }
+    inflateEnd(&dd->stream);
+    g_free(dd->compbuf);
+}
 
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    for (idx = 0; idx < thread_count; idx++) {
-        while (!decomp_param[idx].done) {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
+static void decompress_thread_data_handler(void *request)
+{
+    DecompressData *dd = request;
+    unsigned long pagesize = TARGET_PAGE_SIZE;
+    int ret;
+
+    ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize,
+                               dd->compbuf, dd->len);
+    if (ret < 0 && migrate_get_current()->decompress_error_check) {
+        error_report("decompress data failed");
+        qemu_file_set_error(decomp_file, ret);
     }
-    qemu_mutex_unlock(&decomp_done_lock);
-    return qemu_file_get_error(decomp_file);
 }
 
-static void compress_threads_load_cleanup(void)
+static void decompress_thread_data_done(void *request)
 {
-    int i, thread_count;
+}
+
+static ThreadedWorkqueueOps decompress_ops = {
+    .thread_get_request_size = decompress_thread_get_data_size,
+    .thread_request_init = decompress_thread_data_init,
+    .thread_request_uninit = decompress_thread_data_fini,
+    .thread_request_handler = decompress_thread_data_handler,
+    .thread_request_done = decompress_thread_data_done,
+};
 
+static int decompress_init(QEMUFile *f)
+{
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
-    thread_count = migrate_decompress_threads();
-    for (i = 0; i < thread_count; i++) {
-        /*
-         * we use it as a indicator which shows if the thread is
-         * properly init'd or not
-         */
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
 
-        qemu_mutex_lock(&decomp_param[i].mutex);
-        decomp_param[i].quit = true;
-        qemu_cond_signal(&decomp_param[i].cond);
-        qemu_mutex_unlock(&decomp_param[i].mutex);
-    }
-    for (i = 0; i < thread_count; i++) {
-        if (!decomp_param[i].compbuf) {
-            break;
-        }
+    decomp_file = f;
+    decompress_threads = threaded_workqueue_create("decompress",
+                                migrate_decompress_threads(),
+                                DEFAULT_THREAD_REQUEST_NR, &decompress_ops);
+    return decompress_threads ? 0 : -1;
+}
 
-        qemu_thread_join(decompress_threads + i);
-        qemu_mutex_destroy(&decomp_param[i].mutex);
-        qemu_cond_destroy(&decomp_param[i].cond);
-        inflateEnd(&decomp_param[i].stream);
-        g_free(decomp_param[i].compbuf);
-        decomp_param[i].compbuf = NULL;
+static void decompress_fini(void)
+{
+    if (!decompress_threads) {
+        return;
     }
-    g_free(decompress_threads);
-    g_free(decomp_param);
+
+    threaded_workqueue_destroy(decompress_threads);
     decompress_threads = NULL;
-    decomp_param = NULL;
     decomp_file = NULL;
 }
 
-static int compress_threads_load_setup(QEMUFile *f)
+static int flush_decompressed_data(void)
 {
-    int i, thread_count;
-
     if (!migrate_use_compression()) {
         return 0;
     }
 
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    decomp_file = f;
-    for (i = 0; i < thread_count; i++) {
-        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
-            goto exit;
-        }
-
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-    return 0;
-exit:
-    compress_threads_load_cleanup();
-    return -1;
+    threaded_workqueue_wait_for_requests(decompress_threads);
+    return qemu_file_get_error(decomp_file);
 }
 
 static void decompress_data_with_multi_threads(QEMUFile *f,
-                                               void *host, int len)
+                                               void *host, size_t len)
 {
-    int idx, thread_count;
+    DecompressData *dd;
 
-    thread_count = migrate_decompress_threads();
-    qemu_mutex_lock(&decomp_done_lock);
-    while (true) {
-        for (idx = 0; idx < thread_count; idx++) {
-            if (decomp_param[idx].done) {
-                decomp_param[idx].done = false;
-                qemu_mutex_lock(&decomp_param[idx].mutex);
-                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
-                decomp_param[idx].des = host;
-                decomp_param[idx].len = len;
-                qemu_cond_signal(&decomp_param[idx].cond);
-                qemu_mutex_unlock(&decomp_param[idx].mutex);
-                break;
-            }
-        }
-        if (idx < thread_count) {
-            break;
-        } else {
-            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
-        }
+retry:
+    dd = threaded_workqueue_get_request(decompress_threads);
+    if (!dd) {
+        goto retry;
     }
-    qemu_mutex_unlock(&decomp_done_lock);
+
+    dd->des = host;
+    dd->len = len;
+    qemu_get_buffer(f, dd->compbuf, len);
+    threaded_workqueue_submit_request(decompress_threads, dd);
 }
 
 /*
@@ -3683,7 +3620,7 @@ void colo_release_ram_cache(void)
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
-    if (compress_threads_load_setup(f)) {
+    if (decompress_init(f)) {
         return -1;
     }
 
@@ -3704,7 +3641,7 @@ static int ram_load_cleanup(void *opaque)
     }
 
     xbzrle_load_cleanup();
-    compress_threads_load_cleanup();
+    decompress_fini();
 
     RAMBLOCK_FOREACH_MIGRATABLE(rb) {
         g_free(rb->receivedmap);
@@ -4106,7 +4043,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
-    ret |= wait_for_decompress_done();
+    ret |= flush_decompressed_data();
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);

From patchwork Tue Nov 6 12:20:25 2018
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10670245
Return-Path:
Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125])
        by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id B394513A4
        for ; Tue, 6 Nov 2018 12:20:56 +0000 (UTC)
Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1])
        by mail.wl.linuxfoundation.org (Postfix) with ESMTP id A34512A3F6
        for ; Tue, 6 Nov 2018 12:20:56 +0000 (UTC)
Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486)
        id 97AA92A421; Tue, 6 Nov 2018 12:20:56 +0000 (UTC)
X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on
        pdx-wl-mail.web.codeaurora.org
X-Spam-Level:
X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED,
        DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI
        autolearn=ham version=3.3.1
Received: from vger.kernel.org (vger.kernel.org [209.132.180.67])
        by mail.wl.linuxfoundation.org (Postfix) with ESMTP id E126F2A41C
        for ; Tue, 6 Nov 2018 12:20:55 +0000 (UTC)
Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand
        id S2387910AbeKFVpw (ORCPT ); Tue, 6 Nov 2018 16:45:52 -0500
Received: from mail-pf1-f194.google.com ([209.85.210.194]:38634 "EHLO
        mail-pf1-f194.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org
        with ESMTP id S2387848AbeKFVpv (ORCPT );
        Tue, 6 Nov 2018 16:45:51 -0500
Received: by mail-pf1-f194.google.com with SMTP id b11-v6so6045482pfi.5
        for ; Tue, 06 Nov 2018 04:20:54 -0800 (PST)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
        d=gmail.com; s=20161025;
        h=from:to:cc:subject:date:message-id:in-reply-to:references;
        bh=jznZJVjlRLoD9TNH6b8XfAsUK78wEZWIsv5uQU7dmjQ=;
        b=XKxVl5zDkr+Jqt/XwiYms5HgLgo5bc4trnulWq73QfHgBX9d+P0CG+WYUM6jPiwr+W
         EESPJkX0arpFOOwe+oXjtUPgHGtnWuDIbKQjGNi7lwpZRLLHVCZ4lcJK/diUEt40tRPO
         k4rXegWgZdT8x9sFvuivHTPZDOLq/3CTGIEH7vKeOtwf59AAppx5h0WMwYWR1zhNkas+
         7Tn/k5qvPdyF0umOAaPLE+fDrWDUXaKqUNsSVuc/AGh53o8A0QiuHqHi9jo2+F00YqLG
         UKEHodWfx7QTdGzJ1XQVeiBeUOHeNTJS4MJGxzAInbQS2/ewODhQkEtBPH0aIX15KgLO
         Qefg==
X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed;
        d=1e100.net; s=20161025;
        h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to
         :references;
        bh=jznZJVjlRLoD9TNH6b8XfAsUK78wEZWIsv5uQU7dmjQ=;
        b=qNYw6QejugPIcz9sC3Wa7QjMlDSmUK7jBiSfZHX2Y0QCfBaQb4+34I15hZnb8mg1Ma
         1LnWInxE3Oaq8ZTjZXRtDwTs3oq2asrk4wQ2WHvIi69R/7Mq3z4yKbHJLjDLpRrprJxp
         OeBglhbwfZogGgPbNybxUEBbddukLnT1NQA2ymboAan0RUTAk9vxVSU83Ow5ScLCSybu
         xVV0FAZpeyCQZ9JOo1FAO9xZcLfxVNhVv0NDUdAdhJLJmiW9osPgdMZMD7pGnfUpPmba
         A/z3mgn2El6l0quZLC745gHcodkd3a/UInBvSYnbPlq3fRzO7R/ZSLQolLIZUIB/W9r4
         9WGg==
X-Gm-Message-State:
        AGRZ1gIEzxjvwwxJRq3zgTyQ6/fGrDRaHyoiGEY5YjT1WvC3gv52OSJA
        m+F3OXlPlOh5daL5DVdpwX8=
X-Google-Smtp-Source: AJdET5dP8D0zZIR+KN+46nAH9f//8cE3SqKupptEdNFlDdaFsAdZjjkh0XKDywC/ocMbsPH1fvUVrw==
X-Received: by 2002:a63:6205:: with SMTP id w5mr23315591pgb.53.1541506853788;
        Tue, 06 Nov 2018 04:20:53 -0800 (PST)
Received: from localhost.localdomain ([203.205.141.52])
        by smtp.gmail.com with ESMTPSA id v4-v6sm19050021pff.9.2018.11.06.04.20.49
        (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256);
        Tue, 06 Nov 2018 04:20:53 -0800 (PST)
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com,
        peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn,
        eblake@redhat.com, quintela@redhat.com, cota@braap.org,
        Xiao Guangrong
Subject: [PATCH v2 5/5] tests: add threaded-workqueue-bench
Date: Tue, 6 Nov 2018 20:20:25 +0800
Message-Id: <20181106122025.3487-6-xiaoguangrong@tencent.com>
X-Mailer: git-send-email 2.14.5
In-Reply-To: <20181106122025.3487-1-xiaoguangrong@tencent.com>
References: <20181106122025.3487-1-xiaoguangrong@tencent.com>
Sender: kvm-owner@vger.kernel.org
Precedence: bulk
List-ID:
X-Mailing-List: kvm@vger.kernel.org
X-Virus-Scanned: ClamAV using ClamSMTP

From: Xiao Guangrong

This is the benchmark for threaded-workqueue. It is also a good example
of how threaded-workqueue is used.

Signed-off-by: Xiao Guangrong
---
 tests/Makefile.include           |   5 +-
 tests/threaded-workqueue-bench.c | 256 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 260 insertions(+), 1 deletion(-)
 create mode 100644 tests/threaded-workqueue-bench.c

diff --git a/tests/Makefile.include b/tests/Makefile.include
index d2e577eabb..a4deb210ab 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -499,7 +499,8 @@ test-obj-y = tests/check-qnum.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-rcu-tailq.o \
 	tests/test-qdist.o tests/test-shift128.o \
 	tests/test-qht.o tests/qht-bench.o tests/test-qht-par.o \
-	tests/atomic_add-bench.o tests/atomic64-bench.o
+	tests/atomic_add-bench.o tests/atomic64-bench.o \
+	tests/threaded-workqueue-bench.o
 
 $(test-obj-y): QEMU_INCLUDES += -Itests
 QEMU_CFLAGS += -I$(SRC_PATH)/tests
@@ -555,6 +556,8 @@ tests/qht-bench$(EXESUF): tests/qht-bench.o $(test-util-obj-y)
 tests/test-bufferiszero$(EXESUF): tests/test-bufferiszero.o $(test-util-obj-y)
 tests/atomic_add-bench$(EXESUF): tests/atomic_add-bench.o $(test-util-obj-y)
 tests/atomic64-bench$(EXESUF): tests/atomic64-bench.o $(test-util-obj-y)
+tests/threaded-workqueue-bench$(EXESUF): tests/threaded-workqueue-bench.o migration/qemu-file.o \
+	$(test-util-obj-y)
 
 tests/fp/%:
 	$(MAKE) -C $(dir $@) $(notdir $@)
diff --git a/tests/threaded-workqueue-bench.c b/tests/threaded-workqueue-bench.c
new file mode 100644
index 0000000000..88026f1a8f
--- /dev/null
+++ b/tests/threaded-workqueue-bench.c
@@ -0,0 +1,256 @@
+/*
+ * Threaded Workqueue Benchmark
+ *
+ * Author:
+ *   Xiao Guangrong
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+#include
+
+#include "qemu/osdep.h"
+#include "exec/cpu-common.h"
+#include "qemu/error-report.h"
+#include "migration/qemu-file.h"
+#include "qemu/threaded-workqueue.h"
+
+#define PAGE_SHIFT 12
+#define PAGE_SIZE (1 << PAGE_SHIFT)
+#define DEFAULT_THREAD_NR 2
+#define DEFAULT_MEM_SIZE 1
+#define DEFAULT_REPEATED_COUNT 3
+
+static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
+                                   int64_t pos)
+{
+    int i, size = 0;
+
+    for (i = 0; i < iovcnt; i++) {
+        size += iov[i].iov_len;
+    }
+    return size;
+}
+
+static int test_fclose(void *opaque)
+{
+    return 0;
+}
+
+static const QEMUFileOps test_write_ops = {
+    .writev_buffer = test_writev_buffer,
+    .close = test_fclose
+};
+
+static QEMUFile *dest_file;
+
+static const QEMUFileOps empty_ops = { };
+
+struct CompressData {
+    uint8_t *ram_addr;
+    QEMUFile *file;
+    z_stream stream;
+};
+typedef struct CompressData CompressData;
+
+static int compress_request_size(void)
+{
+    return sizeof(CompressData);
+}
+
+static int compress_request_init(void *request)
+{
+    CompressData *cd = request;
+
+    if (deflateInit(&cd->stream, 1) != Z_OK) {
+        return -1;
+    }
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_request_uninit(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+    int blen;
+
+    blen = qemu_put_compression_data(cd->file, &cd->stream, cd->ram_addr,
+                                     PAGE_SIZE);
+    if (blen < 0) {
+        error_report("compressed data failed!");
+        qemu_file_set_error(dest_file, blen);
+    }
+}
+
+struct CompressStats {
+    unsigned long pages;
+    unsigned long compressed_size;
+};
+typedef struct CompressStats CompressStats;
+
+static CompressStats comp_stats;
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
+
+    comp_stats.pages++;
+    comp_stats.compressed_size += bytes_xmit;
+}
+
+static ThreadedWorkqueueOps ops = {
+    .thread_get_request_size = compress_request_size,
+    .thread_request_init = compress_request_init,
+    .thread_request_uninit = compress_request_uninit,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+};
+
+static void compress_threads_save_cleanup(Threads *threads)
+{
+    threaded_workqueue_destroy(threads);
+    qemu_fclose(dest_file);
+}
+
+static Threads *compress_threads_save_setup(int threads_nr, int requests_nr)
+{
+    Threads *compress_threads;
+
+    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
+    compress_threads = threaded_workqueue_create("compress", threads_nr,
+                                                 requests_nr, &ops);
+    assert(compress_threads);
+    return compress_threads;
+}
+
+static void compress_page_with_multi_thread(Threads *threads, uint8_t *addr)
+{
+    CompressData *cd;
+
+retry:
+    cd = threaded_workqueue_get_request(threads);
+    if (!cd) {
+        goto retry;
+    }
+
+    cd->ram_addr = addr;
+    threaded_workqueue_submit_request(threads, cd);
+}
+
+static void run(Threads *threads, uint8_t *mem, unsigned long mem_size,
+                int repeated_count)
+{
+    uint8_t *ptr = mem, *end = mem + mem_size;
+    uint64_t start_ts, spend, total_ts = 0, pages = mem_size >> PAGE_SHIFT;
+    double rate;
+    int i;
+
+    for (i = 0; i < repeated_count; i++) {
+        ptr = mem;
+        memset(&comp_stats, 0, sizeof(comp_stats));
+
+        start_ts = g_get_monotonic_time();
+        for (ptr = mem; ptr < end; ptr += PAGE_SIZE) {
+            *ptr = 0x10;
+            compress_page_with_multi_thread(threads, ptr);
+        }
+        threaded_workqueue_wait_for_requests(threads);
+        spend = g_get_monotonic_time() - start_ts;
+        total_ts += spend;
+
+        if (comp_stats.pages != pages) {
+            printf("ERROR: pages are compressed %ld, expect %ld.\n",
+                   comp_stats.pages, pages);
+            exit(-1);
+        }
+
+        rate = (double)(comp_stats.pages * PAGE_SIZE) /
+               comp_stats.compressed_size;
+        printf("RUN %d: Request # %ld Cost %ld, Compression Rate %f.\n", i,
+               comp_stats.pages, spend, rate);
+    }
+
+    printf("AVG: Time Cost %ld.\n", total_ts / repeated_count);
+}
+
+static void usage(const char *arg0)
+{
+    printf("\nThreaded Workqueue Benchmark.\n");
+    printf("Usage:\n");
+    printf(" %s [OPTIONS]\n", arg0);
+    printf("Options:\n");
+    printf(" -t the number of threads (default %d).\n",
+           DEFAULT_THREAD_NR);
+    printf(" -r: the number of requests handled by each thread (default %d).\n",
+           DEFAULT_THREAD_REQUEST_NR);
+    printf(" -m: the size of the memory (G) used to test (default %dG).\n",
+           DEFAULT_MEM_SIZE);
+    printf(" -c: the repeated count (default %d).\n",
+           DEFAULT_REPEATED_COUNT);
+    printf(" -h show this help info.\n");
+}
+
+int main(int argc, char *argv[])
+{
+    int c, threads_nr, requests_nr, repeated_count;
+    unsigned long mem_size;
+    uint8_t *mem;
+    Threads *threads;
+
+    threads_nr = DEFAULT_THREAD_NR;
+    requests_nr = DEFAULT_THREAD_REQUEST_NR;
+    mem_size = DEFAULT_MEM_SIZE;
+    repeated_count = DEFAULT_REPEATED_COUNT;
+
+    for (;;) {
+        c = getopt(argc, argv, "t:r:m:c:h");
+        if (c < 0) {
+            break;
+        }
+
+        switch (c) {
+        case 't':
+            threads_nr = atoi(optarg);
+            break;
+        case 'r':
+            requests_nr = atoi(optarg);
+            break;
+        case 'm':
+            mem_size = atol(optarg);
+            break;
+        case 'c':
+            repeated_count = atoi(optarg);
+            break;
+        default:
+            printf("Unknown option: %c.\n", c);
+        case 'h':
+            usage(argv[0]);
+            return -1;
+        }
+    }
+
+    printf("Run the benchmark: threads %d requests-per-thread: %d memory %ldG repeat %d.\n",
+            threads_nr, requests_nr, mem_size, repeated_count);
+
+    mem_size = mem_size << 30;
+    mem = qemu_memalign(PAGE_SIZE, mem_size);
+    memset(mem, 0, mem_size);
+
+    threads = compress_threads_save_setup(threads_nr, requests_nr);
+    run(threads, mem, mem_size, repeated_count);
+    compress_threads_save_cleanup(threads);
+    return 0;
+}
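
Note for readers who want to follow the request life cycle that the callers above rely on (create, get a request, fill it in, submit, destroy) without building QEMU: the following is a minimal, single-threaded sketch of that pattern. It is not the threaded workqueue from this series. Every `mini_queue_*`, `MiniQueue*`, `Square*` and `square_*` name is hypothetical, and the handler and done callbacks run synchronously inside submit, where a real workqueue would presumably hand the handler to a worker thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical callback table; not the ThreadedWorkqueueOps of this series. */
typedef struct MiniQueueOps {
    size_t (*request_size)(void);
    int (*request_init)(void *request);     /* once per request at create */
    void (*request_uninit)(void *request);  /* once per request at destroy */
    void (*request_handler)(void *request); /* the work itself */
    void (*request_done)(void *request);    /* collect the result */
} MiniQueueOps;

typedef struct MiniQueue {
    const MiniQueueOps *ops;
    size_t nr, size;     /* initialised requests, bytes per request */
    unsigned char *pool; /* the requests, stored back to back */
    bool *busy;          /* busy[i] is true between get and submit */
} MiniQueue;

static void mini_queue_destroy(MiniQueue *q)
{
    for (size_t i = 0; i < q->nr; i++) {
        q->ops->request_uninit(q->pool + i * q->size);
    }
    free(q->pool);
    free(q->busy);
    free(q);
}

static MiniQueue *mini_queue_create(size_t nr, const MiniQueueOps *ops)
{
    MiniQueue *q = calloc(1, sizeof(*q));

    if (!q) {
        return NULL;
    }
    q->ops = ops;
    q->size = ops->request_size();
    q->pool = calloc(nr, q->size);
    q->busy = calloc(nr, sizeof(*q->busy));
    /* q->nr counts initialised requests, so destroy uninits only those. */
    while (q->pool && q->busy && q->nr < nr &&
           ops->request_init(q->pool + q->nr * q->size) == 0) {
        q->nr++;
    }
    if (q->nr < nr) {
        mini_queue_destroy(q);
        return NULL;
    }
    return q;
}

/* Return a free request, or NULL when every request is in use. */
static void *mini_queue_get_request(MiniQueue *q)
{
    for (size_t i = 0; i < q->nr; i++) {
        if (!q->busy[i]) {
            q->busy[i] = true;
            return q->pool + i * q->size;
        }
    }
    return NULL;
}

/* Run handler and done synchronously, then release the request. */
static void mini_queue_submit_request(MiniQueue *q, void *request)
{
    size_t i = (size_t)((unsigned char *)request - q->pool) / q->size;

    assert(i < q->nr && q->busy[i]);
    q->ops->request_handler(request);
    q->ops->request_done(request);
    q->busy[i] = false;
}

/* Example client: square one integer per request and sum the results. */
typedef struct SquareData { int input, output; } SquareData;
static int squares_total;

static size_t square_size(void) { return sizeof(SquareData); }
static int square_init(void *r) { (void)r; return 0; }
static void square_uninit(void *r) { (void)r; }
static void square_handler(void *r) { SquareData *sd = r; sd->output = sd->input * sd->input; }
static void square_done(void *r) { squares_total += ((SquareData *)r)->output; }

static const MiniQueueOps square_ops = {
    square_size, square_init, square_uninit, square_handler, square_done,
};
```

A caller would create the queue with `mini_queue_create(nr, &square_ops)`, take a request with `mini_queue_get_request()`, set `input`, and pass it to `mini_queue_submit_request()`. A NULL return from the get call means every request is in use, which is the case the callers in the patches handle by retrying.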