
[v2,2/5] util: introduce threaded workqueue

Message ID 20181106122025.3487-3-xiaoguangrong@tencent.com (mailing list archive)
State New, archived
Series migration: improve multithreads

Commit Message

Xiao Guangrong Nov. 6, 2018, 12:20 p.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

This module implements a lockless and efficient threaded workqueue.

Three abstracted objects are used in this module:
  - Request.
    It contains both the data the workqueue needs to complete the
    request and the space where the result is saved after the
    workqueue has handled the request.

    It flows between the user and the workqueue. The user fills in the
    request data while the request is owned by the user. After it is
    submitted to the workqueue, the workqueue fetches the data, handles
    the request and saves the result back into it.

    All requests are pre-allocated and carefully partitioned between
    threads, so there is no contention on a request; this lets the
    threads run in parallel as much as possible.

  - User, i.e., the submitter
    It is the one that fills a request and submits it to the workqueue;
    the result is collected after the workqueue has handled the request.

    The user can submit requests back to back without waiting for the
    previous ones to be handled.
    Only a single submitter is supported; if you need more, serialize the
    submissions yourself, e.g., with a lock on your side.

  - Workqueue, i.e., thread
    Each workqueue is represented by a running thread that fetches
    the requests submitted by the user, does the specified work and
    saves the result back into the request.
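
Putting the three objects together, a minimal usage sketch looks like
this (MyRequest and the my_* callbacks are made-up placeholders for
illustration only, not part of this series; error handling is omitted):

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    typedef struct {
        int input;
        int output;
    } MyRequest;

    static int my_request_size(void)
    {
        return sizeof(MyRequest);
    }

    static int my_request_init(void *request)
    {
        return 0;   /* nothing to allocate for this toy payload */
    }

    static void my_request_uninit(void *request)
    {
    }

    /* called by a worker thread to do the actual work */
    static void my_request_handler(void *request)
    {
        MyRequest *req = request;

        req->output = req->input * 2;
    }

    /* called in the submitter context when a finished slot is reused */
    static void my_request_done(void *request)
    {
        MyRequest *req = request;

        (void)req->output;   /* collect the result here */
    }

    static ThreadedWorkqueueOps my_ops = {
        .thread_get_request_size = my_request_size,
        .thread_request_init = my_request_init,
        .thread_request_uninit = my_request_uninit,
        .thread_request_handler = my_request_handler,
        .thread_request_done = my_request_done,
    };

    static void submit_all(int nr)
    {
        Threads *threads;
        MyRequest *req;
        int i;

        threads = threaded_workqueue_create("demo", 4,
                                            DEFAULT_THREAD_REQUEST_NR,
                                            &my_ops);

        for (i = 0; i < nr; i++) {
            /* NULL means every request slot is owned by the threads */
            while (!(req = threaded_workqueue_get_request(threads))) {
                /* back off or flush, depending on the caller's policy */
            }
            req->input = i;
            threaded_workqueue_submit_request(threads, req);
        }

        /* make sure nothing is still in flight before tearing down */
        threaded_workqueue_wait_for_requests(threads);
        threaded_workqueue_destroy(threads);
    }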

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/threaded-workqueue.h |  94 ++++++++
 util/Makefile.objs                |   1 +
 util/threaded-workqueue.c         | 466 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 561 insertions(+)
 create mode 100644 include/qemu/threaded-workqueue.h
 create mode 100644 util/threaded-workqueue.c

Comments

Emilio Cota Nov. 13, 2018, 6:38 p.m. UTC | #1
On Tue, Nov 06, 2018 at 20:20:22 +0800, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> This modules implements the lockless and efficient threaded workqueue.
(snip)
> +++ b/util/threaded-workqueue.c
> +struct Threads {
> +    /*
> +     * in order to avoid contention, the @requests is partitioned to
> +     * @threads_nr pieces, each thread exclusively handles
> +     * @thread_request_nr requests in the array.
> +     */
> +    void *requests;
(snip)
> +    /*
> +     * the bit in these two bitmaps indicates the index of the @requests
> +     * respectively. If it's the same, the corresponding request is free
> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e, where the thread fetches
> +     * the request and write the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    unsigned long *request_fill_bitmap;
> +    /* after handles the request, the thread flips the bit. */
> +    unsigned long *request_done_bitmap;
(snip)
> +    /* the request is pushed to the thread with round-robin manner */
> +    unsigned int current_thread_index;
(snip)
> +    QemuEvent ev;
(snip)
> +};

The fields I'm showing above are all shared by all worker threads.
This can lead to unnecessary contention. For example:
- Adjacent requests can share the same cache line, which might be
  written to by different worker threads (when setting request->done)

- The request_done bitmap is written to by worker threads every time
  a job completes. At high core counts with low numbers of job slots,
  this can result in high contention. For example, imagine we have
  16 threads with 4 jobs each. This only requires 64 bits == 8 bytes, i.e.
  much less than a cache line. Whenever a job completes, the cache line
  will be atomically updated by one of the 16 threads.

- The completion event (Threads.ev above) is written to by every thread.
  Again, this can result in contention at large core counts.

An orthogonal issue is the round-robin policy. This can give us fairness,
in that we guarantee that all workers get a similar number of jobs.
But giving one job at a time to each worker is suboptimal when the job
sizes are small-ish, because it misses out on the benefits of batching,
which amortize the cost of communication.
Given that the number of jobs that we have (at least in the benchmark)
are small, filling up a worker's queue before moving on to the next
can yield a significant speedup at high core counts.
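
As a rough illustration of the submit-side change (this is only a sketch
of the idea against the v2 code, not the exact change in my branch, where
the per-thread state also moves):

    void threaded_workqueue_submit_request(Threads *threads, void *request)
    {
        ThreadRequest *req = request - sizeof(ThreadRequest);
        int request_index = request_to_index(req);
        int thread_index = request_index_to_thread(threads, request_index);
        ThreadLocal *thread_local = &threads->per_thread_data[thread_index];

        assert(!req->done);
        mark_request_valid(threads, request_index);

        /*
         * batching: keep handing slots to the same worker until its
         * partition is exhausted, so that one wake-up can serve several
         * jobs; only then advance the round-robin cursor.
         */
        if (request_index == thread_local->end_request_index) {
            threads->current_thread_index = thread_index + 1;
        }
        qemu_event_set(&thread_local->ev);
    }

One could go further and defer qemu_event_set() to a separate "flush"
call so that a worker is only woken up once per batch.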

I implemented the above on top of your series. The results are as follows:

                                         threaded-workqueue-bench -r 4 -m 2 -c 20 -t #N
                                              Host: AMD Opteron(tm) Processor 6376
                                          Thread pinning: #N+1 cores, same-socket first

         12 +-------------------------------------------------------------------------------------------------------+
            |    +   +     +     +     +     +    A+     +     +     +     +     +     +     +     +     +     +    |
            |                                     $                                                  before ***B*** |
         10 |-+                                  $$                                               +batching ###D###-|
            |                                    $$                                       +per-thread-state $$$A$$$ |
            |                                    $$  A        A                                                     |
            |                     $AD     D$A $A $ $ $A  A   $$               A        A  A$       A        A$ A    |
          8 |-+               D$AA  A# D$AA# A  $#D$$  $ $$ A  $   $A $A      $$ A$ A$A $ $ AA   $A $  $A   $ A   +-|
            |                AA  B* B$DA D  DD# A #$$   A  A   $$AA  A  A$A  $  A  A     A    $ A    AA  A$A        |
            |               $DB*B  B $ $ BB    D   $$  #D #D   A           A$A                 A                    |
          6 |-+          $AA*B       *A *  *       $# D  D  D#  #D #D   D#    D#DD#D   D# D#  # ##D      D#       +-|
            |           A             BB   *       A D        DD  D  D#D  DD#D      D#D  D  DD  D  D# D#D  DD#DD    |
            |           $                   B                                                        D              |
            |         $A                     **BB     B                                                             |
          4 |-+      A#                      B   *    **                                                          +-|
            |        $                            B *B  BB* B*                    *BB*B   B*BB*BB*B  B *BB* B*BB    |
            |      $A                              B       B  BB*BB*BB*BB*BB*BB*BB     **B         ** B    B        |
          2 |-+   A                                                                    B           B              +-|
            |     $                                                                                                 |
            |    A                                                                                                  |
            |    +   +     +     +     +     +     +     +     +     +     +     +     +     +     +     +     +    |
          0 +-------------------------------------------------------------------------------------------------------+
                 1   4     8     12    16    20    24    28    32    36    40    44    48    52    56    60    64
                                                             Threads
  png: https://imgur.com/Aj4yfGO

Note: "Threads" in the X axis means "worker threads".

Batching achieves higher performance at high core counts (>8),
since worker threads go through fewer sleep+wake cycles while
waiting for jobs. Performance however diminishes as more threads
are used (>=24) due to cache line contention.

Contention can be avoided by partitioning the request array, bitmaps
and completion event to be entirely local to worker threads
("+per-thread-state"). This avoids the performance decrease at >=24
threads that we observe with batching alone. Note that the overall
speedup does not go beyond ~8; this is explained by the fact that we
use a single requester thread. Thus, as we add more worker threads,
they become increasingly less busy, yet throughput remains roughly
constant. I say roughly because there's quite a bit of noise--this
is a 4-socket machine and I'm letting the scheduler move threads
around, although I'm limiting the cores that can be used with
taskset to maximize locality (this means that for 15 threads we're
using 16 host cores that are all in the same socket; note that
the additional thread is the requester one).

I have pushed the above changes, along with some minor fixes (e.g.
removing the Threads.name field) here:

  https://github.com/cota/qemu/tree/xiao

Note that the 0-len variable goes away, and that Threads become
read-only. I also use padding to make sure the events are in
separate cache lines.
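
Roughly, the per-thread state in that branch ends up looking like this
(the field names here are illustrative; see the branch for the exact
layout):

    struct ThreadLocal {
        struct Threads *threads;
        QemuThread thread;
        int self;
        bool quit;

        /* this worker's private request slots and ownership bitmaps */
        void *requests;
        unsigned long request_fill_bitmap;
        unsigned long request_done_bitmap;

        /* keep each event in its own cache line to avoid false sharing */
        QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
        QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
    } QEMU_ALIGNED(SMP_CACHE_BYTES);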

Feel free to incorporate whatever you see fit from that branch into
a subsequent iteration.

I have also some minor comments, but we can postpone those for later.
There are some issues I'd like you to consider now, however:

- Make sure all bitmap ops are using atomic_set/read. Add additional
  helpers if necessary.

- Constify everywhere the Ops struct.

- Consider just registering a size_t instead of a function to get the
  job size from the Ops struct.

And then a possible optimization for the actual use case you have:

- Consider using a system-specific number of threads (determined at
  run-time) for compression/decompression. For example, if the host
  system has a single core, there's no point in spawning more than a single
  thread. If the host system has 32 cores, you're probably leaving performance 
  on the table if you just use the default. 
  Ideally determining this number would also take into account the
  size of each job, which should also determine the number of
  job slots per worker thread.
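
To illustrate the last two points, something along these lines (just a
sketch; compress_threads_nr() and the exact signatures are made up):

    /* constified ops, with the job size registered directly */
    typedef struct ThreadedWorkqueueOps {
        size_t request_size;
        int (*thread_request_init)(void *request);
        void (*thread_request_uninit)(void *request);
        void (*thread_request_handler)(void *request);
        void (*thread_request_done)(void *request);
    } ThreadedWorkqueueOps;

    Threads *threaded_workqueue_create(const char *name,
                                       unsigned int threads_nr,
                                       unsigned int thread_request_nr,
                                       const ThreadedWorkqueueOps *ops);

    /* derive the thread count from the host instead of a fixed default */
    static unsigned int compress_threads_nr(unsigned int requested)
    {
        unsigned int cpus = g_get_num_processors();

        /* leave at least one core to the requester (and the vCPUs) */
        return MIN(requested, cpus > 1 ? cpus - 1 : 1);
    }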

Thanks,

		Emilio
Xiao Guangrong Nov. 20, 2018, 10:25 a.m. UTC | #2
On 11/14/18 2:38 AM, Emilio G. Cota wrote:
> On Tue, Nov 06, 2018 at 20:20:22 +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> This modules implements the lockless and efficient threaded workqueue.
> (snip)
>> +++ b/util/threaded-workqueue.c
>> +struct Threads {
>> +    /*
>> +     * in order to avoid contention, the @requests is partitioned to
>> +     * @threads_nr pieces, each thread exclusively handles
>> +     * @thread_request_nr requests in the array.
>> +     */
>> +    void *requests;
> (snip)
>> +    /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    unsigned long *request_fill_bitmap;
>> +    /* after handles the request, the thread flips the bit. */
>> +    unsigned long *request_done_bitmap;
> (snip)
>> +    /* the request is pushed to the thread with round-robin manner */
>> +    unsigned int current_thread_index;
> (snip)
>> +    QemuEvent ev;
> (snip)
>> +};
> 
> The fields I'm showing above are all shared by all worker threads.
> This can lead to unnecessary contention. For example:
> - Adjacent requests can share the same cache line, which might be
>    written to by different worker threads (when setting request->done)
> 
> - The request_done bitmap is written to by worker threads every time
>    a job completes. At high core counts with low numbers of job slots,
>    this can result in high contention. For example, imagine we have
>    16 threads with 4 jobs each. This only requires 64 bits == 8 bytes, i.e.
>    much less than a cache line. Whenever a job completes, the cache line
>    will be atomically updated by one of the 16 threads.
> 
> - The completion event (Threads.ev above) is written to by every thread.
>    Again, this can result in contention at large core counts.
> 
> An orthogonal issue is the round-robin policy. This can give us fairness,
> in that we guarantee that all workers get a similar number of jobs.
> But giving one job at a time to each worker is suboptimal when the job
> sizes are small-ish, because it misses out on the benefits of batching,
> which amortize the cost of communication.
> Given that the number of jobs that we have (at least in the benchmark)
> are small, filling up a worker's queue before moving on to the next
> can yield a significant speedup at high core counts.
> 
> I implemented the above on top of your series. The results are as follows:
> 
>                                           threaded-workqueue-bench -r 4 -m 2 -c 20 -t #N
>                                                Host: AMD Opteron(tm) Processor 6376
>                                            Thread pinning: #N+1 cores, same-socket first
> 
>           12 +-------------------------------------------------------------------------------------------------------+
>              |    +   +     +     +     +     +    A+     +     +     +     +     +     +     +     +     +     +    |
>              |                                     $                                                  before ***B*** |
>           10 |-+                                  $$                                               +batching ###D###-|
>              |                                    $$                                       +per-thread-state $$$A$$$ |
>              |                                    $$  A        A                                                     |
>              |                     $AD     D$A $A $ $ $A  A   $$               A        A  A$       A        A$ A    |
>            8 |-+               D$AA  A# D$AA# A  $#D$$  $ $$ A  $   $A $A      $$ A$ A$A $ $ AA   $A $  $A   $ A   +-|
>              |                AA  B* B$DA D  DD# A #$$   A  A   $$AA  A  A$A  $  A  A     A    $ A    AA  A$A        |
>              |               $DB*B  B $ $ BB    D   $$  #D #D   A           A$A                 A                    |
>            6 |-+          $AA*B       *A *  *       $# D  D  D#  #D #D   D#    D#DD#D   D# D#  # ##D      D#       +-|
>              |           A             BB   *       A D        DD  D  D#D  DD#D      D#D  D  DD  D  D# D#D  DD#DD    |
>              |           $                   B                                                        D              |
>              |         $A                     **BB     B                                                             |
>            4 |-+      A#                      B   *    **                                                          +-|
>              |        $                            B *B  BB* B*                    *BB*B   B*BB*BB*B  B *BB* B*BB    |
>              |      $A                              B       B  BB*BB*BB*BB*BB*BB*BB     **B         ** B    B        |
>            2 |-+   A                                                                    B           B              +-|
>              |     $                                                                                                 |
>              |    A                                                                                                  |
>              |    +   +     +     +     +     +     +     +     +     +     +     +     +     +     +     +     +    |
>            0 +-------------------------------------------------------------------------------------------------------+
>                   1   4     8     12    16    20    24    28    32    36    40    44    48    52    56    60    64
>                                                               Threads
>    png: https://imgur.com/Aj4yfGO
> 
> Note: "Threads" in the X axis means "worker threads".
> 
> Batching achieves higher performance at high core counts (>8),
> since worker threads go through fewer sleep+wake cycles while
> waiting for jobs. Performance however diminishes as more threads
> are used (>=24) due to cache line contention.
> 
> Contention can be avoided by partitioning the request array, bitmaps
> and completion event to be entirely local to worker threads
> ("+per-thread-state"). This avoids the performance decrease at >=24
> threads that we observe with batching alone. Note that the overall
> speedup does not go beyond ~8; this is explained by the fact that we
> use a single requester thread. Thus, as we add more worker threads,
> they become increasingly less busy, yet throughput remains roughly
> constant. I say roughly because there's quite a bit of noise--this
> is a 4-socket machine and I'm letting the scheduler move threads
> around, although I'm limiting the cores that can be used with
> taskset to maximize locality (this means that for 15 threads we're
> using 16 host cores that are all in the same socket; note that
> the additional thread is the requester one).

Hmm... I have carefully implemented the changes step by step:
1. separate @requests from threads to each single thread
2. separate @completion from threads
3. use batch mode
4. separate bitmaps from threads
5. revert batch mode based on step 4
6. compare them with directly using Emilio's patches.

The big difference between my modification and Emilio's patches is
that I still keep the per-thread data attached at the end of
@Threads.

I got these performance data:
	https://ibb.co/kcLnLL

Indeed, I can almost reproduce your conclusion. The confusing part
is batching - I used 16G of memory for the benchmark and batching
improved nothing; I guess all the threads stay busy in this case anyway.


> 
> I have pushed the above changes, along with some minor fixes (e.g.
> removing the Threads.name field) here:
> 
>    https://github.com/cota/qemu/tree/xiao
> 
> Note that the 0-len variable goes away, and that Threads become
> read-only. I also use padding to make sure the events are in
> separate cache lines.
> 
> Feel free to incorporate whatever you see fit from that branch into
> a subsequent iteration.

Nice, I will add you to the author list if you do not object. :)

> 
> I have also some minor comments, but we can postpone those for later.
> There are some issues I'd like you to consider now, however:
> 
> - Make sure all bitmap ops are using atomic_set/read. Add additional
>    helpers if necessary.
> 

Sounds good to me. Now that we have moved the requests to each thread,
i.e., requests are per thread, I think 64 is big enough, so I am planning
to limit it to a maximum of 64 and use a u64 as the bitmap.
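
Something like this is what I have in mind (just a sketch, not tested):

    /* at most 64 requests per thread, so a single u64 bitmap is enough */
    #define MAX_THREAD_REQUEST_NR 64

    struct ThreadLocal {
        /* flipped by the submitter after it fills a request */
        uint64_t request_fill_bitmap;
        /* flipped by the worker after it handles a request */
        uint64_t request_done_bitmap;
        /* ... other per-thread fields ... */
    };

    /* a slot is free iff its fill bit and its done bit are equal */
    static uint64_t thread_free_slots(struct ThreadLocal *thread,
                                      unsigned int request_nr)
    {
        uint64_t fill = atomic_read__nocheck(&thread->request_fill_bitmap);
        uint64_t done = atomic_read__nocheck(&thread->request_done_bitmap);
        uint64_t mask = request_nr == 64 ? ~0ULL : (1ULL << request_nr) - 1;

        return ~(fill ^ done) & mask;
    }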

> - Constify everywhere the Ops struct.

Good suggestion.

> 
> - Consider just registering a size_t instead of a function to get the
>    job size from the Ops struct.
> 

Okay, that's good to me.

> And then a possible optimization for the actual use case you have:
> 
> - Consider using a system-specific number of threads (determined at
>    run-time) for compression/decompression. For example, if the host
>    system has a single core, there's no point in spawning more than a single
>    thread. If the host system has 32 cores, you're probably leaving performance
>    on the table if you just use the default.
>    Ideally determining this number would also take into account the
>    size of each job, which should also determine the number of
>    job slots per worker thread.
> 

That cannot work well... It depends on the CPU usage rather than on the
number of CPUs. Furthermore, we developed adaptive migration, which
dynamically adjusts the thread number based on resource usage. For more
details please see:
   https://kvmforum2018.sched.com/event/FzuU/adaptive-live-migration-xiao-guangrong-yulei-zhang-tencent-cloud

Thanks!
Emilio Cota Nov. 20, 2018, 4:33 p.m. UTC | #3
On Tue, Nov 20, 2018 at 18:25:25 +0800, Xiao Guangrong wrote:
> On 11/14/18 2:38 AM, Emilio G. Cota wrote:
> > On Tue, Nov 06, 2018 at 20:20:22 +0800, guangrong.xiao@gmail.com wrote:
> > > From: Xiao Guangrong <xiaoguangrong@tencent.com>
(snip)
> > Batching achieves higher performance at high core counts (>8),
> > since worker threads go through fewer sleep+wake cycles while
> > waiting for jobs. Performance however diminishes as more threads
> > are used (>=24) due to cache line contention.
> > 
> > Contention can be avoided by partitioning the request array, bitmaps
> > and completion event to be entirely local to worker threads
> > ("+per-thread-state"). This avoids the performance decrease at >=24
> > threads that we observe with batching alone. Note that the overall
> > speedup does not go beyond ~8; this is explained by the fact that we
> > use a single requester thread. Thus, as we add more worker threads,
> > they become increasingly less busy, yet throughput remains roughly
> > constant. I say roughly because there's quite a bit of noise--this
> > is a 4-socket machine and I'm letting the scheduler move threads
> > around, although I'm limiting the cores that can be used with
> > taskset to maximize locality (this means that for 15 threads we're
> > using 16 host cores that are all in the same socket; note that
> > the additional thread is the requester one).
> 
> Hmm... I have carefully written the stuff step by step:
> 1. separate @requests from threads to each single thread
> 2. separate @completion from threads
> 3. use batch mode
> 4. separate bitmaps from threads
> 5. revert batch mode based on step 4
> 6. compare them with directly using Emilio's patches.
> 
> The big different between my modification and Emilio's patches is
> that i still make per-thread-data be attached in the end of
> @Threads.
>
> I got these performance data:
> 	https://ibb.co/kcLnLL
> 
> Indeed, i can almost reproduce your conclusion. The confused part
> is batch -  I used 16G memory to do the benchmark, the batch improved
> nothing, I guess all the threads keep busy under this case anyway.

I wouldn't worry too much about it, then. The machine I ran my tests
on is rather old, so it's possible that cross-core/socket communication
is much slower there than on your machine, which makes batching
more attractive. At the end of the day, the measurements you take on
the machines you care about are what matter =)

> > I have pushed the above changes, along with some minor fixes (e.g.
> > removing the Threads.name field) here:
> > 
> >    https://github.com/cota/qemu/tree/xiao
> > 
> > Note that the 0-len variable goes away, and that Threads become
> > read-only. I also use padding to make sure the events are in
> > separate cache lines.
> > 
> > Feel free to incorporate whatever you see fit from that branch into
> > a subsequent iteraiton.
> 
> Nice, I will add you to the author list if you do not object. :)

That's not necessary. You deserve the full credit -- I just reviewed
the code ;-)

Thanks,

		Emilio

Patch

diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
new file mode 100644
index 0000000000..d7eb66c8d2
--- /dev/null
+++ b/include/qemu/threaded-workqueue.h
@@ -0,0 +1,94 @@ 
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_THREADED_WORKQUEUE_H
+#define QEMU_THREADED_WORKQUEUE_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+
+/*
+ * This module implements a lockless and efficient threaded workqueue.
+ *
+ * Three abstracted objects are used in this module:
+ * - Request.
+ *   It contains both the data the workqueue needs to complete the
+ *   request and the space where the result is saved after the
+ *   workqueue has handled the request.
+ *
+ *   It flows between the user and the workqueue. The user fills in the
+ *   request data while the request is owned by the user. After it is
+ *   submitted to the workqueue, the workqueue fetches the data, handles
+ *   the request and saves the result back into it.
+ *
+ *   All requests are pre-allocated and carefully partitioned between
+ *   threads, so there is no contention on a request; this lets the
+ *   threads run in parallel as much as possible.
+ *
+ * - User, i.e., the submitter
+ *   It is the one that fills a request and submits it to the workqueue;
+ *   the result is collected after the workqueue has handled the request.
+ *
+ *   The user can submit requests back to back without waiting for the
+ *   previous ones to be handled.
+ *   Only a single submitter is supported; if you need more, serialize
+ *   the submissions yourself, e.g., with a lock on your side.
+ *
+ * - Workqueue, i.e., thread
+ *   Each workqueue is represented by a running thread that fetches
+ *   the requests submitted by the user, does the specified work and
+ *   saves the result back into the request.
+ */
+
+typedef struct Threads Threads;
+
+struct ThreadedWorkqueueOps {
+    /* return the size of each request */
+    int (*thread_get_request_size)(void);
+
+    /* constructor of the request */
+    int (*thread_request_init)(void *request);
+    /*  destructor of the request */
+    void (*thread_request_uninit)(void *request);
+
+    /* the handler of the request that is called by the thread */
+    void (*thread_request_handler)(void *request);
+    /* called by the user after the request has been handled */
+    void (*thread_request_done)(void *request);
+};
+typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
+
+/* the default number of requests that each thread handles */
+#define DEFAULT_THREAD_REQUEST_NR 4
+
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                                   int thread_request_nr,
+                                   ThreadedWorkqueueOps *ops);
+
+void threaded_workqueue_destroy(Threads *threads);
+
+/*
+ * find a free request where the user can store the data that is needed to
+ * finish the request
+ *
+ * If all requests are used up, return NULL
+ */
+void *threaded_workqueue_get_request(Threads *threads);
+/* submit the request and notify the thread */
+void threaded_workqueue_submit_request(Threads *threads, void *request);
+
+/*
+ * wait for all threads to complete their requests, so that no previously
+ * submitted request is still pending.
+ */
+void threaded_workqueue_wait_for_requests(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..f26dfe5182 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@  util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += threaded-workqueue.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
new file mode 100644
index 0000000000..966479631a
--- /dev/null
+++ b/util/threaded-workqueue.c
@@ -0,0 +1,466 @@ 
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/bitmap.h"
+#include "qemu/threaded-workqueue.h"
+
+#define SMP_CACHE_BYTES 64
+#define BITS_ALIGNED_TO_CACHE(_bits_)   \
+    QEMU_ALIGN_UP(_bits_, SMP_CACHE_BYTES * BITS_PER_BYTE)
+
+/*
+ * the request representation, which contains the internally used metadata;
+ * it is the header of the user-defined data.
+ *
+ * It should be aligned to the natural word size of the CPU.
+ */
+struct ThreadRequest {
+    /*
+     * the request has been handled by the thread and needs the user
+     * to fetch the result out.
+     */
+    bool done;
+    /*
+     * the index to Threads::requests.
+     * Save it to the padding space although it can be calculated at runtime.
+     */
+    int index;
+};
+typedef struct ThreadRequest ThreadRequest;
+
+struct ThreadLocal {
+    struct Threads *threads;
+
+    /*
+     * the request region in Threads::requests that the thread
+     * needs to handle
+     */
+    int start_request_index;
+    int end_request_index;
+
+    /*
+     * the interim bitmap used by the thread to avoid frequent
+     * memory allocation
+     */
+    unsigned long *result_bitmap;
+
+    /* the index of the thread */
+    int self;
+
+    /* the thread is no longer needed and should exit */
+    bool quit;
+
+    QemuThread thread;
+
+    /* the event used to wake up the thread */
+    QemuEvent ev;
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct represents multithreads which is shared by
+ * all threads
+ */
+struct Threads {
+    /*
+     * in order to avoid contention, @requests is partitioned into
+     * @threads_nr pieces; each thread exclusively handles
+     * @thread_request_nr requests in the array.
+     */
+    void *requests;
+
+    /*
+     * each bit in these two bitmaps corresponds to an index in @requests.
+     * If the two bits are equal, the corresponding request is free and
+     * owned by the user, i.e., this is where the user fills a request.
+     * Otherwise, it is valid and owned by the thread, i.e., this is where
+     * the thread fetches the request and writes the result.
+     */
+
+    /* after the user fills the request, the bit is flipped. */
+    unsigned long *request_fill_bitmap;
+    /* after handling the request, the thread flips the bit. */
+    unsigned long *request_done_bitmap;
+
+    /*
+     * the interim bitmap used by the user to avoid frequent
+     * memory allocation
+     */
+    unsigned long *result_bitmap;
+
+    /* the size of each request; the ThreadRequest header is included */
+    unsigned int request_size;
+
+    /* the number of requests that each thread needs to handle */
+    unsigned int thread_request_nr;
+    unsigned int total_requests;
+
+    unsigned int threads_nr;
+
+    /* requests are pushed to the threads in a round-robin manner */
+    unsigned int current_thread_index;
+
+    ThreadedWorkqueueOps *ops;
+
+    const char *name;
+    QemuEvent ev;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static ThreadRequest *index_to_request(Threads *threads, int request_index)
+{
+    ThreadRequest *request;
+
+    request = threads->requests + request_index * threads->request_size;
+
+    assert(request->index == request_index);
+    return request;
+}
+
+static int request_to_index(ThreadRequest *request)
+{
+    return request->index;
+}
+
+static int thread_to_first_request_index(Threads *threads, int thread_id)
+{
+    thread_id %= threads->threads_nr;
+    return thread_id * threads->thread_request_nr;
+}
+
+static int request_index_to_thread(Threads *threads, int request_index)
+{
+    return request_index / threads->thread_request_nr;
+}
+
+/*
+ * free request: the request is not used by any thread; however, it might
+ *   contain a result that requires the user to call thread_request_done()
+ *
+ * valid request: the request contains the request data and it is committed
+ *   to the thread, i.e., it is owned by the thread.
+ */
+static unsigned long *get_free_request_bitmap(Threads *threads)
+{
+    bitmap_xor(threads->result_bitmap, threads->request_fill_bitmap,
+               threads->request_done_bitmap, threads->total_requests);
+
+    /*
+     * paired with smp_wmb() in mark_request_free() to make sure that we
+     * read request_done_bitmap before fetching the result out.
+     */
+    smp_rmb();
+
+    return threads->result_bitmap;
+}
+
+static int find_free_request_index(Threads *threads)
+{
+    unsigned long *result_bitmap = get_free_request_bitmap(threads);
+    int index, cur_index;
+
+    cur_index = thread_to_first_request_index(threads,
+                                              threads->current_thread_index);
+
+retry:
+    index = find_next_zero_bit(result_bitmap, threads->total_requests,
+                               cur_index);
+    if (index < threads->total_requests) {
+        return index;
+    }
+
+    /* if we get nothing, start it over. */
+    if (cur_index != 0) {
+        cur_index = 0;
+        goto retry;
+    }
+
+    return -1;
+}
+
+static void mark_request_valid(Threads *threads, int request_index)
+{
+    /*
+     * paired with smp_rmb() in thread_find_first_valid_request_index() to
+     * make sure the request has been filled before the bit is flipped,
+     * which makes the request visible to the thread
+     */
+    smp_wmb();
+
+    change_bit(request_index, threads->request_fill_bitmap);
+}
+
+static int thread_find_first_valid_request_index(ThreadLocal *thread)
+{
+    Threads *threads = thread->threads;
+    int index;
+
+    bitmap_xor(thread->result_bitmap, threads->request_fill_bitmap,
+               threads->request_done_bitmap, threads->total_requests);
+    /*
+     * paired with smp_wmb() in mark_request_valid() to make sure that
+     * we read request_fill_bitmap before fetching the request out.
+     */
+    smp_rmb();
+
+    index = find_next_bit(thread->result_bitmap, threads->total_requests,
+                          thread->start_request_index);
+    return index > thread->end_request_index ? -1 : index;
+}
+
+static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
+{
+    int index = request_to_index(request);
+
+    /*
+     * smp_wmb() is implied in change_bit_atomic() that is paired with
+     * smp_rmb() in get_free_request_bitmap() to make sure the result
+     * has been saved before the bit is flipped.
+     */
+    change_bit_atomic(index, thread->threads->request_done_bitmap);
+}
+
+/* retry to see if a request is available before actually going to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *
+thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    int index, count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        index = thread_find_first_valid_request_index(thread);
+        if (index >= 0) {
+            assert(index >= thread->start_request_index &&
+                   index <= thread->end_request_index);
+            return index_to_request(thread->threads, index);
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(void *request) = threads->ops->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->ev);
+            continue;
+        }
+
+        assert(!request->done);
+
+        handler(request + 1);
+        request->done = true;
+        mark_request_free(self_data, request);
+        qemu_event_set(&threads->ev);
+    }
+
+    return NULL;
+}
+
+static void uninit_requests(Threads *threads, int free_nr)
+{
+    ThreadRequest *request;
+    int i;
+
+    for (request = threads->requests, i = 0; i < free_nr; i++) {
+        threads->ops->thread_request_uninit(request + 1);
+        request = (void *)request + threads->request_size;
+    }
+
+    g_free(threads->result_bitmap);
+    g_free(threads->request_fill_bitmap);
+    g_free(threads->request_done_bitmap);
+    g_free(threads->requests);
+}
+
+static int init_requests(Threads *threads)
+{
+    ThreadRequest *request;
+    int aligned_requests, free_nr = 0, ret = -1;
+
+    aligned_requests = BITS_ALIGNED_TO_CACHE(threads->total_requests);
+    threads->request_fill_bitmap = bitmap_new(aligned_requests);
+    threads->request_done_bitmap = bitmap_new(aligned_requests);
+    threads->result_bitmap = bitmap_new(threads->total_requests);
+
+    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
+
+    threads->request_size = threads->ops->thread_get_request_size();
+    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
+    threads->request_size += sizeof(ThreadRequest);
+    threads->requests = g_try_malloc0_n(threads->total_requests,
+                                        threads->request_size);
+    if (!threads->requests) {
+        goto exit;
+    }
+
+    for (request = threads->requests; free_nr < threads->total_requests;
+        free_nr++) {
+        ret = threads->ops->thread_request_init(request + 1);
+        if (ret < 0) {
+            goto exit;
+        }
+
+        request->index = free_nr;
+        request = (void *)request + threads->request_size;
+    }
+
+    return 0;
+
+exit:
+    uninit_requests(threads, free_nr);
+    return ret;
+}
+
+static void uninit_thread_data(Threads *threads)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < threads->threads_nr; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].ev);
+        g_free(thread_local[i].result_bitmap);
+    }
+}
+
+static void init_thread_data(Threads *threads)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int start_index, end_index, i;
+
+    for (i = 0; i < threads->threads_nr; i++) {
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+
+        start_index = thread_to_first_request_index(threads, i);
+        end_index = start_index + threads->thread_request_nr - 1;
+        thread_local[i].start_request_index = start_index;
+        thread_local[i].end_request_index = end_index;
+
+        thread_local[i].result_bitmap = bitmap_new(threads->total_requests);
+
+        qemu_event_init(&thread_local[i].ev, false);
+
+        name = g_strdup_printf("%s/%d", threads->name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+}
+
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                               int thread_request_nr, ThreadedWorkqueueOps *ops)
+{
+    Threads *threads;
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->name = name;
+    threads->ops = ops;
+
+    threads->threads_nr = threads_nr;
+    threads->thread_request_nr = thread_request_nr;
+
+    threads->total_requests = thread_request_nr * threads_nr;
+    if (init_requests(threads) < 0) {
+        g_free(threads);
+        return NULL;
+    }
+
+    qemu_event_init(&threads->ev, false);
+    init_thread_data(threads);
+    return threads;
+}
+
+void threaded_workqueue_destroy(Threads *threads)
+{
+    uninit_thread_data(threads);
+    uninit_requests(threads, threads->total_requests);
+    qemu_event_destroy(&threads->ev);
+    g_free(threads);
+}
+
+static void request_done(Threads *threads, ThreadRequest *request)
+{
+    if (!request->done) {
+        return;
+    }
+
+    threads->ops->thread_request_done(request + 1);
+    request->done = false;
+}
+
+void *threaded_workqueue_get_request(Threads *threads)
+{
+    ThreadRequest *request;
+    int index;
+
+    index = find_free_request_index(threads);
+    if (index < 0) {
+        return NULL;
+    }
+
+    request = index_to_request(threads, index);
+    request_done(threads, request);
+    return request + 1;
+}
+
+void threaded_workqueue_submit_request(Threads *threads, void *request)
+{
+    ThreadRequest *req = request - sizeof(ThreadRequest);
+    int request_index = request_to_index(req);
+    int thread_index = request_index_to_thread(threads, request_index);
+    ThreadLocal *thread_local = &threads->per_thread_data[thread_index];
+
+    assert(!req->done);
+
+    mark_request_valid(threads, request_index);
+
+    threads->current_thread_index = ++thread_index;
+    qemu_event_set(&thread_local->ev);
+}
+
+void threaded_workqueue_wait_for_requests(Threads *threads)
+{
+    unsigned long *result_bitmap;
+    int index = 0;
+
+retry:
+    qemu_event_reset(&threads->ev);
+    result_bitmap = get_free_request_bitmap(threads);
+    for (; index < threads->total_requests; index++) {
+        if (test_bit(index, result_bitmap)) {
+            qemu_event_wait(&threads->ev);
+            goto retry;
+        }
+
+        request_done(threads, index_to_request(threads, index));
+    }
+}