From patchwork Thu Nov 22 07:20:24 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10693591
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com,
 jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong
Subject: [PATCH v3 1/5] bitops: introduce change_bit_atomic
Date: Thu, 22 Nov 2018 15:20:24 +0800
Message-Id: <20181122072028.22819-2-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>
List-ID: X-Mailing-List: kvm@vger.kernel.org

From: Xiao Guangrong

It will be used by the threaded workqueue.

Signed-off-by: Xiao Guangrong
Reviewed-by: Dr. David Alan Gilbert
Reviewed-by: Juan Quintela
---
 include/qemu/bitops.h | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/include/qemu/bitops.h b/include/qemu/bitops.h
index 3f0926cf40..c522958852 100644
--- a/include/qemu/bitops.h
+++ b/include/qemu/bitops.h
@@ -79,6 +79,19 @@ static inline void change_bit(long nr, unsigned long *addr)
     *p ^= mask;
 }
 
+/**
+ * change_bit_atomic - Toggle a bit in memory atomically
+ * @nr: Bit to change
+ * @addr: Address to start counting from
+ */
+static inline void change_bit_atomic(long nr, unsigned long *addr)
+{
+    unsigned long mask = BIT_MASK(nr);
+    unsigned long *p = addr + BIT_WORD(nr);
+
+    atomic_xor(p, mask);
+}
+
 /**
  * test_and_set_bit - Set a bit and return its old value
  * @nr: Bit to set

From patchwork Thu Nov 22 07:20:25 2018
X-Patchwork-Submitter: Xiao Guangrong
X-Patchwork-Id: 10693593
From: guangrong.xiao@gmail.com
X-Google-Original-From: xiaoguangrong@tencent.com
To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com
Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong
Subject: [PATCH v3 2/5] util: introduce threaded workqueue
Date: Thu, 22 Nov 2018 15:20:25 +0800
Message-Id: <20181122072028.22819-3-xiaoguangrong@tencent.com>
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>

From: Xiao Guangrong

This module implements a lockless and efficient threaded workqueue.

Three abstractions are used in this module:

- Request.
  It contains the data the worker thread needs to handle the request and
  also provides the space to save the result after the request has been
  handled.

  A request flows between the user and the workqueue. The user fills in
  the request data while the request is owned by the user; once it has
  been submitted, the workqueue fetches the data out and, after handling
  the request, saves the result back into it.

  All requests are pre-allocated and carefully partitioned between
  threads, so there is no contention on a request; this lets the threads
  run in parallel as much as possible.

- User, i.e. the submitter.
  The user fills in a request and submits it to the workqueue; the result
  is collected after the workqueue has handled the request.

  The user can submit requests back to back without waiting for earlier
  requests to be handled. Only a single submitter is supported; if you
  need more than one, serialize the submissions yourself, e.g. with a
  lock on your side.

- Workqueue, i.e. thread.
  Each workqueue is represented by a running thread that fetches the
  requests submitted by the user, does the specified work and saves the
  result back into the request.
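To make the calling pattern concrete, here is a minimal usage sketch. It is
not part of the patch: ToyRequest, the toy_* callbacks, the "toy" name and
the thread/request counts are invented for illustration; only the
ThreadedWorkqueueOps fields and the threaded_workqueue_*() functions are the
ones this patch introduces.

/*
 * Illustrative sketch only -- not part of this patch.
 */
#include "qemu/osdep.h"
#include "qemu/threaded-workqueue.h"

typedef struct ToyRequest {
    int input;      /* filled by the submitter */
    int result;     /* filled by the worker thread */
} ToyRequest;

/* called once per pre-allocated request at creation; return < 0 on failure */
static int toy_request_init(void *request)
{
    return 0;
}

/* called for each request when the workqueue is destroyed */
static void toy_request_uninit(void *request)
{
}

/* called in a worker thread to do the actual work */
static void toy_request_handler(void *request)
{
    ToyRequest *req = request;

    req->result = req->input * 2;
}

/* called in the submitter's context once the result is collected */
static void toy_request_done(void *request)
{
}

static const ThreadedWorkqueueOps toy_ops = {
    .thread_request_init = toy_request_init,
    .thread_request_uninit = toy_request_uninit,
    .thread_request_handler = toy_request_handler,
    .thread_request_done = toy_request_done,
    /* size of the per-request user data; the internal header is added on top */
    .request_size = sizeof(ToyRequest),
};

static void toy_submit_all(int nr)
{
    Threads *threads;
    ToyRequest *req;
    int i;

    threads = threaded_workqueue_create("toy", 2 /* threads */,
                                        DEFAULT_THREAD_REQUEST_NR, &toy_ops);
    if (!threads) {
        return;
    }

    for (i = 0; i < nr; i++) {
        /*
         * NULL means every request of every thread is currently in flight;
         * a real caller may block or fall back instead of spinning here.
         */
        while (!(req = threaded_workqueue_get_request(threads))) {
            continue;
        }
        req->input = i;
        threaded_workqueue_submit_request(threads, req);
    }

    /* drain: all results have been handed to toy_request_done() */
    threaded_workqueue_wait_for_requests(threads);
    threaded_workqueue_destroy(threads);
}

Patch 5/5 in this series adds tests/threaded-workqueue-bench.c, which follows
the same pattern with real compression callbacks.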
Signed-off-by: Xiao Guangrong --- include/qemu/threaded-workqueue.h | 106 +++++++++ util/Makefile.objs | 1 + util/threaded-workqueue.c | 463 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 570 insertions(+) create mode 100644 include/qemu/threaded-workqueue.h create mode 100644 util/threaded-workqueue.c diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h new file mode 100644 index 0000000000..e0ede496d0 --- /dev/null +++ b/include/qemu/threaded-workqueue.h @@ -0,0 +1,106 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#ifndef QEMU_THREADED_WORKQUEUE_H +#define QEMU_THREADED_WORKQUEUE_H + +#include "qemu/queue.h" +#include "qemu/thread.h" + +/* + * This modules implements the lockless and efficient threaded workqueue. + * + * Three abstracted objects are used in this module: + * - Request. + * It not only contains the data that the workqueue fetches out + * to finish the request but also offers the space to save the result + * after the workqueue handles the request. + * + * It's flowed between user and workqueue. The user fills the request + * data into it when it is owned by user. After it is submitted to the + * workqueue, the workqueue fetched data out and save the result into + * it after the request is handled. + * + * All the requests are pre-allocated and carefully partitioned between + * threads so there is no contention on the request, that make threads + * be parallel as much as possible. + * + * - User, i.e, the submitter + * It's the one fills the request and submits it to the workqueue, + * the result will be collected after it is handled by the work queue. + * + * The user can consecutively submit requests without waiting the previous + * requests been handled. + * It only supports one submitter, you should do serial submission by + * yourself if you want more, e.g, use lock on you side. + * + * - Workqueue, i.e, thread + * Each workqueue is represented by a running thread that fetches + * the request submitted by the user, do the specified work and save + * the result to the request. + */ + +typedef struct Threads Threads; + +struct ThreadedWorkqueueOps { + /* constructor of the request */ + int (*thread_request_init)(void *request); + /* destructor of the request */ + void (*thread_request_uninit)(void *request); + + /* the handler of the request that is called by the thread */ + void (*thread_request_handler)(void *request); + /* called by the user after the request has been handled */ + void (*thread_request_done)(void *request); + + size_t request_size; +}; +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps; + +/* the default number of requests that thread need handle */ +#define DEFAULT_THREAD_REQUEST_NR 4 +/* the max number of requests that thread need handle */ +#define MAX_THREAD_REQUEST_NR (sizeof(uint64_t) * BITS_PER_BYTE) + +/* + * create a threaded queue. 
Other APIs will work on the Threads it returned + * + * @name: the identity of the workqueue which is used to construct the name + * of threads only + * @threads_nr: the number of threads that the workqueue will create + * @thread_requests_nr: the number of requests that each single thread will + * handle + * @ops: the handlers of the request + * + * Return NULL if it failed + */ +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops); +void threaded_workqueue_destroy(Threads *threads); + +/* + * find a free request where the user can store the data that is needed to + * finish the request + * + * If all requests are used up, return NULL + */ +void *threaded_workqueue_get_request(Threads *threads); +/* submit the request and notify the thread */ +void threaded_workqueue_submit_request(Threads *threads, void *request); + +/* + * wait all threads to complete the request to make sure there is no + * previous request exists + */ +void threaded_workqueue_wait_for_requests(Threads *threads); +#endif diff --git a/util/Makefile.objs b/util/Makefile.objs index 0820923c18..f26dfe5182 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -50,5 +50,6 @@ util-obj-y += range.o util-obj-y += stats64.o util-obj-y += systemd.o util-obj-y += iova-tree.o +util-obj-y += threaded-workqueue.o util-obj-$(CONFIG_LINUX) += vfio-helpers.o util-obj-$(CONFIG_OPENGL) += drm.o diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c new file mode 100644 index 0000000000..2ab37cee8d --- /dev/null +++ b/util/threaded-workqueue.c @@ -0,0 +1,463 @@ +/* + * Lockless and Efficient Threaded Workqueue Abstraction + * + * Author: + * Xiao Guangrong + * + * Copyright(C) 2018 Tencent Corporation. + * + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later. + * See the COPYING.LIB file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/bitmap.h" +#include "qemu/threaded-workqueue.h" + +#define SMP_CACHE_BYTES 64 + +/* + * the request representation which contains the internally used mete data, + * it is the header of user-defined data. + * + * It should be aligned to the nature size of CPU. + */ +struct ThreadRequest { + /* + * the request has been handled by the thread and need the user + * to fetch result out. + */ + uint8_t done; + + /* + * the index to Thread::requests. + * Save it to the padding space although it can be calculated at runtime. + */ + uint8_t request_index; + + /* the index to Threads::per_thread_data */ + unsigned int thread_index; +} QEMU_ALIGNED(sizeof(unsigned long)); +typedef struct ThreadRequest ThreadRequest; + +struct ThreadLocal { + struct Threads *threads; + + /* the index of the thread */ + int self; + + /* thread is useless and needs to exit */ + bool quit; + + QemuThread thread; + + void *requests; + + /* + * the bit in these two bitmaps indicates the index of the ï¼ requests + * respectively. If it's the same, the corresponding request is free + * and owned by the user, i.e, where the user fills a request. Otherwise, + * it is valid and owned by the thread, i.e, where the thread fetches + * the request and write the result. + */ + + /* after the user fills the request, the bit is flipped. */ + uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + /* after handles the request, the thread flips the bit. 
*/ + uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event used to wake up the thread whenever a valid request has + * been submitted + */ + QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES); + + /* + * the event is notified whenever a request has been completed + * (i.e, become free), which is used to wake up the user + */ + QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES); +}; +typedef struct ThreadLocal ThreadLocal; + +/* + * the main data struct represents multithreads which is shared by + * all threads + */ +struct Threads { + /* the request header, ThreadRequest, is contained */ + unsigned int request_size; + unsigned int thread_requests_nr; + unsigned int threads_nr; + + /* the request is pushed to the thread with round-robin manner */ + unsigned int current_thread_index; + + const ThreadedWorkqueueOps *ops; + + ThreadLocal per_thread_data[0]; +}; +typedef struct Threads Threads; + +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index) +{ + ThreadRequest *request; + + request = thread->requests + request_index * thread->threads->request_size; + assert(request->request_index == request_index); + assert(request->thread_index == thread->self); + return request; +} + +static int request_to_index(ThreadRequest *request) +{ + return request->request_index; +} + +static int request_to_thread_index(ThreadRequest *request) +{ + return request->thread_index; +} + +/* + * free request: the request is not used by any thread, however, it might + * contain the result need the user to call thread_request_done() + * + * valid request: the request contains the request data and it's committed + * to the thread, i,e. it's owned by thread. + */ +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread) +{ + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + + /* + * paired with smp_wmb() in mark_request_free() to make sure that we + * read request_done_bitmap before fetching the result out. 
+ */ + smp_rmb(); + + return result_bitmap; +} + +static ThreadRequest +*find_thread_free_request(Threads *threads, ThreadLocal *thread) +{ + uint64_t result_bitmap = get_free_request_bitmap(threads, thread); + int index; + + index = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr); + if (index >= threads->thread_requests_nr) { + return NULL; + } + + return index_to_request(thread, index); +} + +static ThreadRequest *threads_find_free_request(Threads *threads) +{ + ThreadLocal *thread; + ThreadRequest *request; + int cur_thread, thread_index; + + cur_thread = threads->current_thread_index % threads->threads_nr; + thread_index = cur_thread; + do { + thread = threads->per_thread_data + thread_index++; + request = find_thread_free_request(threads, thread); + if (request) { + break; + } + thread_index %= threads->threads_nr; + } while (thread_index != cur_thread); + + return request; +} + +/* + * the change bit operation combined with READ_ONCE and WRITE_ONCE which + * only works on single uint64_t width + */ +static void change_bit_once(long nr, uint64_t *addr) +{ + uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr); + + atomic_rcu_set(addr, value); +} + +static void mark_request_valid(Threads *threads, ThreadRequest *request) +{ + int thread_index = request_to_thread_index(request); + int request_index = request_to_index(request); + ThreadLocal *thread = threads->per_thread_data + thread_index; + + /* + * paired with smp_rmb() in find_first_valid_request_index() to make + * sure the request has been filled before the bit is flipped that + * will make the request be visible to the thread + */ + smp_wmb(); + + change_bit_once(request_index, &thread->request_fill_bitmap); + qemu_event_set(&thread->request_valid_ev); +} + +static int thread_find_first_valid_request_index(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap; + int index; + + request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap); + request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap); + bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap, + threads->thread_requests_nr); + /* + * paired with smp_wmb() in mark_request_valid() to make sure that + * we read request_fill_bitmap before fetch the request out. + */ + smp_rmb(); + + index = find_first_bit(&result_bitmap, threads->thread_requests_nr); + return index >= threads->thread_requests_nr ? -1 : index; +} + +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request) +{ + int index = request_to_index(request); + + /* + * smp_wmb() is implied in change_bit_atomic() that is paired with + * smp_rmb() in get_free_request_bitmap() to make sure the result + * has been saved before the bit is flipped. + */ + change_bit_atomic(index, &thread->request_done_bitmap); + qemu_event_set(&thread->request_free_ev); +} + +/* retry to see if there is available request before actually go to wait. 
*/ +#define BUSY_WAIT_COUNT 1000 + +static ThreadRequest * +thread_busy_wait_for_request(ThreadLocal *thread) +{ + int index, count = 0; + + for (count = 0; count < BUSY_WAIT_COUNT; count++) { + index = thread_find_first_valid_request_index(thread); + if (index >= 0) { + return index_to_request(thread, index); + } + + cpu_relax(); + } + + return NULL; +} + +static void *thread_run(void *opaque) +{ + ThreadLocal *self_data = (ThreadLocal *)opaque; + Threads *threads = self_data->threads; + void (*handler)(void *request) = threads->ops->thread_request_handler; + ThreadRequest *request; + + for ( ; !atomic_read(&self_data->quit); ) { + qemu_event_reset(&self_data->request_valid_ev); + + request = thread_busy_wait_for_request(self_data); + if (!request) { + qemu_event_wait(&self_data->request_valid_ev); + continue; + } + + assert(!request->done); + + handler(request + 1); + request->done = true; + mark_request_free(self_data, request); + } + + return NULL; +} + +static void uninit_thread_requests(ThreadLocal *thread, int free_nr) +{ + Threads *threads = thread->threads; + ThreadRequest *request = thread->requests; + int i; + + for (i = 0; i < free_nr; i++) { + threads->ops->thread_request_uninit(request + 1); + request = (void *)request + threads->request_size; + } + g_free(thread->requests); +} + +static int init_thread_requests(ThreadLocal *thread) +{ + Threads *threads = thread->threads; + ThreadRequest *request; + int ret, i, thread_reqs_size; + + thread_reqs_size = threads->thread_requests_nr * threads->request_size; + thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES); + thread->requests = g_malloc0(thread_reqs_size); + + request = thread->requests; + for (i = 0; i < threads->thread_requests_nr; i++) { + ret = threads->ops->thread_request_init(request + 1); + if (ret < 0) { + goto exit; + } + + request->request_index = i; + request->thread_index = thread->self; + request = (void *)request + threads->request_size; + } + return 0; + +exit: + uninit_thread_requests(thread, i); + return -1; +} + +static void uninit_thread_data(Threads *threads, int free_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + int i; + + for (i = 0; i < free_nr; i++) { + thread_local[i].quit = true; + qemu_event_set(&thread_local[i].request_valid_ev); + qemu_thread_join(&thread_local[i].thread); + qemu_event_destroy(&thread_local[i].request_valid_ev); + qemu_event_destroy(&thread_local[i].request_free_ev); + uninit_thread_requests(&thread_local[i], threads->thread_requests_nr); + } +} + +static int +init_thread_data(Threads *threads, const char *thread_name, int thread_nr) +{ + ThreadLocal *thread_local = threads->per_thread_data; + char *name; + int i; + + for (i = 0; i < thread_nr; i++) { + thread_local[i].threads = threads; + thread_local[i].self = i; + + if (init_thread_requests(&thread_local[i]) < 0) { + goto exit; + } + + qemu_event_init(&thread_local[i].request_free_ev, false); + qemu_event_init(&thread_local[i].request_valid_ev, false); + + name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self); + qemu_thread_create(&thread_local[i].thread, name, + thread_run, &thread_local[i], QEMU_THREAD_JOINABLE); + g_free(name); + } + return 0; + +exit: + uninit_thread_data(threads, i); + return -1; +} + +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr, + unsigned int thread_requests_nr, + const ThreadedWorkqueueOps *ops) +{ + Threads *threads; + + if (threads_nr > MAX_THREAD_REQUEST_NR) { + return NULL; + } + + threads = g_malloc0(sizeof(*threads) 
+ threads_nr * sizeof(ThreadLocal)); + threads->ops = ops; + threads->threads_nr = threads_nr; + threads->thread_requests_nr = thread_requests_nr; + + QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long))); + threads->request_size = threads->ops->request_size; + threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long)); + threads->request_size += sizeof(ThreadRequest); + + if (init_thread_data(threads, name, threads_nr) < 0) { + g_free(threads); + return NULL; + } + + return threads; +} + +void threaded_workqueue_destroy(Threads *threads) +{ + uninit_thread_data(threads, threads->threads_nr); + g_free(threads); +} + +static void request_done(Threads *threads, ThreadRequest *request) +{ + if (!request->done) { + return; + } + + threads->ops->thread_request_done(request + 1); + request->done = false; +} + +void *threaded_workqueue_get_request(Threads *threads) +{ + ThreadRequest *request; + + request = threads_find_free_request(threads); + if (!request) { + return NULL; + } + + request_done(threads, request); + return request + 1; +} + +void threaded_workqueue_submit_request(Threads *threads, void *request) +{ + ThreadRequest *req = request - sizeof(ThreadRequest); + int thread_index = request_to_thread_index(request); + + assert(!req->done); + mark_request_valid(threads, req); + threads->current_thread_index = thread_index + 1; +} + +void threaded_workqueue_wait_for_requests(Threads *threads) +{ + ThreadLocal *thread; + uint64_t result_bitmap; + int thread_index, index = 0; + + for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) { + thread = threads->per_thread_data + thread_index; + index = 0; +retry: + qemu_event_reset(&thread->request_free_ev); + result_bitmap = get_free_request_bitmap(threads, thread); + + for (; index < threads->thread_requests_nr; index++) { + if (test_bit(index, &result_bitmap)) { + qemu_event_wait(&thread->request_free_ev); + goto retry; + } + + request_done(threads, index_to_request(thread, index)); + } + } +} From patchwork Thu Nov 22 07:20:26 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10693595 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 5AE7215A7 for ; Thu, 22 Nov 2018 07:20:53 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 3A1FA2CBBB for ; Thu, 22 Nov 2018 07:20:53 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id 2E39B2CBD5; Thu, 22 Nov 2018 07:20:53 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 63F9E2CBBB for ; Thu, 22 Nov 2018 07:20:52 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S2392557AbeKVR67 (ORCPT ); Thu, 22 Nov 2018 12:58:59 -0500 Received: from mail-pf1-f195.google.com ([209.85.210.195]:45455 "EHLO mail-pf1-f195.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S2392551AbeKVR66 (ORCPT ); Thu, 22 Nov 2018 12:58:58 -0500 
Received: by mail-pf1-f195.google.com with SMTP id g62so1485585pfd.12 for ; Wed, 21 Nov 2018 23:20:50 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=vEVvrbhEOiEpCLe1eJCTeurBkl6VvXXUL0twGNSbhZ0=; b=WkvxnV0brQ/LgDT5RpD2kULRtqBjZaqnN0lAiDpbztxo/qbamf19NJ8+Xzzvw43SmV oWqCmL4e5upkZrA1gIeoeOZot2GzjbHO5yR1vh95IZqvzocqZzctgSiW0wc4jvDYKCIX 6Gh+8q1N4YbMlkEGgiuauLlRrOfFHJyY/MAWIzeVHkIx6kGDZljJfM9HxAXkODc6FS2B FGz0SGPn9iih9x1hONNaY/EIOxkWhlslhCNHLdvVuRDdkDCGTynKFfvzx6bBPZlDvAYg B84oxL+ycQcmTbirsuyorCPerONxyydO5yRXIM8seHYVZ4HZ9hCm7tXFtOBzdgwK21Sm t1Tw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=vEVvrbhEOiEpCLe1eJCTeurBkl6VvXXUL0twGNSbhZ0=; b=h7PAIswExuapa637WOB9dmH1lBxZ8938A2CC/wyFz4kW7CGqKUga1A8F0KTOj7LtG5 OqXD9XXkmAeu8+irU3vc+KeyjYlYvymQnTDEAvnL4+eakNpRoQeL+g/zI01EZgsQ0iC8 Xo2yJjv1CQKfhYGfcIMparqwc1ZuZU6/qJ3dUAr/tagYvd6xA3mcs3T1Xz2nAQTfGu9q +rFIQ6QCxvWahPOgHEwUIPR4vt6D1hp95p939WxJXkapD/VPf13kv175bNY4YgfHKEgY sGhBZB/6oIAZVv/ja3ofT5ikvIltY8+D0vBuAAkiENiexp4Io6/avEpKKX+yX0unGM6L U0Ow== X-Gm-Message-State: AA+aEWZh5frZ/b1HyVX3FlRdJp4+HO3wgq3pI9XTV1RXSMCZJcRTEDKX tV3nX7FmbXwekY+kwYqK5OITOfrF X-Google-Smtp-Source: AFSGD/UWYboi1+yi4bFDtCg2e+8XSh0I5poeMrFe3pxuClwDoLzpweRxsYAwPCzlSg7G7Pe/85o/fw== X-Received: by 2002:a63:3c44:: with SMTP id i4mr8850868pgn.286.1542871249873; Wed, 21 Nov 2018 23:20:49 -0800 (PST) Received: from localhost.localdomain ([203.205.141.36]) by smtp.gmail.com with ESMTPSA id 19sm52731312pfs.108.2018.11.21.23.20.45 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Wed, 21 Nov 2018 23:20:49 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v3 3/5] migration: use threaded workqueue for compression Date: Thu, 22 Nov 2018 15:20:26 +0800 Message-Id: <20181122072028.22819-4-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com> References: <20181122072028.22819-1-xiaoguangrong@tencent.com> Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 308 ++++++++++++++++++++------------------------------------ 1 file changed, 110 insertions(+), 198 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 7e7deec4d8..254c08f27b 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -57,6 +57,7 @@ #include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "qemu/threaded-workqueue.h" /***********************************************************/ /* ram save/restore */ @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct CompressParam { - bool done; - bool quit; - bool zero_page; - QEMUFile *file; - QemuMutex mutex; - QemuCond cond; - RAMBlock *block; - ram_addr_t offset; - - /* internally used fields */ - z_stream stream; - uint8_t *originbuf; -}; -typedef struct CompressParam 
CompressParam; - struct DecompressParam { bool done; bool quit; @@ -377,15 +362,6 @@ struct DecompressParam { }; typedef struct DecompressParam DecompressParam; -static CompressParam *comp_param; -static QemuThread *compress_threads; -/* comp_done_cond is used to wake up the migration thread when - * one of the compression threads has finished the compression. - * comp_done_lock is used to co-work with comp_done_cond. - */ -static QemuMutex comp_done_lock; -static QemuCond comp_done_cond; -/* The empty QEMUFileOps will be used by file in CompressParam */ static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; @@ -394,125 +370,6 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, - ram_addr_t offset, uint8_t *source_buf); - -static void *do_data_compress(void *opaque) -{ - CompressParam *param = opaque; - RAMBlock *block; - ram_addr_t offset; - bool zero_page; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->block) { - block = param->block; - offset = param->offset; - param->block = NULL; - qemu_mutex_unlock(¶m->mutex); - - zero_page = do_compress_ram_page(param->file, ¶m->stream, - block, offset, param->originbuf); - - qemu_mutex_lock(&comp_done_lock); - param->done = true; - param->zero_page = zero_page; - qemu_cond_signal(&comp_done_cond); - qemu_mutex_unlock(&comp_done_lock); - - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } - } - qemu_mutex_unlock(¶m->mutex); - - return NULL; -} - -static void compress_threads_save_cleanup(void) -{ - int i, thread_count; - - if (!migrate_use_compression() || !comp_param) { - return; - } - - thread_count = migrate_compress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!comp_param[i].file) { - break; - } - - qemu_mutex_lock(&comp_param[i].mutex); - comp_param[i].quit = true; - qemu_cond_signal(&comp_param[i].cond); - qemu_mutex_unlock(&comp_param[i].mutex); - - qemu_thread_join(compress_threads + i); - qemu_mutex_destroy(&comp_param[i].mutex); - qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); - g_free(comp_param[i].originbuf); - qemu_fclose(comp_param[i].file); - comp_param[i].file = NULL; - } - qemu_mutex_destroy(&comp_done_lock); - qemu_cond_destroy(&comp_done_cond); - g_free(compress_threads); - g_free(comp_param); - compress_threads = NULL; - comp_param = NULL; -} - -static int compress_threads_save_setup(void) -{ - int i, thread_count; - - if (!migrate_use_compression()) { - return 0; - } - thread_count = migrate_compress_threads(); - compress_threads = g_new0(QemuThread, thread_count); - comp_param = g_new0(CompressParam, thread_count); - qemu_cond_init(&comp_done_cond); - qemu_mutex_init(&comp_done_lock); - for (i = 0; i < thread_count; i++) { - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE); - if (!comp_param[i].originbuf) { - goto exit; - } - - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { - g_free(comp_param[i].originbuf); - goto exit; - } - - /* comp_param[i].file is just used as a dummy buffer to save data, - * set its ops to empty. 
- */ - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops); - comp_param[i].done = true; - comp_param[i].quit = false; - qemu_mutex_init(&comp_param[i].mutex); - qemu_cond_init(&comp_param[i].cond); - qemu_thread_create(compress_threads + i, "compress", - do_data_compress, comp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; - -exit: - compress_threads_save_cleanup(); - return -1; -} - /* Multiple fd's */ #define MULTIFD_MAGIC 0x11223344U @@ -1909,12 +1766,25 @@ exit: return zero_page; } +struct CompressData { + /* filled by migration thread.*/ + RAMBlock *block; + ram_addr_t offset; + + /* filled by compress thread. */ + QEMUFile *file; + z_stream stream; + uint8_t *originbuf; + bool zero_page; +}; +typedef struct CompressData CompressData; + static void -update_compress_thread_counts(const CompressParam *param, int bytes_xmit) +update_compress_thread_counts(CompressData *cd, int bytes_xmit) { ram_counters.transferred += bytes_xmit; - if (param->zero_page) { + if (cd->zero_page) { ram_counters.duplicate++; return; } @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit) compression_counters.pages++; } +static int compress_thread_data_init(void *request) +{ + CompressData *cd = request; + + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE); + if (!cd->originbuf) { + return -1; + } + + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) { + g_free(cd->originbuf); + return -1; + } + + cd->file = qemu_fopen_ops(NULL, &empty_ops); + return 0; +} + +static void compress_thread_data_fini(void *request) +{ + CompressData *cd = request; + + qemu_fclose(cd->file); + deflateEnd(&cd->stream); + g_free(cd->originbuf); +} + +static void compress_thread_data_handler(void *request) +{ + CompressData *cd = request; + + /* + * if compression fails, it will be indicated by + * migrate_get_current()->to_dst_file. + */ + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block, + cd->offset, cd->originbuf); +} + +static void compress_thread_data_done(void *request) +{ + CompressData *cd = request; + RAMState *rs = ram_state; + int bytes_xmit; + + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file); + update_compress_thread_counts(cd, bytes_xmit); +} + +static const ThreadedWorkqueueOps compress_ops = { + .thread_request_init = compress_thread_data_init, + .thread_request_uninit = compress_thread_data_fini, + .thread_request_handler = compress_thread_data_handler, + .thread_request_done = compress_thread_data_done, + .request_size = sizeof(CompressData), +}; + +static Threads *compress_threads; + static bool save_page_use_compression(RAMState *rs); static void flush_compressed_data(RAMState *rs) { - int idx, len, thread_count; - if (!save_page_use_compression(rs)) { return; } - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!comp_param[idx].done) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - } - } - qemu_mutex_unlock(&comp_done_lock); + threaded_workqueue_wait_for_requests(compress_threads); +} - for (idx = 0; idx < thread_count; idx++) { - qemu_mutex_lock(&comp_param[idx].mutex); - if (!comp_param[idx].quit) { - len = qemu_put_qemu_file(rs->f, comp_param[idx].file); - /* - * it's safe to fetch zero_page without holding comp_done_lock - * as there is no further request submitted to the thread, - * i.e, the thread should be waiting for a request at this point. 
- */ - update_compress_thread_counts(&comp_param[idx], len); - } - qemu_mutex_unlock(&comp_param[idx].mutex); +static void compress_threads_save_cleanup(void) +{ + if (!compress_threads) { + return; } + + threaded_workqueue_destroy(compress_threads); + compress_threads = NULL; } -static inline void set_compress_params(CompressParam *param, RAMBlock *block, - ram_addr_t offset) +static int compress_threads_save_setup(void) { - param->block = block; - param->offset = offset; + if (!migrate_use_compression()) { + return 0; + } + + compress_threads = threaded_workqueue_create("compress", + migrate_compress_threads(), + DEFAULT_THREAD_REQUEST_NR, &compress_ops); + return compress_threads ? 0 : -1; } static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block, ram_addr_t offset) { - int idx, thread_count, bytes_xmit = -1, pages = -1; + CompressData *cd; bool wait = migrate_compress_wait_thread(); - thread_count = migrate_compress_threads(); - qemu_mutex_lock(&comp_done_lock); retry: - for (idx = 0; idx < thread_count; idx++) { - if (comp_param[idx].done) { - comp_param[idx].done = false; - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file); - qemu_mutex_lock(&comp_param[idx].mutex); - set_compress_params(&comp_param[idx], block, offset); - qemu_cond_signal(&comp_param[idx].cond); - qemu_mutex_unlock(&comp_param[idx].mutex); - pages = 1; - update_compress_thread_counts(&comp_param[idx], bytes_xmit); - break; + cd = threaded_workqueue_get_request(compress_threads); + if (!cd) { + /* + * wait for the free thread if the user specifies + * 'compress-wait-thread', otherwise we will post + * the page out in the main thread as normal page. + */ + if (wait) { + cpu_relax(); + goto retry; } - } - /* - * wait for the free thread if the user specifies 'compress-wait-thread', - * otherwise we will post the page out in the main thread as normal page. 
- */ - if (pages < 0 && wait) { - qemu_cond_wait(&comp_done_cond, &comp_done_lock); - goto retry; - } - qemu_mutex_unlock(&comp_done_lock); - - return pages; + return -1; + } + cd->block = block; + cd->offset = offset; + threaded_workqueue_submit_request(compress_threads, cd); + return 1; } /** From patchwork Thu Nov 22 07:20:27 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10693597 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 2490C5A4 for ; Thu, 22 Nov 2018 07:20:57 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 02F7F2CBBB for ; Thu, 22 Nov 2018 07:20:57 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id E97712CBD5; Thu, 22 Nov 2018 07:20:56 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 46FAD2CBBB for ; Thu, 22 Nov 2018 07:20:56 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S2392562AbeKVR7D (ORCPT ); Thu, 22 Nov 2018 12:59:03 -0500 Received: from mail-pl1-f193.google.com ([209.85.214.193]:37470 "EHLO mail-pl1-f193.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S2392528AbeKVR7C (ORCPT ); Thu, 22 Nov 2018 12:59:02 -0500 Received: by mail-pl1-f193.google.com with SMTP id b5so8845978plr.4 for ; Wed, 21 Nov 2018 23:20:54 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=61mMAJkubPYEEOer1T50gaBWivxzWtm0scPJxeVz4QI=; b=eL8T++jF0IgiWOr0/u3Ii28zGri7CNpzMQ9y78nqIFxmkLtWpWXUjnN3wu1zkJ5HVc DtOQtgHxGwDSAXNdRDTwkSBY/yz2Grz6Edoylv/hGP2fbQlDcAYJUkmg2f91AGTPdhMC Xsr6KdDZ5RFgdE0rlwDfDVADbiL/ahsh0HFXecTIkpNCzaopQEoVwEgqUDhTJ1At8kPU tWcp570OXqJd/eBqYjDiHyqLO6Gd4Uwd7dqNEWSb1TX9tEK318JTKJocZr4s7iunGzI6 6xxYv2PyVcBQtzX2e+I8tYXYfXeHPXJbsZoseyBYJc7D4wOJakkeRn7t6cdxSdnEO1I6 hl5A== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=61mMAJkubPYEEOer1T50gaBWivxzWtm0scPJxeVz4QI=; b=qsTOni+WVcUEJHAtfsvaCOFOqtyBPKAe0SPyeGuNttB76xrfyTE7VqgeOmbpPna8TN vw6bHCCPCxLbGmDBMo1giPeAN4qPiWsF9NOrIZRkAtf0Tf2fqLKTa43gCTeuZcbya7zC /DDfxgKFxn/ops3GX6cYtRx0XNI3fHjFZS1Lye0k90WoQicsrE7eG/cmBFzsKuOajGCh /+h5NVOiwafswtLXj8oI2HCj0goCohuT41fz6dA9L7Tg2WPcbwiw+qL4dQL48GMO7sLU kx7DTRE5RJ6lpkhU4yF6YEbLzwpjwMbfj32bIXmZFC3cHq5MXGyVWrmjdaufzgqUIxQ9 96NQ== X-Gm-Message-State: AA+aEWas0BoMzkMEIoEVm9wBaNcF7RYTxPJz+HtIXNLasanGALqDPcjS EKfH6Z1tx8x6n9laBTsicAQ= X-Google-Smtp-Source: AFSGD/W1LAZVCQyr9XN2zhPxj8EVEvn4Ih3h55XtvesmXHaXKBa3+670V/N1Z2JNSwaoyVxOpSVfvQ== X-Received: by 2002:a17:902:43e4:: with SMTP id j91mr9857335pld.147.1542871254089; Wed, 21 Nov 2018 23:20:54 -0800 (PST) Received: from localhost.localdomain ([203.205.141.36]) by smtp.gmail.com with ESMTPSA id 19sm52731312pfs.108.2018.11.21.23.20.50 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 
bits=256/256); Wed, 21 Nov 2018 23:20:53 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v3 4/5] migration: use threaded workqueue for decompression Date: Thu, 22 Nov 2018 15:20:27 +0800 Message-Id: <20181122072028.22819-5-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com> References: <20181122072028.22819-1-xiaoguangrong@tencent.com> Sender: kvm-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: kvm@vger.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP From: Xiao Guangrong Adapt the compression code to the threaded workqueue Signed-off-by: Xiao Guangrong --- migration/ram.c | 222 ++++++++++++++++++++------------------------------------ 1 file changed, 77 insertions(+), 145 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index 254c08f27b..ccec59c35e 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -350,25 +350,9 @@ typedef struct PageSearchStatus PageSearchStatus; CompressionStats compression_counters; -struct DecompressParam { - bool done; - bool quit; - QemuMutex mutex; - QemuCond cond; - void *des; - uint8_t *compbuf; - int len; - z_stream stream; -}; -typedef struct DecompressParam DecompressParam; - static const QEMUFileOps empty_ops = { }; static QEMUFile *decomp_file; -static DecompressParam *decomp_param; -static QemuThread *decompress_threads; -static QemuMutex decomp_done_lock; -static QemuCond decomp_done_cond; /* Multiple fd's */ @@ -3399,6 +3383,7 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } + /* return the size after decompression, or negative value on error */ static int qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, @@ -3424,166 +3409,113 @@ qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, return stream->total_out; } -static void *do_data_decompress(void *opaque) -{ - DecompressParam *param = opaque; - unsigned long pagesize; - uint8_t *des; - int len, ret; - - qemu_mutex_lock(¶m->mutex); - while (!param->quit) { - if (param->des) { - des = param->des; - len = param->len; - param->des = 0; - qemu_mutex_unlock(¶m->mutex); - - pagesize = TARGET_PAGE_SIZE; - - ret = qemu_uncompress_data(¶m->stream, des, pagesize, - param->compbuf, len); - if (ret < 0 && migrate_get_current()->decompress_error_check) { - error_report("decompress data failed"); - qemu_file_set_error(decomp_file, ret); - } +struct DecompressData { + /* filled by migration thread.*/ + void *des; + uint8_t *compbuf; + size_t len; - qemu_mutex_lock(&decomp_done_lock); - param->done = true; - qemu_cond_signal(&decomp_done_cond); - qemu_mutex_unlock(&decomp_done_lock); + z_stream stream; +}; +typedef struct DecompressData DecompressData; - qemu_mutex_lock(¶m->mutex); - } else { - qemu_cond_wait(¶m->cond, ¶m->mutex); - } +static Threads *decompress_threads; + +static int decompress_thread_data_init(void *request) +{ + DecompressData *dd = request; + + if (inflateInit(&dd->stream) != Z_OK) { + return -1; } - qemu_mutex_unlock(¶m->mutex); - return NULL; + dd->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + return 0; } -static int wait_for_decompress_done(void) +static void decompress_thread_data_fini(void *request) { - 
int idx, thread_count; + DecompressData *dd = request; - if (!migrate_use_compression()) { - return 0; - } + inflateEnd(&dd->stream); + g_free(dd->compbuf); +} - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - for (idx = 0; idx < thread_count; idx++) { - while (!decomp_param[idx].done) { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +static void decompress_thread_data_handler(void *request) +{ + DecompressData *dd = request; + unsigned long pagesize = TARGET_PAGE_SIZE; + int ret; + + ret = qemu_uncompress_data(&dd->stream, dd->des, pagesize, + dd->compbuf, dd->len); + if (ret < 0 && migrate_get_current()->decompress_error_check) { + error_report("decompress data failed"); + qemu_file_set_error(decomp_file, ret); } - qemu_mutex_unlock(&decomp_done_lock); - return qemu_file_get_error(decomp_file); } -static void compress_threads_load_cleanup(void) +static void decompress_thread_data_done(void *request) { - int i, thread_count; +} + +static const ThreadedWorkqueueOps decompress_ops = { + .thread_request_init = decompress_thread_data_init, + .thread_request_uninit = decompress_thread_data_fini, + .thread_request_handler = decompress_thread_data_handler, + .thread_request_done = decompress_thread_data_done, + .request_size = sizeof(DecompressData), +}; +static int decompress_init(QEMUFile *f) +{ if (!migrate_use_compression()) { - return; + return 0; } - thread_count = migrate_decompress_threads(); - for (i = 0; i < thread_count; i++) { - /* - * we use it as a indicator which shows if the thread is - * properly init'd or not - */ - if (!decomp_param[i].compbuf) { - break; - } - qemu_mutex_lock(&decomp_param[i].mutex); - decomp_param[i].quit = true; - qemu_cond_signal(&decomp_param[i].cond); - qemu_mutex_unlock(&decomp_param[i].mutex); - } - for (i = 0; i < thread_count; i++) { - if (!decomp_param[i].compbuf) { - break; - } + decomp_file = f; + decompress_threads = threaded_workqueue_create("decompress", + migrate_decompress_threads(), + DEFAULT_THREAD_REQUEST_NR, &decompress_ops); + return decompress_threads ? 
0 : -1; +} - qemu_thread_join(decompress_threads + i); - qemu_mutex_destroy(&decomp_param[i].mutex); - qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); - g_free(decomp_param[i].compbuf); - decomp_param[i].compbuf = NULL; +static void decompress_fini(void) +{ + if (!decompress_threads) { + return; } - g_free(decompress_threads); - g_free(decomp_param); + + threaded_workqueue_destroy(decompress_threads); decompress_threads = NULL; - decomp_param = NULL; decomp_file = NULL; } -static int compress_threads_load_setup(QEMUFile *f) +static int flush_decompressed_data(void) { - int i, thread_count; - if (!migrate_use_compression()) { return 0; } - thread_count = migrate_decompress_threads(); - decompress_threads = g_new0(QemuThread, thread_count); - decomp_param = g_new0(DecompressParam, thread_count); - qemu_mutex_init(&decomp_done_lock); - qemu_cond_init(&decomp_done_cond); - decomp_file = f; - for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { - goto exit; - } - - decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); - qemu_mutex_init(&decomp_param[i].mutex); - qemu_cond_init(&decomp_param[i].cond); - decomp_param[i].done = true; - decomp_param[i].quit = false; - qemu_thread_create(decompress_threads + i, "decompress", - do_data_decompress, decomp_param + i, - QEMU_THREAD_JOINABLE); - } - return 0; -exit: - compress_threads_load_cleanup(); - return -1; + threaded_workqueue_wait_for_requests(decompress_threads); + return qemu_file_get_error(decomp_file); } static void decompress_data_with_multi_threads(QEMUFile *f, - void *host, int len) + void *host, size_t len) { - int idx, thread_count; + DecompressData *dd; - thread_count = migrate_decompress_threads(); - qemu_mutex_lock(&decomp_done_lock); - while (true) { - for (idx = 0; idx < thread_count; idx++) { - if (decomp_param[idx].done) { - decomp_param[idx].done = false; - qemu_mutex_lock(&decomp_param[idx].mutex); - qemu_get_buffer(f, decomp_param[idx].compbuf, len); - decomp_param[idx].des = host; - decomp_param[idx].len = len; - qemu_cond_signal(&decomp_param[idx].cond); - qemu_mutex_unlock(&decomp_param[idx].mutex); - break; - } - } - if (idx < thread_count) { - break; - } else { - qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); - } +retry: + dd = threaded_workqueue_get_request(decompress_threads); + if (!dd) { + goto retry; } - qemu_mutex_unlock(&decomp_done_lock); + + dd->des = host; + dd->len = len; + qemu_get_buffer(f, dd->compbuf, len); + threaded_workqueue_submit_request(decompress_threads, dd); } /* @@ -3678,7 +3610,7 @@ void colo_release_ram_cache(void) */ static int ram_load_setup(QEMUFile *f, void *opaque) { - if (compress_threads_load_setup(f)) { + if (decompress_init(f)) { return -1; } @@ -3699,7 +3631,7 @@ static int ram_load_cleanup(void *opaque) } xbzrle_load_cleanup(); - compress_threads_load_cleanup(); + decompress_fini(); RAMBLOCK_FOREACH_MIGRATABLE(rb) { g_free(rb->receivedmap); @@ -4101,7 +4033,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } } - ret |= wait_for_decompress_done(); + ret |= flush_decompressed_data(); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); From patchwork Thu Nov 22 07:20:28 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Xiao Guangrong X-Patchwork-Id: 10693599 Return-Path: Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org [172.30.200.125]) by 
pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 22E3315A7 for ; Thu, 22 Nov 2018 07:21:03 +0000 (UTC) Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 02AFC2CBD5 for ; Thu, 22 Nov 2018 07:21:03 +0000 (UTC) Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486) id EA5B32CBCB; Thu, 22 Nov 2018 07:21:02 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on pdx-wl-mail.web.codeaurora.org X-Spam-Level: X-Spam-Status: No, score=-8.0 required=2.0 tests=BAYES_00,DKIM_SIGNED, DKIM_VALID,DKIM_VALID_AU,FREEMAIL_FROM,MAILING_LIST_MULTI,RCVD_IN_DNSWL_HI autolearn=ham version=3.3.1 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 358F62CBCB for ; Thu, 22 Nov 2018 07:21:02 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S2392581AbeKVR7J (ORCPT ); Thu, 22 Nov 2018 12:59:09 -0500 Received: from mail-pl1-f195.google.com ([209.85.214.195]:46017 "EHLO mail-pl1-f195.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S2392576AbeKVR7G (ORCPT ); Thu, 22 Nov 2018 12:59:06 -0500 Received: by mail-pl1-f195.google.com with SMTP id a14so8820418plm.12 for ; Wed, 21 Nov 2018 23:20:58 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=from:to:cc:subject:date:message-id:in-reply-to:references; bh=wbPjx05ddO3emTOc85i0PBVC/qKf1knWe1gds+ynZDw=; b=VUqc7Zpn1rqjbGpPkPJnnm7vURgq2VoT1R1LLWyWBW+U8qn5/MRB8iRGSPjsYVWkht rw9Xxm8hgRDU40IlA/PAshVmenkXpx699Ogj4GI3u1iP8+np56fDtfCC4sRLUBNJRtXv Qhk70DKxrs5MLPT4GyL6NRwxkrRddoAR4WZ0J0kB7I+XoDXikydOA0X5cIRGsP2n7X4W Gy2cJH37L2G5reMWIzT5JdHwroylQHMx/nJfWHuuw20MKm3wzv+PGjmhG0CXKNDbFv3w f8XCU2q0PVTIDprKrxe5FHd+nnPGH2uU5C6TC0XCvzH8qhdAXVbqniwQ2Taj8HgZ0Ktf s0aw== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:in-reply-to :references; bh=wbPjx05ddO3emTOc85i0PBVC/qKf1knWe1gds+ynZDw=; b=gCyO9Qj7XIhxy0Md27mhIyZgY1emwoi6j6sh0wyoxB7Fz2BWSrMqW6+cgmLuEojCHS EJAlQAHcbORgShl3nYWercxx3zVDTI0fUo1dJcG8pCrYPC2nFrsnyW5d7l3PgWbjRwbE SCFWl5yMbS3YIRM3iv4uBO8WvGAHuqI8bIwB2KdxrWxwgNsbBcF02fG7Qe4c+teURniN QP92yQBw6FkkmyMBwDQmwpTJbrMN1mbcpiIxB44I895+bkyS9B7dNaq2BIIUUQbRbse+ U5wkTkfmvso+VLR4r/4y2md+aDmG47Jy59Yhy+fsdPgNv7anZViGNeANfUzSq5B9UcY2 gDpQ== X-Gm-Message-State: AA+aEWaIe+/ZzIlzD3WxlFaIh7PMHw5AtlPMfK6MZGgZOHSQOK8VfFI/ eeFB08fsaFKT0K/BzcVZm3A= X-Google-Smtp-Source: AFSGD/XeJwyf+9wSMKTgdgd4HE2YPTpMeWIQZQ7zLysRNQhIJ283dKDS6Z6Xt+lv2BJ6KupbF9V7Tg== X-Received: by 2002:a17:902:8214:: with SMTP id x20-v6mr9719409pln.224.1542871258257; Wed, 21 Nov 2018 23:20:58 -0800 (PST) Received: from localhost.localdomain ([203.205.141.36]) by smtp.gmail.com with ESMTPSA id 19sm52731312pfs.108.2018.11.21.23.20.54 (version=TLS1_2 cipher=ECDHE-RSA-CHACHA20-POLY1305 bits=256/256); Wed, 21 Nov 2018 23:20:57 -0800 (PST) From: guangrong.xiao@gmail.com X-Google-Original-From: xiaoguangrong@tencent.com To: pbonzini@redhat.com, mst@redhat.com, mtosatti@redhat.com Cc: qemu-devel@nongnu.org, kvm@vger.kernel.org, dgilbert@redhat.com, peterx@redhat.com, wei.w.wang@intel.com, jiang.biao2@zte.com.cn, eblake@redhat.com, quintela@redhat.com, cota@braap.org, Xiao Guangrong Subject: [PATCH v3 5/5] tests: add threaded-workqueue-bench Date: Thu, 22 Nov 2018 15:20:28 +0800 Message-Id: <20181122072028.22819-6-xiaoguangrong@tencent.com> X-Mailer: git-send-email 2.14.5 
In-Reply-To: <20181122072028.22819-1-xiaoguangrong@tencent.com>
References: <20181122072028.22819-1-xiaoguangrong@tencent.com>

From: Xiao Guangrong

This is the benchmark for threaded-workqueue; it also serves as a good
example of how threaded-workqueue is used (a minimal usage sketch follows
the patch below).

Signed-off-by: Xiao Guangrong
---
 tests/Makefile.include           |   5 +-
 tests/threaded-workqueue-bench.c | 255 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 259 insertions(+), 1 deletion(-)
 create mode 100644 tests/threaded-workqueue-bench.c

diff --git a/tests/Makefile.include b/tests/Makefile.include
index 613242bc6e..05ad27e75d 100644
--- a/tests/Makefile.include
+++ b/tests/Makefile.include
@@ -500,7 +500,8 @@ test-obj-y = tests/check-qnum.o tests/check-qstring.o tests/check-qdict.o \
 	tests/test-rcu-tailq.o \
 	tests/test-qdist.o tests/test-shift128.o \
 	tests/test-qht.o tests/qht-bench.o tests/test-qht-par.o \
-	tests/atomic_add-bench.o tests/atomic64-bench.o
+	tests/atomic_add-bench.o tests/atomic64-bench.o \
+	tests/threaded-workqueue-bench.o
 
 $(test-obj-y): QEMU_INCLUDES += -Itests
 QEMU_CFLAGS += -I$(SRC_PATH)/tests
@@ -557,6 +558,8 @@ tests/qht-bench$(EXESUF): tests/qht-bench.o $(test-util-obj-y)
 tests/test-bufferiszero$(EXESUF): tests/test-bufferiszero.o $(test-util-obj-y)
 tests/atomic_add-bench$(EXESUF): tests/atomic_add-bench.o $(test-util-obj-y)
 tests/atomic64-bench$(EXESUF): tests/atomic64-bench.o $(test-util-obj-y)
+tests/threaded-workqueue-bench$(EXESUF): tests/threaded-workqueue-bench.o migration/qemu-file.o \
+	$(test-util-obj-y)
 
 tests/fp/%:
 	$(MAKE) -C $(dir $@) $(notdir $@)
 
diff --git a/tests/threaded-workqueue-bench.c b/tests/threaded-workqueue-bench.c
new file mode 100644
index 0000000000..0d04948ed3
--- /dev/null
+++ b/tests/threaded-workqueue-bench.c
@@ -0,0 +1,255 @@
+/*
+ * Threaded Workqueue Benchmark
+ *
+ * Author:
+ *   Xiao Guangrong
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+#include <zlib.h>
+
+#include "qemu/osdep.h"
+#include "exec/cpu-common.h"
+#include "qemu/error-report.h"
+#include "migration/qemu-file.h"
+#include "qemu/threaded-workqueue.h"
+
+#define PAGE_SHIFT 12
+#define PAGE_SIZE (1 << PAGE_SHIFT)
+#define DEFAULT_THREAD_NR 2
+#define DEFAULT_MEM_SIZE 1
+#define DEFAULT_REPEATED_COUNT 3
+
+/* A sink QEMUFile: count the bytes we are asked to write, then drop them. */
+static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
+                                  int64_t pos)
+{
+    int i, size = 0;
+
+    for (i = 0; i < iovcnt; i++) {
+        size += iov[i].iov_len;
+    }
+    return size;
+}
+
+static int test_fclose(void *opaque)
+{
+    return 0;
+}
+
+static const QEMUFileOps test_write_ops = {
+    .writev_buffer = test_writev_buffer,
+    .close = test_fclose
+};
+
+static QEMUFile *dest_file;
+
+static const QEMUFileOps empty_ops = { };
+
+struct CompressData {
+    uint8_t *ram_addr;
+    QEMUFile *file;
+    z_stream stream;
+};
+typedef struct CompressData CompressData;
+
+static int compress_request_init(void *request)
+{
+    CompressData *cd = request;
+
+    if (deflateInit(&cd->stream, 1) != Z_OK) {
+        return -1;
+    }
+    cd->file = qemu_fopen_ops(NULL, &empty_ops);
+    return 0;
+}
+
+static void compress_request_uninit(void *request)
+{
+    CompressData *cd = request;
+
+    qemu_fclose(cd->file);
+    deflateEnd(&cd->stream);
+}
+
+/* Runs in a worker thread: compress one page into the per-request file. */
+static void compress_thread_data_handler(void *request)
+{
+    CompressData *cd = request;
+    int blen;
+
+    blen = qemu_put_compression_data(cd->file, &cd->stream, cd->ram_addr,
+                                     PAGE_SIZE);
+    if (blen < 0) {
+        error_report("compressed data failed!");
+        qemu_file_set_error(dest_file, blen);
+    }
+}
+
+struct CompressStats {
+    unsigned long pages;
+    unsigned long compressed_size;
+};
+typedef struct CompressStats CompressStats;
+
+static CompressStats comp_stats;
+
+static void compress_thread_data_done(void *request)
+{
+    CompressData *cd = request;
+    int bytes_xmit;
+
+    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
+
+    comp_stats.pages++;
+    comp_stats.compressed_size += bytes_xmit;
+}
+
+static const ThreadedWorkqueueOps ops = {
+    .thread_request_init = compress_request_init,
+    .thread_request_uninit = compress_request_uninit,
+    .thread_request_handler = compress_thread_data_handler,
+    .thread_request_done = compress_thread_data_done,
+    .request_size = sizeof(CompressData),
+};
+
+static void compress_threads_save_cleanup(Threads *threads)
+{
+    threaded_workqueue_destroy(threads);
+    qemu_fclose(dest_file);
+}
+
+static Threads *compress_threads_save_setup(int threads_nr, int requests_nr)
+{
+    Threads *compress_threads;
+
+    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
+    compress_threads = threaded_workqueue_create("compress", threads_nr,
+                                                 requests_nr, &ops);
+    assert(compress_threads);
+    return compress_threads;
+}
+
+static void compress_page_with_multi_thread(Threads *threads, uint8_t *addr)
+{
+    CompressData *cd;
+
+retry:
+    cd = threaded_workqueue_get_request(threads);
+    if (!cd) {
+        goto retry;
+    }
+
+    cd->ram_addr = addr;
+    threaded_workqueue_submit_request(threads, cd);
+}
+
+static void run(Threads *threads, uint8_t *mem, unsigned long mem_size,
+                int repeated_count)
+{
+    uint8_t *ptr = mem, *end = mem + mem_size;
+    uint64_t start_ts, spend, total_ts = 0, pages = mem_size >> PAGE_SHIFT;
+    double rate;
+    int i;
+
+    for (i = 0; i < repeated_count; i++) {
+        ptr = mem;
+        memset(&comp_stats, 0, sizeof(comp_stats));
+
+        start_ts = g_get_monotonic_time();
+        for (ptr = mem; ptr < end; ptr += PAGE_SIZE) {
+            *ptr = 0x10;
+            compress_page_with_multi_thread(threads, ptr);
+        }
+        threaded_workqueue_wait_for_requests(threads);
+        spend = g_get_monotonic_time() - start_ts;
+        total_ts += spend;
+
+        if (comp_stats.pages != pages) {
+            printf("ERROR: pages are compressed %ld, expect %ld.\n",
+                   comp_stats.pages, pages);
+            exit(-1);
+        }
+
+        rate = (double)(comp_stats.pages * PAGE_SIZE) /
+                        comp_stats.compressed_size;
+        printf("RUN %d: Request # %ld Cost %ld, Compression Rate %f.\n", i,
+               comp_stats.pages, spend, rate);
+    }
+
+    printf("AVG: Time Cost %ld\n", total_ts / repeated_count);
+    printf("AVG Throughput: %f GB/s\n",
+           (double)(mem_size >> 30) * repeated_count * 1e6 / total_ts);
+}
+
+static void usage(const char *arg0)
+{
+    printf("\nThreaded Workqueue Benchmark.\n");
+    printf("Usage:\n");
+    printf("  %s [OPTIONS]\n", arg0);
+    printf("Options:\n");
+    printf("  -t        the number of threads (default %d).\n",
+           DEFAULT_THREAD_NR);
+    printf("  -r:       the number of requests handled by each thread (default %d).\n",
+           DEFAULT_THREAD_REQUEST_NR);
+    printf("  -m:       the size of the memory (G) used to test (default %dG).\n",
+           DEFAULT_MEM_SIZE);
+    printf("  -c:       the repeated count (default %d).\n",
+           DEFAULT_REPEATED_COUNT);
+    printf("  -h        show this help info.\n");
+}
+
+int main(int argc, char *argv[])
+{
+    int c, threads_nr, requests_nr, repeated_count;
+    unsigned long mem_size;
+    uint8_t *mem;
+    Threads *threads;
+
+    threads_nr = DEFAULT_THREAD_NR;
+    requests_nr = DEFAULT_THREAD_REQUEST_NR;
+    mem_size = DEFAULT_MEM_SIZE;
+    repeated_count = DEFAULT_REPEATED_COUNT;
+
+    for (;;) {
+        c = getopt(argc, argv, "t:r:m:c:h");
+        if (c < 0) {
+            break;
+        }
+
+        switch (c) {
+        case 't':
+            threads_nr = atoi(optarg);
+            break;
+        case 'r':
+            requests_nr = atoi(optarg);
+            break;
+        case 'm':
+            mem_size = atol(optarg);
+            break;
+        case 'c':
+            repeated_count = atoi(optarg);
+            break;
+        default:
+            printf("Unknown option: %c.\n", c);
+            /* fall through to print usage */
+        case 'h':
+            usage(argv[0]);
+            return -1;
+        }
+    }
+
+    printf("Run the benchmark: threads %d requests-per-thread: %d memory %ldG repeat %d.\n",
+           threads_nr, requests_nr, mem_size, repeated_count);
+
+    mem_size = mem_size << 30;
+    mem = qemu_memalign(PAGE_SIZE, mem_size);
+    memset(mem, 0, mem_size);
+
+    threads = compress_threads_save_setup(threads_nr, requests_nr);
+    run(threads, mem, mem_size, repeated_count);
+    compress_threads_save_cleanup(threads);
+
+    qemu_vfree(mem);
+    return 0;
+}
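As the commit message says, the benchmark doubles as a usage example for the
API. For readers who only want the shape of that API, here is a minimal sketch
boiled down from the code above. It is illustrative only and not part of the
patch: the Threads and ThreadedWorkqueueOps types and the
threaded_workqueue_*() functions are the ones introduced earlier in this
series (so it builds only in a QEMU tree carrying "qemu/threaded-workqueue.h"),
while ZeroRequest, the zero_request_*() callbacks and zero_pages() are made-up
names whose worker simply clears pages to keep the example short.

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    /* Per-request state; request_size below tells the workqueue how much
     * memory to reserve for each in-flight request. */
    typedef struct {
        uint8_t *addr;
    } ZeroRequest;

    static int zero_request_init(void *request)    { return 0; }
    static void zero_request_uninit(void *request) { }

    /* Called from a worker thread. */
    static void zero_request_handler(void *request)
    {
        ZeroRequest *r = request;

        memset(r->addr, 0, 4096);
    }

    /* Called once the request has completed. */
    static void zero_request_done(void *request)   { }

    static const ThreadedWorkqueueOps zero_ops = {
        .thread_request_init    = zero_request_init,
        .thread_request_uninit  = zero_request_uninit,
        .thread_request_handler = zero_request_handler,
        .thread_request_done    = zero_request_done,
        .request_size           = sizeof(ZeroRequest),
    };

    static void zero_pages(uint8_t *mem, size_t pages)
    {
        /* 2 worker threads, 16 in-flight requests per thread. */
        Threads *wq = threaded_workqueue_create("zero", 2, 16, &zero_ops);
        size_t i;

        assert(wq);
        for (i = 0; i < pages; i++) {
            ZeroRequest *r;

            /* NULL means every request slot is busy; retry, as the
             * benchmark's compress_page_with_multi_thread() does. */
            do {
                r = threaded_workqueue_get_request(wq);
            } while (!r);

            r->addr = mem + i * 4096;
            threaded_workqueue_submit_request(wq, r);
        }

        threaded_workqueue_wait_for_requests(wq);   /* drain */
        threaded_workqueue_destroy(wq);
    }

Against the benchmark itself, the options parsed in main() map straight onto
this pattern; a hypothetical invocation such as

    tests/threaded-workqueue-bench -t 8 -r 16 -m 2 -c 3

would run 8 worker threads with 16 in-flight requests each over 2 GB of test
memory, repeated 3 times.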