[v3,2/5] util: introduce threaded workqueue

Message ID 20181122072028.22819-3-xiaoguangrong@tencent.com (mailing list archive)
State New, archived
Series migration: improve multithreads

Commit Message

Xiao Guangrong Nov. 22, 2018, 7:20 a.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

This module implements a lockless and efficient threaded workqueue.

Three abstracted objects are used in this module:
- Request.
    It contains both the data that the workqueue needs to finish the
    request and the space where the result is saved after the workqueue
    has handled the request.

    It flows between the user and the workqueue. The user fills the
    request data into it while it is owned by the user. After it is
    submitted to the workqueue, the workqueue fetches the data out and
    saves the result into it once the request has been handled.

    All the requests are pre-allocated and carefully partitioned between
    threads, so there is no contention on a request itself; this lets
    the threads run in parallel as much as possible.

- User, i.e., the submitter
    It is the one who fills the request and submits it to the workqueue;
    the result is collected after the workqueue has handled the request.

    The user can submit requests consecutively without waiting for the
    previous requests to be handled.
    Only one submitter is supported; if you need more, serialize the
    submissions yourself, e.g. with a lock on your side.

- Workqueue, i.e., thread
    Each workqueue is represented by a running thread that fetches the
    requests submitted by the user, does the specified work and saves
    the result to the request.
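
A sketch of the intended usage, based on the API below (the payload type
and the handlers are only illustrative, not part of this patch):

    #include "qemu/osdep.h"
    #include "qemu/threaded-workqueue.h"

    typedef struct SampleRequest {
        int input;     /* filled by the user before submission */
        int result;    /* saved by the workqueue thread */
    } SampleRequest;

    static int sample_request_init(void *request)
    {
        return 0;      /* nothing extra to allocate for this payload */
    }

    static void sample_request_uninit(void *request)
    {
    }

    static void sample_request_handler(void *request)
    {
        SampleRequest *req = request;

        req->result = req->input * 2;   /* the heavy work happens here */
    }

    static void sample_request_done(void *request)
    {
        /* runs on the submitter side; collect req->result here */
    }

    static const ThreadedWorkqueueOps sample_ops = {
        .thread_request_init = sample_request_init,
        .thread_request_uninit = sample_request_uninit,
        .thread_request_handler = sample_request_handler,
        .thread_request_done = sample_request_done,
        .request_size = sizeof(SampleRequest),
    };

    static void sample(void)
    {
        Threads *threads = threaded_workqueue_create("sample", 2,
                                           DEFAULT_THREAD_REQUEST_NR,
                                           &sample_ops);
        SampleRequest *req = threaded_workqueue_get_request(threads);

        if (req) {     /* NULL means every request is still in flight */
            req->input = 21;
            threaded_workqueue_submit_request(threads, req);
        }
        threaded_workqueue_wait_for_requests(threads);
        threaded_workqueue_destroy(threads);
    }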

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 include/qemu/threaded-workqueue.h | 106 +++++++++
 util/Makefile.objs                |   1 +
 util/threaded-workqueue.c         | 463 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 570 insertions(+)
 create mode 100644 include/qemu/threaded-workqueue.h
 create mode 100644 util/threaded-workqueue.c

Comments

Dr. David Alan Gilbert Nov. 23, 2018, 11:02 a.m. UTC | #1
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> This modules implements the lockless and efficient threaded workqueue.
> 
> Three abstracted objects are used in this module:
> - Request.
>      It not only contains the data that the workqueue fetches out
>     to finish the request but also offers the space to save the result
>     after the workqueue handles the request.
> 
>     It's flowed between user and workqueue. The user fills the request
>     data into it when it is owned by user. After it is submitted to the
>     workqueue, the workqueue fetched data out and save the result into
>     it after the request is handled.
> 
>     All the requests are pre-allocated and carefully partitioned between
>     threads so there is no contention on the request, that make threads
>     be parallel as much as possible.
> 
> - User, i.e, the submitter
>     It's the one fills the request and submits it to the workqueue,
>     the result will be collected after it is handled by the work queue.
> 
>     The user can consecutively submit requests without waiting the previous
>     requests been handled.
>     It only supports one submitter, you should do serial submission by
>     yourself if you want more, e.g, use lock on you side.
> 
> - Workqueue, i.e, thread
>     Each workqueue is represented by a running thread that fetches
>     the request submitted by the user, do the specified work and save
>     the result to the request.
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  include/qemu/threaded-workqueue.h | 106 +++++++++
>  util/Makefile.objs                |   1 +
>  util/threaded-workqueue.c         | 463 ++++++++++++++++++++++++++++++++++++++
>  3 files changed, 570 insertions(+)
>  create mode 100644 include/qemu/threaded-workqueue.h
>  create mode 100644 util/threaded-workqueue.c
> 
> diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
> new file mode 100644
> index 0000000000..e0ede496d0
> --- /dev/null
> +++ b/include/qemu/threaded-workqueue.h
> @@ -0,0 +1,106 @@
> +/*
> + * Lockless and Efficient Threaded Workqueue Abstraction
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#ifndef QEMU_THREADED_WORKQUEUE_H
> +#define QEMU_THREADED_WORKQUEUE_H
> +
> +#include "qemu/queue.h"
> +#include "qemu/thread.h"
> +
> +/*
> + * This modules implements the lockless and efficient threaded workqueue.
> + *
> + * Three abstracted objects are used in this module:
> + * - Request.
> + *   It not only contains the data that the workqueue fetches out
> + *   to finish the request but also offers the space to save the result
> + *   after the workqueue handles the request.
> + *
> + *   It's flowed between user and workqueue. The user fills the request
> + *   data into it when it is owned by user. After it is submitted to the
> + *   workqueue, the workqueue fetched data out and save the result into
> + *   it after the request is handled.
> + *
> + *   All the requests are pre-allocated and carefully partitioned between
> + *   threads so there is no contention on the request, that make threads
> + *   be parallel as much as possible.
> + *
> + * - User, i.e, the submitter
> + *   It's the one fills the request and submits it to the workqueue,
> + *   the result will be collected after it is handled by the work queue.
> + *
> + *   The user can consecutively submit requests without waiting the previous
> + *   requests been handled.
> + *   It only supports one submitter, you should do serial submission by
> + *   yourself if you want more, e.g, use lock on you side.
> + *
> + * - Workqueue, i.e, thread
> + *   Each workqueue is represented by a running thread that fetches
> + *   the request submitted by the user, do the specified work and save
> + *   the result to the request.
> + */
> +
> +typedef struct Threads Threads;
> +
> +struct ThreadedWorkqueueOps {
> +    /* constructor of the request */
> +    int (*thread_request_init)(void *request);
> +    /*  destructor of the request */
> +    void (*thread_request_uninit)(void *request);
> +
> +    /* the handler of the request that is called by the thread */
> +    void (*thread_request_handler)(void *request);
> +    /* called by the user after the request has been handled */
> +    void (*thread_request_done)(void *request);
> +
> +    size_t request_size;
> +};
> +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
> +
> +/* the default number of requests that thread need handle */
> +#define DEFAULT_THREAD_REQUEST_NR 4
> +/* the max number of requests that thread need handle */
> +#define MAX_THREAD_REQUEST_NR     (sizeof(uint64_t) * BITS_PER_BYTE)
> +
> +/*
> + * create a threaded queue. Other APIs will work on the Threads it returned
> + *
> + * @name: the identity of the workqueue which is used to construct the name
> + *    of threads only
> + * @threads_nr: the number of threads that the workqueue will create
> + * @thread_requests_nr: the number of requests that each single thread will
> + *    handle
> + * @ops: the handlers of the request
> + *
> + * Return NULL if it failed
> + */
> +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
> +                                   unsigned int thread_requests_nr,
> +                                   const ThreadedWorkqueueOps *ops);
> +void threaded_workqueue_destroy(Threads *threads);
> +
> +/*
> + * find a free request where the user can store the data that is needed to
> + * finish the request
> + *
> + * If all requests are used up, return NULL
> + */
> +void *threaded_workqueue_get_request(Threads *threads);
> +/* submit the request and notify the thread */
> +void threaded_workqueue_submit_request(Threads *threads, void *request);
> +
> +/*
> + * wait all threads to complete the request to make sure there is no
> + * previous request exists
> + */
> +void threaded_workqueue_wait_for_requests(Threads *threads);
> +#endif
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 0820923c18..f26dfe5182 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -50,5 +50,6 @@ util-obj-y += range.o
>  util-obj-y += stats64.o
>  util-obj-y += systemd.o
>  util-obj-y += iova-tree.o
> +util-obj-y += threaded-workqueue.o
>  util-obj-$(CONFIG_LINUX) += vfio-helpers.o
>  util-obj-$(CONFIG_OPENGL) += drm.o
> diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
> new file mode 100644
> index 0000000000..2ab37cee8d
> --- /dev/null
> +++ b/util/threaded-workqueue.c
> @@ -0,0 +1,463 @@
> +/*
> + * Lockless and Efficient Threaded Workqueue Abstraction
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu/bitmap.h"
> +#include "qemu/threaded-workqueue.h"
> +
> +#define SMP_CACHE_BYTES 64

That's architecture dependent isn't it?

> +
> +/*
> + * the request representation which contains the internally used mete data,
> + * it is the header of user-defined data.
> + *
> + * It should be aligned to the nature size of CPU.
> + */
> +struct ThreadRequest {
> +    /*
> +     * the request has been handled by the thread and need the user
> +     * to fetch result out.
> +     */
> +    uint8_t done;
> +
> +    /*
> +     * the index to Thread::requests.
> +     * Save it to the padding space although it can be calculated at runtime.
> +     */
> +    uint8_t request_index;
> +
> +    /* the index to Threads::per_thread_data */
> +    unsigned int thread_index;
> +} QEMU_ALIGNED(sizeof(unsigned long));
> +typedef struct ThreadRequest ThreadRequest;
> +
> +struct ThreadLocal {
> +    struct Threads *threads;
> +
> +    /* the index of the thread */
> +    int self;
> +
> +    /* thread is useless and needs to exit */
> +    bool quit;
> +
> +    QemuThread thread;
> +
> +    void *requests;
> +
> +   /*
> +     * the bit in these two bitmaps indicates the index of the @requests
> +     * respectively. If it's the same, the corresponding request is free
> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e, where the thread fetches
> +     * the request and write the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> +    /* after handles the request, the thread flips the bit. */
> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

Patchew complained about some type mismatches; I think those are because
you're using the bitmap_* functions on these; those functions always
operate on 'long' not on uint64_t - and on some platforms they're
unfortunately not the same.


Dave

> +    /*
> +     * the event used to wake up the thread whenever a valid request has
> +     * been submitted
> +     */
> +    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
> +
> +    /*
> +     * the event is notified whenever a request has been completed
> +     * (i.e, become free), which is used to wake up the user
> +     */
> +    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
> +};
> +typedef struct ThreadLocal ThreadLocal;
> +
> +/*
> + * the main data struct represents multithreads which is shared by
> + * all threads
> + */
> +struct Threads {
> +    /* the request header, ThreadRequest, is contained */
> +    unsigned int request_size;
> +    unsigned int thread_requests_nr;
> +    unsigned int threads_nr;
> +
> +    /* the request is pushed to the thread with round-robin manner */
> +    unsigned int current_thread_index;
> +
> +    const ThreadedWorkqueueOps *ops;
> +
> +    ThreadLocal per_thread_data[0];
> +};
> +typedef struct Threads Threads;
> +
> +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index)
> +{
> +    ThreadRequest *request;
> +
> +    request = thread->requests + request_index * thread->threads->request_size;
> +    assert(request->request_index == request_index);
> +    assert(request->thread_index == thread->self);
> +    return request;
> +}
> +
> +static int request_to_index(ThreadRequest *request)
> +{
> +    return request->request_index;
> +}
> +
> +static int request_to_thread_index(ThreadRequest *request)
> +{
> +    return request->thread_index;
> +}
> +
> +/*
> + * free request: the request is not used by any thread, however, it might
> + *   contain the result need the user to call thread_request_done()
> + *
> + * valid request: the request contains the request data and it's committed
> + *   to the thread, i,e. it's owned by thread.
> + */
> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);
> +
> +    /*
> +     * paired with smp_wmb() in mark_request_free() to make sure that we
> +     * read request_done_bitmap before fetching the result out.
> +     */
> +    smp_rmb();
> +
> +    return result_bitmap;
> +}
> +
> +static ThreadRequest
> +*find_thread_free_request(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t result_bitmap = get_free_request_bitmap(threads, thread);
> +    int index;
> +
> +    index  = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr);
> +    if (index >= threads->thread_requests_nr) {
> +        return NULL;
> +    }
> +
> +    return index_to_request(thread, index);
> +}
> +
> +static ThreadRequest *threads_find_free_request(Threads *threads)
> +{
> +    ThreadLocal *thread;
> +    ThreadRequest *request;
> +    int cur_thread, thread_index;
> +
> +    cur_thread = threads->current_thread_index % threads->threads_nr;
> +    thread_index = cur_thread;
> +    do {
> +        thread = threads->per_thread_data + thread_index++;
> +        request = find_thread_free_request(threads, thread);
> +        if (request) {
> +            break;
> +        }
> +        thread_index %= threads->threads_nr;
> +    } while (thread_index != cur_thread);
> +
> +    return request;
> +}
> +
> +/*
> + * the change bit operation combined with READ_ONCE and WRITE_ONCE which
> + * only works on single uint64_t width
> + */
> +static void change_bit_once(long nr, uint64_t *addr)
> +{
> +    uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr);
> +
> +    atomic_rcu_set(addr, value);
> +}
> +
> +static void mark_request_valid(Threads *threads, ThreadRequest *request)
> +{
> +    int thread_index = request_to_thread_index(request);
> +    int request_index = request_to_index(request);
> +    ThreadLocal *thread = threads->per_thread_data + thread_index;
> +
> +    /*
> +     * paired with smp_rmb() in find_first_valid_request_index() to make
> +     * sure the request has been filled before the bit is flipped that
> +     * will make the request be visible to the thread
> +     */
> +    smp_wmb();
> +
> +    change_bit_once(request_index, &thread->request_fill_bitmap);
> +    qemu_event_set(&thread->request_valid_ev);
> +}
> +
> +static int thread_find_first_valid_request_index(ThreadLocal *thread)
> +{
> +    Threads *threads = thread->threads;
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +    int index;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);
> +    /*
> +     * paired with smp_wmb() in mark_request_valid() to make sure that
> +     * we read request_fill_bitmap before fetch the request out.
> +     */
> +    smp_rmb();
> +
> +    index = find_first_bit(&result_bitmap, threads->thread_requests_nr);
> +    return index >= threads->thread_requests_nr ? -1 : index;
> +}
> +
> +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
> +{
> +    int index = request_to_index(request);
> +
> +    /*
> +     * smp_wmb() is implied in change_bit_atomic() that is paired with
> +     * smp_rmb() in get_free_request_bitmap() to make sure the result
> +     * has been saved before the bit is flipped.
> +     */
> +    change_bit_atomic(index, &thread->request_done_bitmap);
> +    qemu_event_set(&thread->request_free_ev);
> +}
> +
> +/* retry to see if there is available request before actually go to wait. */
> +#define BUSY_WAIT_COUNT 1000
> +
> +static ThreadRequest *
> +thread_busy_wait_for_request(ThreadLocal *thread)
> +{
> +    int index, count = 0;
> +
> +    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
> +        index = thread_find_first_valid_request_index(thread);
> +        if (index >= 0) {
> +            return index_to_request(thread, index);
> +        }
> +
> +        cpu_relax();
> +    }
> +
> +    return NULL;
> +}
> +
> +static void *thread_run(void *opaque)
> +{
> +    ThreadLocal *self_data = (ThreadLocal *)opaque;
> +    Threads *threads = self_data->threads;
> +    void (*handler)(void *request) = threads->ops->thread_request_handler;
> +    ThreadRequest *request;
> +
> +    for ( ; !atomic_read(&self_data->quit); ) {
> +        qemu_event_reset(&self_data->request_valid_ev);
> +
> +        request = thread_busy_wait_for_request(self_data);
> +        if (!request) {
> +            qemu_event_wait(&self_data->request_valid_ev);
> +            continue;
> +        }
> +
> +        assert(!request->done);
> +
> +        handler(request + 1);
> +        request->done = true;
> +        mark_request_free(self_data, request);
> +    }
> +
> +    return NULL;
> +}
> +
> +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
> +{
> +    Threads *threads = thread->threads;
> +    ThreadRequest *request = thread->requests;
> +    int i;
> +
> +    for (i = 0; i < free_nr; i++) {
> +        threads->ops->thread_request_uninit(request + 1);
> +        request = (void *)request + threads->request_size;
> +    }
> +    g_free(thread->requests);
> +}
> +
> +static int init_thread_requests(ThreadLocal *thread)
> +{
> +    Threads *threads = thread->threads;
> +    ThreadRequest *request;
> +    int ret, i, thread_reqs_size;
> +
> +    thread_reqs_size = threads->thread_requests_nr * threads->request_size;
> +    thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES);
> +    thread->requests = g_malloc0(thread_reqs_size);
> +
> +    request = thread->requests;
> +    for (i = 0; i < threads->thread_requests_nr; i++) {
> +        ret = threads->ops->thread_request_init(request + 1);
> +        if (ret < 0) {
> +            goto exit;
> +        }
> +
> +        request->request_index = i;
> +        request->thread_index = thread->self;
> +        request = (void *)request + threads->request_size;
> +    }
> +    return 0;
> +
> +exit:
> +    uninit_thread_requests(thread, i);
> +    return -1;
> +}
> +
> +static void uninit_thread_data(Threads *threads, int free_nr)
> +{
> +    ThreadLocal *thread_local = threads->per_thread_data;
> +    int i;
> +
> +    for (i = 0; i < free_nr; i++) {
> +        thread_local[i].quit = true;
> +        qemu_event_set(&thread_local[i].request_valid_ev);
> +        qemu_thread_join(&thread_local[i].thread);
> +        qemu_event_destroy(&thread_local[i].request_valid_ev);
> +        qemu_event_destroy(&thread_local[i].request_free_ev);
> +        uninit_thread_requests(&thread_local[i], threads->thread_requests_nr);
> +    }
> +}
> +
> +static int
> +init_thread_data(Threads *threads, const char *thread_name, int thread_nr)
> +{
> +    ThreadLocal *thread_local = threads->per_thread_data;
> +    char *name;
> +    int i;
> +
> +    for (i = 0; i < thread_nr; i++) {
> +        thread_local[i].threads = threads;
> +        thread_local[i].self = i;
> +
> +        if (init_thread_requests(&thread_local[i]) < 0) {
> +            goto exit;
> +        }
> +
> +        qemu_event_init(&thread_local[i].request_free_ev, false);
> +        qemu_event_init(&thread_local[i].request_valid_ev, false);
> +
> +        name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self);
> +        qemu_thread_create(&thread_local[i].thread, name,
> +                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
> +        g_free(name);
> +    }
> +    return 0;
> +
> +exit:
> +    uninit_thread_data(threads, i);
> +    return -1;
> +}
> +
> +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
> +                                   unsigned int thread_requests_nr,
> +                                   const ThreadedWorkqueueOps *ops)
> +{
> +    Threads *threads;
> +
> +    if (threads_nr > MAX_THREAD_REQUEST_NR) {
> +        return NULL;
> +    }
> +
> +    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
> +    threads->ops = ops;
> +    threads->threads_nr = threads_nr;
> +    threads->thread_requests_nr = thread_requests_nr;
> +
> +    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
> +    threads->request_size = threads->ops->request_size;
> +    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
> +    threads->request_size += sizeof(ThreadRequest);
> +
> +    if (init_thread_data(threads, name, threads_nr) < 0) {
> +        g_free(threads);
> +        return NULL;
> +    }
> +
> +    return threads;
> +}
> +
> +void threaded_workqueue_destroy(Threads *threads)
> +{
> +    uninit_thread_data(threads, threads->threads_nr);
> +    g_free(threads);
> +}
> +
> +static void request_done(Threads *threads, ThreadRequest *request)
> +{
> +    if (!request->done) {
> +        return;
> +    }
> +
> +    threads->ops->thread_request_done(request + 1);
> +    request->done = false;
> +}
> +
> +void *threaded_workqueue_get_request(Threads *threads)
> +{
> +    ThreadRequest *request;
> +
> +    request = threads_find_free_request(threads);
> +    if (!request) {
> +        return NULL;
> +    }
> +
> +    request_done(threads, request);
> +    return request + 1;
> +}
> +
> +void threaded_workqueue_submit_request(Threads *threads, void *request)
> +{
> +    ThreadRequest *req = request - sizeof(ThreadRequest);
> +    int thread_index = request_to_thread_index(request);
> +
> +    assert(!req->done);
> +    mark_request_valid(threads, req);
> +    threads->current_thread_index = thread_index  + 1;
> +}
> +
> +void threaded_workqueue_wait_for_requests(Threads *threads)
> +{
> +    ThreadLocal *thread;
> +    uint64_t result_bitmap;
> +    int thread_index, index = 0;
> +
> +    for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) {
> +        thread = threads->per_thread_data + thread_index;
> +        index = 0;
> +retry:
> +        qemu_event_reset(&thread->request_free_ev);
> +        result_bitmap = get_free_request_bitmap(threads, thread);
> +
> +        for (; index < threads->thread_requests_nr; index++) {
> +            if (test_bit(index, &result_bitmap)) {
> +                qemu_event_wait(&thread->request_free_ev);
> +                goto retry;
> +            }
> +
> +            request_done(threads, index_to_request(thread, index));
> +        }
> +    }
> +}
> -- 
> 2.14.5
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Emilio Cota Nov. 24, 2018, 12:12 a.m. UTC | #2
On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
> +   /*
> +     * the bit in these two bitmaps indicates the index of the @requests

This @ is not ASCII, is it?

> +     * respectively. If it's the same, the corresponding request is free
> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e, where the thread fetches
> +     * the request and write the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> +    /* after handles the request, the thread flips the bit. */
> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

Use DECLARE_BITMAP, otherwise you'll get type errors as David
pointed out.

Thanks,

		Emilio
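
For illustration, Emilio's suggestion might look roughly like this in the
patch (a sketch, assuming the maps stay capped at MAX_THREAD_REQUEST_NR
bits; not part of the series):

    /* DECLARE_BITMAP from qemu/bitmap.h expands to an 'unsigned long'
     * array, so the bitmap_* helpers type-check on every host */
    DECLARE_BITMAP(request_fill_bitmap, MAX_THREAD_REQUEST_NR)
        QEMU_ALIGNED(SMP_CACHE_BYTES);
    DECLARE_BITMAP(request_done_bitmap, MAX_THREAD_REQUEST_NR)
        QEMU_ALIGNED(SMP_CACHE_BYTES);

The atomic_rcu_read() accesses would then need rethinking, since the
fields become arrays rather than single words; that is the complication
Xiao raises below.
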
Emilio Cota Nov. 24, 2018, 12:17 a.m. UTC | #3
On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);

This is not wrong, but it's a bit ugly. Instead, I would:

- Introduce bitmap_xor_atomic in a previous patch
- Use bitmap_xor_atomic here, getting rid of the rcu reads

Thanks,

		Emilio
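
For concreteness, such a helper might look roughly like this (hypothetical;
QEMU has no bitmap_xor_atomic at this point):

    static inline void bitmap_xor_atomic(unsigned long *dst,
                                         const unsigned long *src1,
                                         const unsigned long *src2,
                                         long nbits)
    {
        long k;

        for (k = 0; k < BITS_TO_LONGS(nbits); k++) {
            /* read each source word atomically, then combine */
            dst[k] = atomic_read(&src1[k]) ^ atomic_read(&src2[k]);
        }
    }
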
Xiao Guangrong Nov. 26, 2018, 7:57 a.m. UTC | #4
On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:

>> +#include "qemu/osdep.h"
>> +#include "qemu/bitmap.h"
>> +#include "qemu/threaded-workqueue.h"
>> +
>> +#define SMP_CACHE_BYTES 64
> 
> That's architecture dependent isn't it?
> 

Yes, it's arch dependent indeed.

I just used 64 for simplicity and I think the cache line size is <= 64 on
most CPU arches, so that can work.

Should I introduce a statically defined CACHE LINE SIZE for all arches? :(

>> +   /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>> +    /* after handles the request, the thread flips the bit. */
>> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> 
> Patchew complained about some type mismatches; I think those are because
> you're using the bitmap_* functions on these; those functions always
> operate on 'long' not on uint64_t - and on some platforms they're
> unfortunately not the same.

I guess you were talking about this error:
ERROR: externs should be avoided in .c files
#233: FILE: util/threaded-workqueue.c:65:
+    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

The complaint is about "QEMU_ALIGNED(SMP_CACHE_BYTES)", as the error goes
away when the alignment attribute is removed...

The issue you pointed out can be avoided by using a type cast, like:
bitmap_xor(..., (void *)&thread->request_fill_bitmap)
can't it?

Thanks!
Xiao Guangrong Nov. 26, 2018, 8:06 a.m. UTC | #5
On 11/24/18 8:12 AM, Emilio G. Cota wrote:
> On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
>> +   /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
> 
> This @ is not ASCII, is it?
> 

Good eyes. :)

Will fix it.

>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>> +    /* after handles the request, the thread flips the bit. */
>> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> 
> Use DECLARE_BITMAP, otherwise you'll get type errors as David
> pointed out.

If we do that, the field becomes a pointer... that complicates things.

Hmm, I am using the same trick the kvm module applies when it handles
vcpu->requests:
static inline bool kvm_test_request(int req, struct kvm_vcpu *vcpu)
{
	return test_bit(req & KVM_REQUEST_MASK, (void *)&vcpu->requests);
}

Is it good?
Xiao Guangrong Nov. 26, 2018, 8:18 a.m. UTC | #6
On 11/24/18 8:17 AM, Emilio G. Cota wrote:
> On Thu, Nov 22, 2018 at 15:20:25 +0800, guangrong.xiao@gmail.com wrote:
>> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
>> +{
>> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
>> +
>> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
>> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
>> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
>> +               threads->thread_requests_nr);
> 
> This is not wrong, but it's a big ugly. Instead, I would:
> 
> - Introduce bitmap_xor_atomic in a previous patch
> - Use bitmap_xor_atomic here, getting rid of the rcu reads

Hmm, however, we do not need an atomic xor operation here... that would
be slower than just two READ_ONCE calls.
Paolo Bonzini Nov. 26, 2018, 10:28 a.m. UTC | #7
On 26/11/18 09:18, Xiao Guangrong wrote:
>>
>>> +static uint64_t get_free_request_bitmap(Threads *threads,
>>> ThreadLocal *thread)
>>> +{
>>> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
>>> +
>>> +    request_fill_bitmap =
>>> atomic_rcu_read(&thread->request_fill_bitmap);
>>> +    request_done_bitmap =
>>> atomic_rcu_read(&thread->request_done_bitmap);
>>> +    bitmap_xor(&result_bitmap, &request_fill_bitmap,
>>> &request_done_bitmap,
>>> +               threads->thread_requests_nr);
>>
>> This is not wrong, but it's a big ugly. Instead, I would:
>>
>> - Introduce bitmap_xor_atomic in a previous patch
>> - Use bitmap_xor_atomic here, getting rid of the rcu reads
> 
> Hmm, however, we do not need atomic xor operation here... that should be
> slower than
> just two READ_ONCE calls.

Yeah, I'd just go with Guangrong's version.  Alternatively, add
find_{first,next}_{same,different}_bit functions (whatever subset of the
4 you need).

Paolo
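
For concreteness, one of the four helpers Paolo mentions might look like
this (hypothetical, not an existing QEMU function):

    /* index of the first bit where the two bitmaps differ, or nbits */
    static inline unsigned long
    find_first_different_bit(const unsigned long *a, const unsigned long *b,
                             unsigned long nbits)
    {
        unsigned long i;

        for (i = 0; i < BITS_TO_LONGS(nbits); i++) {
            unsigned long diff = a[i] ^ b[i];

            if (diff) {
                i = i * BITS_PER_LONG + __builtin_ctzl(diff);
                return MIN(i, nbits);
            }
        }
        return nbits;
    }

The workqueue code could then scan request_fill_bitmap against
request_done_bitmap directly, dropping the intermediate XOR copy.
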
Dr. David Alan Gilbert Nov. 26, 2018, 10:56 a.m. UTC | #8
* Xiao Guangrong (guangrong.xiao@gmail.com) wrote:
> 
> 
> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
> 
> > > +#include "qemu/osdep.h"
> > > +#include "qemu/bitmap.h"
> > > +#include "qemu/threaded-workqueue.h"
> > > +
> > > +#define SMP_CACHE_BYTES 64
> > 
> > That's architecture dependent isn't it?
> > 
> 
> Yes, it's arch dependent indeed.
> 
> I just used 64 for simplification and i think it is <= 64 on most CPU arch-es
> so that can work.
> 
> Should i introduce statically defined CACHE LINE SIZE for all arch-es? :(

I think it depends on why you need it; but we shouldn't have a constant
that is wrong, and we shouldn't define something architecture-dependent
in here.

> > > +   /*
> > > +     * the bit in these two bitmaps indicates the index of the @requests
> > > +     * respectively. If it's the same, the corresponding request is free
> > > +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> > > +     * it is valid and owned by the thread, i.e, where the thread fetches
> > > +     * the request and write the result.
> > > +     */
> > > +
> > > +    /* after the user fills the request, the bit is flipped. */
> > > +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> > > +    /* after handles the request, the thread flips the bit. */
> > > +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> > 
> > Patchew complained about some type mismatches; I think those are because
> > you're using the bitmap_* functions on these; those functions always
> > operate on 'long' not on uint64_t - and on some platforms they're
> > unfortunately not the same.
> 
> I guess you were taking about this error:
> ERROR: externs should be avoided in .c files
> #233: FILE: util/threaded-workqueue.c:65:
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> 
> The complained thing is "QEMU_ALIGNED(SMP_CACHE_BYTES)" as it gone
> when the aligned thing is removed...
> 
> The issue you pointed out can be avoid by using type-casting, like:
> bitmap_xor(..., (void *)&thread->request_fill_bitmap)
> cannot we?

I thought the error was just due to long vs uint64_t rather than the
QEMU_ALIGNED.  I don't think it's just a casting problem, since longs
are not always 64 bit.

Dave

> Thanks!
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Emilio Cota Nov. 26, 2018, 6:55 p.m. UTC | #9
On Mon, Nov 26, 2018 at 15:57:25 +0800, Xiao Guangrong wrote:
> 
> 
> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
> 
> > > +#include "qemu/osdep.h"
> > > +#include "qemu/bitmap.h"
> > > +#include "qemu/threaded-workqueue.h"
> > > +
> > > +#define SMP_CACHE_BYTES 64
> > 
> > That's architecture dependent isn't it?
> > 
> 
> Yes, it's arch dependent indeed.
> 
> I just used 64 for simplification and i think it is <= 64 on most CPU arch-es
> so that can work.
> 
> Should i introduce statically defined CACHE LINE SIZE for all arch-es? :(

No, at compile-time this is impossible to know.

We do query this info at run-time though (see util/cacheinfo.c),
but using that info here would complicate things too much.

You can just give it a different name, and perhaps add a comment.
See for instance what we do in qht.c with QHT_BUCKET_ALIGN.

Thanks,

		Emilio
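
For reference, the qht.c pattern applied here might look like this (using
the THREAD_QUEUE_ALIGN name that Xiao adopts below):

    /*
     * 64 is not the real cache line size on every host (see
     * util/cacheinfo.c for the run-time query), but it matches the
     * common case; QHT_BUCKET_ALIGN in util/qht.c makes the same
     * compromise.
     */
    #define THREAD_QUEUE_ALIGN 64
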
Xiao Guangrong Nov. 27, 2018, 7:17 a.m. UTC | #10
On 11/26/18 6:56 PM, Dr. David Alan Gilbert wrote:
> * Xiao Guangrong (guangrong.xiao@gmail.com) wrote:
>>
>>
>> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>>
>>>> +#include "qemu/osdep.h"
>>>> +#include "qemu/bitmap.h"
>>>> +#include "qemu/threaded-workqueue.h"
>>>> +
>>>> +#define SMP_CACHE_BYTES 64
>>>
>>> That's architecture dependent isn't it?
>>>
>>
>> Yes, it's arch dependent indeed.
>>
>> I just used 64 for simplification and i think it is <= 64 on most CPU arch-es
>> so that can work.
>>
>> Should i introduce statically defined CACHE LINE SIZE for all arch-es? :(
> 
> I think it depends why you need it; but we shouldn't have a constant
> that is wrong, and we shouldn't define something architecture dependent
> in here.
> 

I see. I will address Emilio's suggestion to rename SMP_CACHE_BYTES to
THREAD_QUEUE_ALIGN and add the additional comments.

>>>> +   /*
>>>> +     * the bit in these two bitmaps indicates the index of the @requests
>>>> +     * respectively. If it's the same, the corresponding request is free
>>>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>>>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>>>> +     * the request and write the result.
>>>> +     */
>>>> +
>>>> +    /* after the user fills the request, the bit is flipped. */
>>>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>>>> +    /* after handles the request, the thread flips the bit. */
>>>> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>>>
>>> Patchew complained about some type mismatches; I think those are because
>>> you're using the bitmap_* functions on these; those functions always
>>> operate on 'long' not on uint64_t - and on some platforms they're
>>> unfortunately not the same.
>>
>> I guess you were taking about this error:
>> ERROR: externs should be avoided in .c files
>> #233: FILE: util/threaded-workqueue.c:65:
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
>>
>> The complained thing is "QEMU_ALIGNED(SMP_CACHE_BYTES)" as it gone
>> when the aligned thing is removed...
>>
>> The issue you pointed out can be avoid by using type-casting, like:
>> bitmap_xor(..., (void *)&thread->request_fill_bitmap)
>> cannot we?
> 
> I thought the error was just due to long vs uint64_t ratehr than the
> qemu_aligned.  I don't think it's just a casting problem, since I don't
> think the long's are always 64bit.

Well, I made some adjustments that make checkpatch.pl really happy :),
as follows:
$ git diff util/
diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
index 2ab37cee8d..e34c65a8eb 100644
--- a/util/threaded-workqueue.c
+++ b/util/threaded-workqueue.c
@@ -62,21 +62,30 @@ struct ThreadLocal {
       */

      /* after the user fills the request, the bit is flipped. */
-    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        uint64_t request_fill_bitmap;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);
+
      /* after handles the request, the thread flips the bit. */
-    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        uint64_t request_done_bitmap;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);

      /*
       * the event used to wake up the thread whenever a valid request has
       * been submitted
       */
-    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        QemuEvent request_valid_ev;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);

      /*
       * the event is notified whenever a request has been completed
       * (i.e, become free), which is used to wake up the user
       */
-    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+    struct {
+        QemuEvent request_free_ev;
+    } QEMU_ALIGNED(SMP_CACHE_BYTES);
  };
  typedef struct ThreadLocal ThreadLocal;

$ ./scripts/checkpatch.pl -f util/threaded-workqueue.c
total: 0 errors, 0 warnings, 472 lines checked

util/threaded-workqueue.c has no obvious style problems and is ready for submission.

checkpatch.pl somehow treats QEMU_ALIGNED as a function before this modification.

And yes, a u64 is not a long type on 32-bit arches, it's long[2] instead.
That's fine when we pass the address of a u64 to a function whose parameter
is (long *). I think this trick is widely used, e.g. the example in kvm
that I sent in my reply to Emilio:

static inline bool kvm_test_request(int req, struct kvm_vcpu *vcpu)
{
     return test_bit(req & KVM_REQUEST_MASK, (void *)&vcpu->requests);
}
Xiao Guangrong Nov. 27, 2018, 8:30 a.m. UTC | #11
On 11/27/18 2:55 AM, Emilio G. Cota wrote:
> On Mon, Nov 26, 2018 at 15:57:25 +0800, Xiao Guangrong wrote:
>>
>>
>> On 11/23/18 7:02 PM, Dr. David Alan Gilbert wrote:
>>
>>>> +#include "qemu/osdep.h"
>>>> +#include "qemu/bitmap.h"
>>>> +#include "qemu/threaded-workqueue.h"
>>>> +
>>>> +#define SMP_CACHE_BYTES 64
>>>
>>> That's architecture dependent isn't it?
>>>
>>
>> Yes, it's arch dependent indeed.
>>
>> I just used 64 for simplification and i think it is <= 64 on most CPU arch-es
>> so that can work.
>>
>> Should i introduce statically defined CACHE LINE SIZE for all arch-es? :(
> 
> No, at compile-time this is impossible to know.
> 
> We do query this info at run-time though (see util/cacheinfo.c),
> but using that info here would complicate things too much.

I see.

> 
> You can just give it a different name, and perhaps add a comment.
> See for instance what we do in qht.c with QHT_BUCKET_ALIGN.

That's a really good lesson for me, I will follow it. :)
Xiao Guangrong Nov. 27, 2018, 8:31 a.m. UTC | #12
On 11/26/18 6:28 PM, Paolo Bonzini wrote:
> On 26/11/18 09:18, Xiao Guangrong wrote:
>>>
>>>> +static uint64_t get_free_request_bitmap(Threads *threads,
>>>> ThreadLocal *thread)
>>>> +{
>>>> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
>>>> +
>>>> +    request_fill_bitmap =
>>>> atomic_rcu_read(&thread->request_fill_bitmap);
>>>> +    request_done_bitmap =
>>>> atomic_rcu_read(&thread->request_done_bitmap);
>>>> +    bitmap_xor(&result_bitmap, &request_fill_bitmap,
>>>> &request_done_bitmap,
>>>> +               threads->thread_requests_nr);
>>>
>>> This is not wrong, but it's a big ugly. Instead, I would:
>>>
>>> - Introduce bitmap_xor_atomic in a previous patch
>>> - Use bitmap_xor_atomic here, getting rid of the rcu reads
>>
>> Hmm, however, we do not need atomic xor operation here... that should be
>> slower than
>> just two READ_ONCE calls.
> 
> Yeah, I'd just go with Guangrong's version.  Alternatively, add
> find_{first,next}_{same,different}_bit functions (whatever subset of the
> 4 you need).

That sounds good to me, I will try it. ;)
Christophe de Dinechin Nov. 27, 2018, 12:49 p.m. UTC | #13
(I did not finish the review, but decided to send what I already had).


> On 22 Nov 2018, at 08:20, guangrong.xiao@gmail.com wrote:
> 
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> This modules implements the lockless and efficient threaded workqueue.

I’m not entirely convinced that it’s either “lockless” or “efficient”
in the current iteration. I believe that it’s relatively easy to fix, though.

> 
> Three abstracted objects are used in this module:
> - Request.
>     It not only contains the data that the workqueue fetches out
>    to finish the request but also offers the space to save the result
>    after the workqueue handles the request.
> 
>    It's flowed between user and workqueue. The user fills the request
>    data into it when it is owned by user. After it is submitted to the
>    workqueue, the workqueue fetched data out and save the result into
>    it after the request is handled.

fetched -> fetches
save -> saves

> 
>    All the requests are pre-allocated and carefully partitioned between
>    threads so there is no contention on the request, that make threads
>    be parallel as much as possible.

That sentence confused me (it’s also in a comment in the text).
I think I’m mostly confused by “there is no contention”. Perhaps you
meant “so as to avoid contention if possible”? If there is a reason
why there would never be any contention even if requests arrive faster than
completions, I did not figure it out.

I personally see serious contention on the fields in the Threads structure,
for example, but also possibly on the targets of the “modulo” operation in
thread_find_free_request. Specifically, if three CPUs are entering
thread_find_free_request at the same time, they will all run the same
loop and all, presumably, “attack” the same memory locations.

Sorry if I mis-read the code, but at the moment, it does not seem to
avoid contention as intended. I don’t see how it could without having
some way to discriminate between CPUs to start with, which I did not find.


> 
> - User, i.e, the submitter
>    It's the one fills the request and submits it to the workqueue,
the one -> the one who
>    the result will be collected after it is handled by the work queue.
> 
>    The user can consecutively submit requests without waiting the previous
waiting -> waiting for
>    requests been handled.
>    It only supports one submitter, you should do serial submission by
>    yourself if you want more, e.g, use lock on you side.

I’m also confused by this last statement. The proposal purports
to be “lockless”, which I read as working correctly without a lock…
Reading the code, I indeed see issues if different threads
try to place requests at the same time. So I believe the word
“lockless” is a bit misleading.

> 
> - Workqueue, i.e, thread
>    Each workqueue is represented by a running thread that fetches
>    the request submitted by the user, do the specified work and save
>    the result to the request.
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
> include/qemu/threaded-workqueue.h | 106 +++++++++
> util/Makefile.objs                |   1 +
> util/threaded-workqueue.c         | 463 ++++++++++++++++++++++++++++++++++++++
> 3 files changed, 570 insertions(+)
> create mode 100644 include/qemu/threaded-workqueue.h
> create mode 100644 util/threaded-workqueue.c
> 
> diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
> new file mode 100644
> index 0000000000..e0ede496d0
> --- /dev/null
> +++ b/include/qemu/threaded-workqueue.h
> @@ -0,0 +1,106 @@
> +/*
> + * Lockless and Efficient Threaded Workqueue Abstraction
> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#ifndef QEMU_THREADED_WORKQUEUE_H
> +#define QEMU_THREADED_WORKQUEUE_H
> +
> +#include "qemu/queue.h"
> +#include "qemu/thread.h"
> +
> +/*
> + * This modules implements the lockless and efficient threaded workqueue.
> + *
> + * Three abstracted objects are used in this module:
> + * - Request.
> + *   It not only contains the data that the workqueue fetches out
> + *   to finish the request but also offers the space to save the result
> + *   after the workqueue handles the request.
> + *
> + *   It's flowed between user and workqueue. The user fills the request
> + *   data into it when it is owned by user. After it is submitted to the
> + *   workqueue, the workqueue fetched data out and save the result into
> + *   it after the request is handled.
> + *
> + *   All the requests are pre-allocated and carefully partitioned between
> + *   threads so there is no contention on the request, that make threads
> + *   be parallel as much as possible.
> + *
> + * - User, i.e, the submitter
> + *   It's the one fills the request and submits it to the workqueue,
> + *   the result will be collected after it is handled by the work queue.
> + *
> + *   The user can consecutively submit requests without waiting the previous
> + *   requests been handled.
> + *   It only supports one submitter, you should do serial submission by
> + *   yourself if you want more, e.g, use lock on you side.
> + *
> + * - Workqueue, i.e, thread
> + *   Each workqueue is represented by a running thread that fetches
> + *   the request submitted by the user, do the specified work and save
> + *   the result to the request.
> + */
> +
> +typedef struct Threads Threads;
> +
> +struct ThreadedWorkqueueOps {
> +    /* constructor of the request */
> +    int (*thread_request_init)(void *request);
> +    /*  destructor of the request */
> +    void (*thread_request_uninit)(void *request);
> +
> +    /* the handler of the request that is called by the thread */
> +    void (*thread_request_handler)(void *request);
> +    /* called by the user after the request has been handled */
> +    void (*thread_request_done)(void *request);
> +
> +    size_t request_size;
> +};
> +typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
> +
> +/* the default number of requests that thread need handle */
> +#define DEFAULT_THREAD_REQUEST_NR 4
> +/* the max number of requests that thread need handle */
> +#define MAX_THREAD_REQUEST_NR     (sizeof(uint64_t) * BITS_PER_BYTE)
> +
> +/*
> + * create a threaded queue. Other APIs will work on the Threads it returned
> + *
> + * @name: the identity of the workqueue which is used to construct the name
> + *    of threads only
> + * @threads_nr: the number of threads that the workqueue will create
> + * @thread_requests_nr: the number of requests that each single thread will
> + *    handle
> + * @ops: the handlers of the request
> + *
> + * Return NULL if it failed
> + */
> +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
> +                                   unsigned int thread_requests_nr,
> +                                   const ThreadedWorkqueueOps *ops);
> +void threaded_workqueue_destroy(Threads *threads);
> +
> +/*
> + * find a free request where the user can store the data that is needed to
> + * finish the request
> + *
> + * If all requests are used up, return NULL
> + */
> +void *threaded_workqueue_get_request(Threads *threads);

Using void * to represent the payload makes it easy to get
the wrong pointer in there without the compiler noticing.
Consider adding a type for the payload?


> +/* submit the request and notify the thread */
> +void threaded_workqueue_submit_request(Threads *threads, void *request);
> +
> +/*
> + * wait all threads to complete the request to make sure there is no
> + * previous request exists
> + */
> +void threaded_workqueue_wait_for_requests(Threads *threads);
> +#endif
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 0820923c18..f26dfe5182 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -50,5 +50,6 @@ util-obj-y += range.o
> util-obj-y += stats64.o
> util-obj-y += systemd.o
> util-obj-y += iova-tree.o
> +util-obj-y += threaded-workqueue.o
> util-obj-$(CONFIG_LINUX) += vfio-helpers.o
> util-obj-$(CONFIG_OPENGL) += drm.o
> diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
> new file mode 100644
> index 0000000000..2ab37cee8d
> --- /dev/null
> +++ b/util/threaded-workqueue.c
> @@ -0,0 +1,463 @@
> +/*
> + * Lockless and Efficient Threaded Workqueue Abstraction


> + *
> + * Author:
> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
> + *
> + * Copyright(C) 2018 Tencent Corporation.
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +#include "qemu/bitmap.h"
> +#include "qemu/threaded-workqueue.h"
> +
> +#define SMP_CACHE_BYTES 64

+1 on comments already made by others

> +
> +/*
> + * the request representation which contains the internally used mete data,

mete -> meta

> + * it is the header of user-defined data.
> + *
> + * It should be aligned to the nature size of CPU.
> + */
> +struct ThreadRequest {
> +    /*
> +     * the request has been handled by the thread and need the user
> +     * to fetch result out.
> +     */
> +    uint8_t done;
> +
> +    /*
> +     * the index to Thread::requests.
> +     * Save it to the padding space although it can be calculated at runtime.
> +     */
> +    uint8_t request_index;

So no more than 256?

This is blocked by MAX_THREAD_REQUEST_NR test at the beginning
of threaded_workqueue_create, but I would make it more explicit either
with a compile-time assert that MAX_THREAD_REQUEST_NR is
below UINT8_MAX, or by adding a second test for UINT8_MAX in
threaded_workqueue_create.
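
A sketch of the suggested compile-time check (hypothetical, not in the
patch):

    /* request_index is a uint8_t, so the request limit must fit in it */
    QEMU_BUILD_BUG_ON(MAX_THREAD_REQUEST_NR > UINT8_MAX);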

Also, an obvious extension would be to make bitmaps into arrays.

Do you think someone would want to use the package to assign
requests per CPU or per VCPU? If so, that could quickly go above 64.


> +
> +    /* the index to Threads::per_thread_data */
> +    unsigned int thread_index;

Don’t you want to use a size_t for that?

> +} QEMU_ALIGNED(sizeof(unsigned long));

Nit: the alignment type is inconsistent with that given
to QEMU_BUILD_BUG_ON in threaded_workqueue_create.
(long vs. unsigned long).

Also, why is the alignment required? Aren’t you more interested
in cache-line alignment?


> +typedef struct ThreadRequest ThreadRequest;


> +
> +struct ThreadLocal {
> +    struct Threads *threads;
> +
> +    /* the index of the thread */
> +    int self;

Why signed?

> +
> +    /* thread is useless and needs to exit */
> +    bool quit;
> +
> +    QemuThread thread;
> +
> +    void *requests;
> +
> +   /*
> +     * the bit in these two bitmaps indicates the index of the @requests
> +     * respectively. If it's the same, the corresponding request is free
> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
> +     * it is valid and owned by the thread, i.e, where the thread fetches
> +     * the request and write the result.
> +     */
> +
> +    /* after the user fills the request, the bit is flipped. */
> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);

I believe you are trying to ensure that data accessed from multiple CPUs
is on different cache lines. As others have pointed out, the real value for
SMP_CACHE_BYTES can only be known at run-time. So this is not really
helping. Also, the ThreadLocal structure itself is not necessarily aligned
within struct Threads. Therefore, it’s possible that “requests” for example
could be on the same cache line as request_fill_bitmap if planets align
the wrong way.

In order to mitigate these effects, I would group the data that the user
writes and the data that the thread writes, i.e. reorder declarations,
put request_fill_bitmap and request_valid_ev together, and try
to put them in the same cache line so that only one cache line is invalidated
from within mark_request_valid instead of two.

Then you end up with a single alignment directive instead of 4, to
separate requests from completions.
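
Concretely, the regrouping might look like this (a sketch, not a tested
layout):

    struct ThreadLocal {
        /* ... threads, self, quit, thread, requests ... */

        /* written by the submitter: mark_request_valid() then dirties
         * one cache line instead of two */
        uint64_t request_fill_bitmap;
        QemuEvent request_valid_ev;

        /* written by the worker thread, on its own cache line */
        struct {
            uint64_t request_done_bitmap;
            QemuEvent request_free_ev;
        } QEMU_ALIGNED(SMP_CACHE_BYTES);
    };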

That being said, I’m not sure why you use a bitmap here. What is the
expected benefit relative to atomic lists (which would also make it really
lock-free)?
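
For context, qemu/queue.h does provide lock-free singly-linked list
primitives; a free list built on them might look roughly like this (the
'node' field is invented, and the consumer can only pop the whole list in
one shot via QSLIST_MOVE_ATOMIC):

    struct ThreadRequest {
        QSLIST_ENTRY(ThreadRequest) node;   /* invented for this sketch */
        /* ... fields from the patch ... */
    };
    typedef QSLIST_HEAD(, ThreadRequest) RequestList;

    /* worker side: push a completed request back, lock-free */
    static void put_free_request(RequestList *free_list,
                                 struct ThreadRequest *req)
    {
        QSLIST_INSERT_HEAD_ATOMIC(free_list, req, node);
    }

    /* submitter side: atomically take everything currently free */
    static void take_free_requests(RequestList *free_list, RequestList *mine)
    {
        QSLIST_MOVE_ATOMIC(mine, free_list);
    }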

> +    /* after handles the request, the thread flips the bit. */
> +    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> +
> +    /*
> +     * the event used to wake up the thread whenever a valid request has
> +     * been submitted
> +     */
> +    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
> +
> +    /*
> +     * the event is notified whenever a request has been completed
> +     * (i.e, become free), which is used to wake up the user
> +     */
> +    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
> +};
> +typedef struct ThreadLocal ThreadLocal;


> +
> +/*
> + * the main data struct represents multithreads which is shared by
> + * all threads
> + */
> +struct Threads {
> +    /* the request header, ThreadRequest, is contained */
> +    unsigned int request_size;

size_t?

> +    unsigned int thread_requests_nr;
> +    unsigned int threads_nr;
> +
> +    /* the request is pushed to the thread with round-robin manner */
> +    unsigned int current_thread_index;
> +
> +    const ThreadedWorkqueueOps *ops;


> +
> +    ThreadLocal per_thread_data[0];
> +};
> +typedef struct Threads Threads;
> +
> +static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index)
> +{
> +    ThreadRequest *request;
> +
> +    request = thread->requests + request_index * thread->threads->request_size;
> +    assert(request->request_index == request_index);
> +    assert(request->thread_index == thread->self);
> +    return request;
> +}
> +
> +static int request_to_index(ThreadRequest *request)
> +{
> +    return request->request_index;
> +}
> +
> +static int request_to_thread_index(ThreadRequest *request)
> +{
> +    return request->thread_index;
> +}
> +
> +/*
> + * free request: the request is not used by any thread, however, it might
> + *   contain the result need the user to call thread_request_done()

might contain the result -> might still contain the result
result need the user to call -> result. The user needs to call

> + *
> + * valid request: the request contains the request data and it's committed
> + *   to the thread, i,e. it's owned by thread.
> + */
> +static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);
> +
> +    /*
> +     * paired with smp_wmb() in mark_request_free() to make sure that we
> +     * read request_done_bitmap before fetching the result out.
> +     */
> +    smp_rmb();
> +
> +    return result_bitmap;
> +}

It seems that this part would be much simpler to understand using atomic lists.

> +
> +static ThreadRequest
> +*find_thread_free_request(Threads *threads, ThreadLocal *thread)
> +{
> +    uint64_t result_bitmap = get_free_request_bitmap(threads, thread);
> +    int index;
> +
> +    index  = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr);
> +    if (index >= threads->thread_requests_nr) {
> +        return NULL;
> +    }
> +
> +    return index_to_request(thread, index);
> +}
> +
> +static ThreadRequest *threads_find_free_request(Threads *threads)
> +{
> +    ThreadLocal *thread;
> +    ThreadRequest *request;
> +    int cur_thread, thread_index;
> +
> +    cur_thread = threads->current_thread_index % threads->threads_nr;
> +    thread_index = cur_thread;
> +    do {
> +        thread = threads->per_thread_data + thread_index++;
> +        request = find_thread_free_request(threads, thread);
> +        if (request) {
> +            break;
> +        }
> +        thread_index %= threads->threads_nr;
> +    } while (thread_index != cur_thread);
> +
> +    return request;
> +}
> +
> +/*
> + * the change bit operation combined with READ_ONCE and WRITE_ONCE which
> + * only works on single uint64_t width
> + */
> +static void change_bit_once(long nr, uint64_t *addr)
> +{
> +    uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr);
> +
> +    atomic_rcu_set(addr, value);
> +}
> +
> +static void mark_request_valid(Threads *threads, ThreadRequest *request)
> +{
> +    int thread_index = request_to_thread_index(request);
> +    int request_index = request_to_index(request);
> +    ThreadLocal *thread = threads->per_thread_data + thread_index;
> +
> +    /*
> +     * paired with smp_rmb() in find_first_valid_request_index() to make
> +     * sure the request has been filled before the bit is flipped that
> +     * will make the request be visible to the thread
> +     */
> +    smp_wmb();
> +
> +    change_bit_once(request_index, &thread->request_fill_bitmap);
> +    qemu_event_set(&thread->request_valid_ev);
> +}
> +
> +static int thread_find_first_valid_request_index(ThreadLocal *thread)
> +{
> +    Threads *threads = thread->threads;
> +    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
> +    int index;
> +
> +    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
> +    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
> +    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
> +               threads->thread_requests_nr);
> +    /*
> +     * paired with smp_wmb() in mark_request_valid() to make sure that
> +     * we read request_fill_bitmap before fetch the request out.
> +     */
> +    smp_rmb();
> +
> +    index = find_first_bit(&result_bitmap, threads->thread_requests_nr);
> +    return index >= threads->thread_requests_nr ? -1 : index;
> +}
> +
> +static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
> +{
> +    int index = request_to_index(request);
> +
> +    /*
> +     * smp_wmb() is implied in change_bit_atomic() that is paired with
> +     * smp_rmb() in get_free_request_bitmap() to make sure the result
> +     * has been saved before the bit is flipped.
> +     */
> +    change_bit_atomic(index, &thread->request_done_bitmap);
> +    qemu_event_set(&thread->request_free_ev);
> +}
> +
> +/* retry to see if there is available request before actually go to wait. */
> +#define BUSY_WAIT_COUNT 1000
> +
> +static ThreadRequest *
> +thread_busy_wait_for_request(ThreadLocal *thread)
> +{
> +    int index, count = 0;
> +
> +    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
> +        index = thread_find_first_valid_request_index(thread);
> +        if (index >= 0) {
> +            return index_to_request(thread, index);
> +        }
> +
> +        cpu_relax();
> +    }
> +
> +    return NULL;
> +}
> +
> +static void *thread_run(void *opaque)
> +{
> +    ThreadLocal *self_data = (ThreadLocal *)opaque;
> +    Threads *threads = self_data->threads;
> +    void (*handler)(void *request) = threads->ops->thread_request_handler;
> +    ThreadRequest *request;
> +
> +    for ( ; !atomic_read(&self_data->quit); ) {
> +        qemu_event_reset(&self_data->request_valid_ev);
> +
> +        request = thread_busy_wait_for_request(self_data);
> +        if (!request) {
> +            qemu_event_wait(&self_data->request_valid_ev);
> +            continue;
> +        }
> +
> +        assert(!request->done);
> +
> +        handler(request + 1);
> +        request->done = true;
> +        mark_request_free(self_data, request);
> +    }
> +
> +    return NULL;
> +}
> +
> +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
> +{
> +    Threads *threads = thread->threads;
> +    ThreadRequest *request = thread->requests;
> +    int i;
> +
> +    for (i = 0; i < free_nr; i++) {
> +        threads->ops->thread_request_uninit(request + 1);
> +        request = (void *)request + threads->request_size;

Despite GCC’s tolerance for it and rather lengthy debates,
pointer arithmetic on void * is illegal in C [1].

Consider using char * arithmetic, and using macros such as:

#define request_to_payload(req) (((ThreadRequest *) (req)) + 1)
#define payload_to_request(req) (((ThreadRequest *) (req)) - 1)
#define request_to_next(req, threads) \
    ((ThreadRequest *) ((char *) (req) + (threads)->request_size))

where appropriate, that would clarify the intent.

[1] https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c

> +    }
> +    g_free(thread->requests);
> +}
> +
> +static int init_thread_requests(ThreadLocal *thread)
> +{
> +    Threads *threads = thread->threads;
> +    ThreadRequest *request;
> +    int ret, i, thread_reqs_size;
> +
> +    thread_reqs_size = threads->thread_requests_nr * threads->request_size;
> +    thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES);
> +    thread->requests = g_malloc0(thread_reqs_size);
> +
> +    request = thread->requests;
> +    for (i = 0; i < threads->thread_requests_nr; i++) {
> +        ret = threads->ops->thread_request_init(request + 1);
> +        if (ret < 0) {
> +            goto exit;
> +        }
> +
> +        request->request_index = i;
> +        request->thread_index = thread->self;
> +        request = (void *)request + threads->request_size;

Pointer arithmetic on void * is illegal in C, see above.

> +    }
> +    return 0;
> +
> +exit:
> +    uninit_thread_requests(thread, i);
> +    return -1;
> +}
> +
> +static void uninit_thread_data(Threads *threads, int free_nr)
> +{
> +    ThreadLocal *thread_local = threads->per_thread_data;

thread_local is a keyword in C++11. I would avoid it as a name,
consider replacing with “per_thread_data” as in struct Threads?


> +    int i;
> +
> +    for (i = 0; i < free_nr; i++) {
> +        thread_local[i].quit = true;
> +        qemu_event_set(&thread_local[i].request_valid_ev);
> +        qemu_thread_join(&thread_local[i].thread);
> +        qemu_event_destroy(&thread_local[i].request_valid_ev);
> +        qemu_event_destroy(&thread_local[i].request_free_ev);
> +        uninit_thread_requests(&thread_local[i], threads->thread_requests_nr);
> +    }
> +}
> +
> +static int
> +init_thread_data(Threads *threads, const char *thread_name, int thread_nr)
> +{
> +    ThreadLocal *thread_local = threads->per_thread_data;
> +    char *name;
> +    int i;
> +
> +    for (i = 0; i < thread_nr; i++) {
> +        thread_local[i].threads = threads;
> +        thread_local[i].self = i;
> +
> +        if (init_thread_requests(&thread_local[i]) < 0) {
> +            goto exit;
> +        }
> +
> +        qemu_event_init(&thread_local[i].request_free_ev, false);
> +        qemu_event_init(&thread_local[i].request_valid_ev, false);
> +
> +        name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self);
> +        qemu_thread_create(&thread_local[i].thread, name,
> +                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
> +        g_free(name);
> +    }
> +    return 0;
> +
> +exit:
> +    uninit_thread_data(threads, i);
> +    return -1;
> +}
> +
> +Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
> +                                   unsigned int thread_requests_nr,
> +                                   const ThreadedWorkqueueOps *ops)
> +{
> +    Threads *threads;
> +
> +    if (threads_nr > MAX_THREAD_REQUEST_NR) {
> +        return NULL;
> +    }
> +
> +    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
> +    threads->ops = ops;
> +    threads->threads_nr = threads_nr;
> +    threads->thread_requests_nr = thread_requests_nr;
> +
> +    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
> +    threads->request_size = threads->ops->request_size;
> +    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
> +    threads->request_size += sizeof(ThreadRequest);
> +
> +    if (init_thread_data(threads, name, threads_nr) < 0) {
> +        g_free(threads);
> +        return NULL;
> +    }
> +
> +    return threads;
> +}
> +
> +void threaded_workqueue_destroy(Threads *threads)
> +{
> +    uninit_thread_data(threads, threads->threads_nr);
> +    g_free(threads);
> +}
> +
> +static void request_done(Threads *threads, ThreadRequest *request)
> +{
> +    if (!request->done) {
> +        return;
> +    }
> +
> +    threads->ops->thread_request_done(request + 1);
> +    request->done = false;
> +}
> +
> +void *threaded_workqueue_get_request(Threads *threads)
> +{
> +    ThreadRequest *request;
> +
> +    request = threads_find_free_request(threads);
> +    if (!request) {
> +        return NULL;
> +    }
> +
> +    request_done(threads, request);
> +    return request + 1;
> +}
> +
> +void threaded_workqueue_submit_request(Threads *threads, void *request)
> +{
> +    ThreadRequest *req = request - sizeof(ThreadRequest);

Pointer arithmetic on void *…

Please consider rewriting as:

	ThreadRequest *req = (ThreadRequest *) request - 1;

which achieves the same objective, is legal C, and is the symmetric
counterpart of “return request + 1” above.


> +    int thread_index = request_to_thread_index(request);
> +
> +    assert(!req->done);
> +    mark_request_valid(threads, req);
> +    threads->current_thread_index = thread_index  + 1;
> +}
> +
> +void threaded_workqueue_wait_for_requests(Threads *threads)
> +{
> +    ThreadLocal *thread;
> +    uint64_t result_bitmap;
> +    int thread_index, index = 0;
> +
> +    for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) {
> +        thread = threads->per_thread_data + thread_index;
> +        index = 0;
> +retry:
> +        qemu_event_reset(&thread->request_free_ev);
> +        result_bitmap = get_free_request_bitmap(threads, thread);
> +
> +        for (; index < threads->thread_requests_nr; index++) {
> +            if (test_bit(index, &result_bitmap)) {
> +                qemu_event_wait(&thread->request_free_ev);
> +                goto retry;
> +            }
> +
> +            request_done(threads, index_to_request(thread, index));
> +        }
> +    }
> +}
> -- 
> 2.14.5
Paolo Bonzini Nov. 27, 2018, 1:51 p.m. UTC | #14
On 27/11/18 13:49, Christophe de Dinechin wrote:
> So this is not really
> helping. Also, the ThreadLocal structure itself is not necessarily aligned
> within struct Threads. Therefore, it’s possible that “requests” for example
> could be on the same cache line as request_fill_bitmap if planets align
> the wrong way.

I think this is a bit exaggerated.  Linux and QEMU's own qht work just
fine with compile-time directives.

> In order to mitigate these effects, I would group the data that the user
> writes and the data that the thread writes, i.e. reorder declarations,
> put request_fill_bitmap and request_valid_ev together, and try
> to put them in the same cache line so that only one cache line is invalidated
> from within mark_request_valid instead of two.
> 
> Then you end up with a single alignment directive instead of 4, to
> separate requests from completions.

Yeah, I agree with this.

> That being said, I’m not sure why you use a bitmap here. What is the
> expected benefit relative to atomic lists (which would also make it really
> lock-free)?
> 

I don't think lock-free lists are easier.  Bitmaps smaller than 64
elements are both faster and easier to manage.
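
(For instance, a free-slot lookup over a single uint64_t is just

    index = find_first_zero_bit(&bitmap, 64);

essentially one bit-scan instruction, with no pointer chasing and no
retry loop.)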

Paolo
Emilio Cota Nov. 27, 2018, 5:39 p.m. UTC | #15
On Tue, Nov 27, 2018 at 13:49:13 +0100, Christophe de Dinechin wrote:
> (I did not finish the review, but decided to send what I already had).
> 
> > On 22 Nov 2018, at 08:20, guangrong.xiao@gmail.com wrote:
> > 
> > From: Xiao Guangrong <xiaoguangrong@tencent.com>
> > 
> > This modules implements the lockless and efficient threaded workqueue.
> 
> I’m not entirely convinced that it’s either “lockless” or “efficient”
> in the current iteration. I believe that it’s relatively easy to fix, though.
(snip)
> > 
> >    All the requests are pre-allocated and carefully partitioned between
> >    threads so there is no contention on the request, that make threads
> >    be parallel as much as possible.
> 
> That sentence confused me (it’s also in a comment in the text).
> I think I’m mostly confused by “there is no contention”. Perhaps you
> meant “so as to avoid contention if possible”? If there is a reason
> why there would never be any contention even if requests arrive faster than
> completions, I did not figure it out.
> 
> I personally see serious contention on the fields in the Threads structure,
> for example, but also possibly on the targets of the “modulo” operation in
> thread_find_free_request. Specifically, if three CPUs are entering
> thread_find_free_request at the same time, they will all run the same
> loop and all, presumably, “attack” the same memory locations.
> 
> Sorry if I mis-read the code, but at the moment, it does not seem to
> avoid contention as intended. I don’t see how it could without having
> some way to discriminate between CPUs to start with, which I did not find.

You might have missed that only one thread can request jobs. So contention
should only happen between that thread and the worker threads, but
not among worker threads (they should only share cache lines with the
requester thread).
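
For illustration, the intended single-submitter pattern is roughly the
sketch below; "compress", my_ops, have_work() and fill_request() are
made-up placeholders:

    ThreadedWorkqueueOps my_ops = { /* ... */ };
    Threads *threads;
    void *req;

    threads = threaded_workqueue_create("compress", 8,
                                        DEFAULT_THREAD_REQUEST_NR, &my_ops);
    while (have_work()) {
        req = threaded_workqueue_get_request(threads);
        if (!req) {
            continue;       /* all requests busy: retry or fall back */
        }
        fill_request(req);  /* hypothetical helper owned by the user */
        threaded_workqueue_submit_request(threads, req);
    }
    threaded_workqueue_wait_for_requests(threads);
    threaded_workqueue_destroy(threads);

Only this one thread ever touches the submission side, which is why the
free-request search needs no locking.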

> > - User, i.e, the submitter
> >    It's the one fills the request and submits it to the workqueue,
> the one -> the one who
> >    the result will be collected after it is handled by the work queue.
> > 
> >    The user can consecutively submit requests without waiting the previous
> waiting -> waiting for
> >    requests been handled.
> >    It only supports one submitter, you should do serial submission by
> >    yourself if you want more, e.g, use lock on you side.
> 
> I’m also confused by this last statement. The proposal purports
> to be “lockless”, which I read as working correctly without a lock…
> Reading the code, I indeed see issues if different threads
> try to place requests at the same time. So I believe the word
> “lockless” is a bit misleading.

ditto, it is lockless as presented here, i.e. one requester thread.

> > +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
> > +{
> > +    Threads *threads = thread->threads;
> > +    ThreadRequest *request = thread->requests;
> > +    int i;
> > +
> > +    for (i = 0; i < free_nr; i++) {
> > +        threads->ops->thread_request_uninit(request + 1);
> > +        request = (void *)request + threads->request_size;
> 
> Despite GCC’s tolerance for it and rather lengthy debates,
> pointer arithmetic on void * is illegal in C [1].
> 
> Consider using char * arithmetic, and using macros such as:
> 
> #define request_to_payload(req) (((ThreadRequest *) (req)) + 1)
> #define payload_to_request(req) (((ThreadRequest *) (req)) - 1)
> #define request_to_next(req, threads) \
>     ((ThreadRequest *) ((char *) (req) + (threads)->request_size))
> 
> where appropriate, that would clarify the intent.
> 
> [1] https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c

FWIW, we use void pointer arithmetic in other places in QEMU, so
I wouldn't worry about it being illegal.

I like those little macros though; even better as inlines.
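
Something like this, say (same semantics, just typed and inlined):

    static inline void *request_to_payload(ThreadRequest *req)
    {
        return req + 1;
    }

    static inline ThreadRequest *payload_to_request(void *payload)
    {
        return (ThreadRequest *) payload - 1;
    }

    static inline ThreadRequest *request_to_next(ThreadRequest *req,
                                                 Threads *threads)
    {
        return (ThreadRequest *) ((char *) req + threads->request_size);
    }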

Thanks,

		Emilio
Xiao Guangrong Nov. 28, 2018, 8:55 a.m. UTC | #16
On 11/27/18 8:49 PM, Christophe de Dinechin wrote:
> (I did not finish the review, but decided to send what I already had).
> 
> 
>> On 22 Nov 2018, at 08:20, guangrong.xiao@gmail.com wrote:
>>
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> This modules implements the lockless and efficient threaded workqueue.
> 
> I’m not entirely convinced that it’s either “lockless” or “efficient”
> in the current iteration. I believe that it’s relatively easy to fix, though.
> 

I think Emilio has already answered your concern about why it is "lockless". :)

>>
>> Three abstracted objects are used in this module:
>> - Request.
>>      It not only contains the data that the workqueue fetches out
>>     to finish the request but also offers the space to save the result
>>     after the workqueue handles the request.
>>
>>     It's flowed between user and workqueue. The user fills the request
>>     data into it when it is owned by user. After it is submitted to the
>>     workqueue, the workqueue fetched data out and save the result into
>>     it after the request is handled.
> 
> fetched -> fetches
> save -> saves

Will fix... My English is even worse than C. :(

>> +
>> +/*
>> + * find a free request where the user can store the data that is needed to
>> + * finish the request
>> + *
>> + * If all requests are used up, return NULL
>> + */
>> +void *threaded_workqueue_get_request(Threads *threads);
> 
> Using void * to represent the payload makes it easy to get
> the wrong pointer in there without the compiler noticing.
> Consider adding a type for the payload?
> 

Another option would be to export ThreadRequest to the user and require
it to be placed at the very beginning of the user-defined data struct.

However, that would expose internal design details to the user; I am
not sure it is a good idea...
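
For illustration, that alternative would look something like the sketch
below, where CompressRequest is a made-up user type:

    struct CompressRequest {
        ThreadRequest header;   /* must stay the first member */
        /* user-defined data follows */
        uint8_t *in, *out;
        size_t in_size, out_size;
    };

The workqueue and the user could then convert between the two views
with plain casts instead of hidden pointer arithmetic, at the cost of
exposing ThreadRequest in the public header.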

>> + *
>> + * Author:
>> + *   Xiao Guangrong <xiaoguangrong@tencent.com>
>> + *
>> + * Copyright(C) 2018 Tencent Corporation.
>> + *
>> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
>> + * See the COPYING.LIB file in the top-level directory.
>> + */
>> +
>> +#include "qemu/osdep.h"
>> +#include "qemu/bitmap.h"
>> +#include "qemu/threaded-workqueue.h"
>> +
>> +#define SMP_CACHE_BYTES 64
> 
> +1 on comments already made by others

Will improve it.

> 
>> +
>> +/*
>> + * the request representation which contains the internally used mete data,
> 
> mete -> meta

Will fix.

> 
>> + * it is the header of user-defined data.
>> + *
>> + * It should be aligned to the nature size of CPU.
>> + */
>> +struct ThreadRequest {
>> +    /*
>> +     * the request has been handled by the thread and need the user
>> +     * to fetch result out.
>> +     */
>> +    uint8_t done;
>> +
>> +    /*
>> +     * the index to Thread::requests.
>> +     * Save it to the padding space although it can be calculated at runtime.
>> +     */
>> +    uint8_t request_index;
> 
> So no more than 256?
> 
> This is blocked by MAX_THREAD_REQUEST_NR test at the beginning
> of threaded_workqueue_create, but I would make it more explicit either
> with a compile-time assert that MAX_THREAD_REQUEST_NR is
> below UINT8_MAX, or by adding a second test for UINT8_MAX in
> threaded_workqueue_create.

That works for me.

I prefer the former: a compile-time assert that MAX_THREAD_REQUEST_NR
is below UINT8_MAX.
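
E.g. a sketch, next to the existing QEMU_BUILD_BUG_ON in
threaded_workqueue_create:

    QEMU_BUILD_BUG_ON(MAX_THREAD_REQUEST_NR > UINT8_MAX);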

> 
> Also, an obvious extension would be to make bitmaps into arrays.
> 
> Do you think someone would want to use the package to assign
> requests per CPU or per VCPU? If so, that could quickly go above 64.
> 

Well... it specifies the queue depth of each single thread. Using a
larger depth has a negative effect, as it makes
threaded_workqueue_wait_for_requests() too slow: at that point, the
user has to wait for all the threads to exhaust all of their requests.

Another consideration is that a single u64 is more efficient than a
multi-word bitmap, as we can see from the performance data:
    https://ibb.co/hq7u5V

Based on that, I think 64 should be enough, at least for the present
user, the migration thread.

> 
>> +
>> +    /* the index to Threads::per_thread_data */
>> +    unsigned int thread_index;
> 
> Don’t you want to use a size_t for that?

size_t is 8 bytes... I'd like to keep the request header as tiny as possible...

> 
>> +} QEMU_ALIGNED(sizeof(unsigned long));
> 
> Nit: the alignment type is inconsistent with that given
> to QEMU_BUILD_BUG_ON in threaded_workqueue_create.
> (long vs. unsigned long).
> 

Yup, will make them consistent.

> Also, why is the alignment required? Aren’t you more interested
> in cache-line alignment?
> 

ThreadRequest is actually the header placed at the very beginning of
the request. If it is not aligned to "long", the user-defined data
struct could end up being accessed without proper alignment.
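
(Worked example, assuming an LP64 host where sizeof(ThreadRequest) is 8:
a 30-byte user payload gives request_size = QEMU_ALIGN_UP(30, 8) + 8 =
40, so every "request + 1" payload stays 8-byte aligned.)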

> 
>> +typedef struct ThreadRequest ThreadRequest;
> 
> 
>> +
>> +struct ThreadLocal {
>> +    struct Threads *threads;
>> +
>> +    /* the index of the thread */
>> +    int self;
> 
> Why signed?

Mistake, will fix.

> 
>> +
>> +    /* thread is useless and needs to exit */
>> +    bool quit;
>> +
>> +    QemuThread thread;
>> +
>> +    void *requests;
>> +
>> +   /*
>> +     * the bit in these two bitmaps indicates the index of the @requests
>> +     * respectively. If it's the same, the corresponding request is free
>> +     * and owned by the user, i.e, where the user fills a request. Otherwise,
>> +     * it is valid and owned by the thread, i.e, where the thread fetches
>> +     * the request and write the result.
>> +     */
>> +
>> +    /* after the user fills the request, the bit is flipped. */
>> +    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
> 
> I believe you are trying to ensure that data accessed from multiple CPUs
> is on different cache lines. As others have pointed out, the real value for
> SMP_CACHE_BYTES can only be known at run-time. So this is not really
> helping. Also, the ThreadLocal structure itself is not necessarily aligned
> within struct Threads. Therefore, it’s possible that “requests” for example
> could be on the same cache line as request_fill_bitmap if planets align
> the wrong way.
> 
> In order to mitigate these effects, I would group the data that the user
> writes and the data that the thread writes, i.e. reorder declarations,
> put request_fill_bitmap and request_valid_ev together, and try
> to put them in the same cache line so that only one cache line is invalidated
> from within mark_request_valid instead of two.
> 

However, QemuEvent is atomically updated by both sides; mixing it with
other fields is not a good idea, is it?


> Then you end up with a single alignment directive instead of 4, to
> separate requests from completions.
> 
> That being said, I’m not sure why you use a bitmap here. What is the
> expected benefit relative to atomic lists (which would also make it really
> lock-free)?
> 

I agree with Paolo's comments in another mail. :)

> 
>> +
>> +/*
>> + * the main data struct represents multithreads which is shared by
>> + * all threads
>> + */
>> +struct Threads {
>> +    /* the request header, ThreadRequest, is contained */
>> +    unsigned int request_size;
> 
> size_t?

Please see the comments above about "unsigned int thread_index;" in
ThreadRequest.

>> +/*
>> + * free request: the request is not used by any thread, however, it might
>> + *   contain the result need the user to call thread_request_done()
> 
> might contain the result -> might still contain the result
> result need the user to call -> result. The user needs to call
> 

Will fix.

>> +static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
>> +{
>> +    Threads *threads = thread->threads;
>> +    ThreadRequest *request = thread->requests;
>> +    int i;
>> +
>> +    for (i = 0; i < free_nr; i++) {
>> +        threads->ops->thread_request_uninit(request + 1);
>> +        request = (void *)request + threads->request_size;
> 
> Despite GCC’s tolerance for it and rather lengthy debates,
> pointer arithmetic on void * is illegal in C [1].
> 
> Consider using char * arithmetic, and using macros such as:
> 
> #define request_to_payload(req) (((ThreadRequest *) (req)) + 1)
> #define payload_to_request(req) (((ThreadRequest *) (req)) - 1)
> #define request_to_next(req, threads) \
>     ((ThreadRequest *) ((char *) (req) + (threads)->request_size))
> 

These definitions are really nice, will use them instead.

>> +static void uninit_thread_data(Threads *threads, int free_nr)
>> +{
>> +    ThreadLocal *thread_local = threads->per_thread_data;
> 
> thread_local is a keyword in C++11. I would avoid it as a name,
> consider replacing with “per_thread_data” as in struct Threads?
> 

Sure, that works for me.

>> +void threaded_workqueue_submit_request(Threads *threads, void *request)
>> +{
>> +    ThreadRequest *req = request - sizeof(ThreadRequest);
> 
> Pointer arithmetic on void *…
> 
> Please consider rewriting as:
> 
> 	ThreadRequest *req = (ThreadRequest *) request - 1;
> 
> which achieves the same objective, is legal C, and is the symmetric
> counterpart of “return request + 1” above.
> 

It's nice, indeed.
Christophe de Dinechin Dec. 4, 2018, 3:49 p.m. UTC | #17
> On 27 Nov 2018, at 14:51, Paolo Bonzini <pbonzini@redhat.com> wrote:
> 
> On 27/11/18 13:49, Christophe de Dinechin wrote:
>> So this is not really
>> helping. Also, the ThreadLocal structure itself is not necessarily aligned
>> within struct Threads. Therefore, it’s possible that “requests” for example
>> could be on the same cache line as request_fill_bitmap if planets align
>> the wrong way.
> 
> I think this is a bit exaggerated.

Hence my “if planets align the wrong way” :-)

But I understand that my wording came out too strong. My apologies.

I think the fix is to align ThreadLocal as well.
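
(e.g. by declaring it as

    struct ThreadLocal {
        /* ... existing fields ... */
    } QEMU_ALIGNED(SMP_CACHE_BYTES);

so that each element of per_thread_data[] starts on its own cache line.)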


>  Linux and QEMU's own qht work just fine with compile-time directives.

Wouldn’t it work fine without any compile-time directive at all?
Alignment is just a performance optimization.

> 
>> In order to mitigate these effects, I would group the data that the user
>> writes and the data that the thread writes, i.e. reorder declarations,
>> put request_fill_bitmap and request_valid_ev together, and try
>> to put them in the same cache line so that only one cache line is invalidated
>> from within mark_request_valid instead of two.
>> 
>> Then you end up with a single alignment directive instead of 4, to
>> separate requests from completions.
> 
> Yeah, I agree with this.
> 
>> That being said, I’m not sure why you use a bitmap here. What is the
>> expected benefit relative to atomic lists (which would also make it really
>> lock-free)?
>> 
> 
> I don't think lock-free lists are easier.  Bitmaps smaller than 64
> elements are both faster and easier to manage.

I believe that this is only true if you use a linked list for both freelist
management and for thread notification (i.e. to replace the bitmaps).
However, if you use an atomic list only for the free list, and keep
bitmaps for signaling, then performance is at least equal, often better.
Plus you get the added benefit of having a thread-safe API, i.e.
something that is truly lock-free.
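
For reference, the free-list side of that variant is essentially a
Treiber stack over the preallocated requests. A minimal sketch, assuming
a hypothetical "next" field added to ThreadRequest and QEMU's
atomic_read()/atomic_cmpxchg():

    static void free_list_push(ThreadRequest **head, ThreadRequest *req)
    {
        ThreadRequest *old;

        do {
            old = atomic_read(head);
            req->next = old;
        } while (atomic_cmpxchg(head, old, req) != old);
    }

    static ThreadRequest *free_list_pop(ThreadRequest **head)
    {
        ThreadRequest *old;

        do {
            old = atomic_read(head);
            if (!old) {
                return NULL;
            }
        } while (atomic_cmpxchg(head, old, old->next) != old);

        return old;
    }

(A production version also has to consider the ABA problem on the pop
side.)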

I did a small experiment to test / prove this. Last commit on branch:
https://github.com/c3d/recorder/commits/181122-xiao_guangdong_introduce-threaded-workqueue
Take with a grain of salt, microbenchmarks are always suspect ;-)

The code in “thread_test.c” includes Xiao’s code with two variations,
plus some testing code lifted from the flight recorder library.
1. The FREE_LIST variation (sl_test) is what I would like to propose.
2. The BITMAP variation (bm_test) is the baseline
3. The DOUBLE_LIST variation (ll_test) is the slow double-list approach

To run it, you need to do “make opt-test”, then run “test_script”
which outputs a CSV file. The summary of my findings testing on
a ThinkPad, a Xeon machine and a MacBook is here:
https://imgur.com/a/4HmbB9K

Overall, the proposed approach:

- makes the API thread safe and lock free, addressing the one
drawback that Xiao was mentioning.

- delivers up to 30% more requests on the Macbook, while being
“within noise” (sometimes marginally better) for the other two.
I suspect an optimization opportunity found by clang, because
the Macbook delivers really high numbers.

- spends less time blocking when all threads are busy, which
accounts for the higher number of client loops.

If you think that makes sense, then either Xiao can adapt the code
from the branch above, or I can send a follow-up patch.


Thanks
Christophe
Paolo Bonzini Dec. 4, 2018, 5:16 p.m. UTC | #18
On 04/12/18 16:49, Christophe de Dinechin wrote:
>>  Linux and QEMU's own qht work just fine with compile-time directives.
> 
> Wouldn’t it work fine without any compile-time directive at all?

Yes, that's what I meant.  Though there are certainly cases in which the
difference without proper cacheline alignment is an order of magnitude
less throughput or something like that; it would certainly be noticeable.

>> I don't think lock-free lists are easier.  Bitmaps smaller than 64
>> elements are both faster and easier to manage.
> 
> I believe that this is only true if you use a linked list for both freelist
> management and for thread notification (i.e. to replace the bitmaps).
> However, if you use an atomic list only for the free list, and keep
> bitmaps for signaling, then performance is at least equal, often better.
> Plus you get the added benefit of having a thread-safe API, i.e.
> something that is truly lock-free.
> 
> I did a small experiment to test / prove this. Last commit on branch:
> https://github.com/c3d/recorder/commits/181122-xiao_guangdong_introduce-threaded-workqueue
> Take with a grain of salt, microbenchmarks are always suspect ;-)
> 
> The code in “thread_test.c” includes Xiao’s code with two variations,
> plus some testing code lifted from the flight recorder library.
> 1. The FREE_LIST variation (sl_test) is what I would like to propose.
> 2. The BITMAP variation (bm_test) is the baseline
> 3. The DOUBLE_LIST variation (ll_test) is the slow double-list approach
> 
> To run it, you need to do “make opt-test”, then run “test_script”
> which outputs a CSV file. The summary of my findings testing on
> a ThinkPad, a Xeon machine and a MacBook is here:
> https://imgur.com/a/4HmbB9K
> 
> Overall, the proposed approach:
> 
> - makes the API thread safe and lock free, addressing the one
> drawback that Xiao was mentioning.
> 
> - delivers up to 30% more requests on the Macbook, while being
> “within noise” (sometimes marginally better) for the other two.
> I suspect an optimization opportunity found by clang, because
> the Macbook delivers really high numbers.
> 
> - spends less time blocking when all threads are busy, which
> accounts for the higher number of client loops.
> 
> If you think that makes sense, then either Xiao can adapt the code
> from the branch above, or I can send a follow-up patch.

Having a follow-up patch would be best I think.  Thanks for
experimenting with this, it's always fun stuff. :)

Paolo
Xiao Guangrong Dec. 10, 2018, 3:23 a.m. UTC | #19
On 12/5/18 1:16 AM, Paolo Bonzini wrote:
> On 04/12/18 16:49, Christophe de Dinechin wrote:
>>>   Linux and QEMU's own qht work just fine with compile-time directives.
>>
>> Wouldn’t it work fine without any compile-time directive at all?
> 
> Yes, that's what I meant.  Though there are certainly cases in which the
> difference without proper cacheline alignment is an order of magnitude
> less throughput or something like that; it would certainly be noticeable.
> 
>>> I don't think lock-free lists are easier.  Bitmaps smaller than 64
>>> elements are both faster and easier to manage.
>>
>> I believe that this is only true if you use a linked list for both freelist
>> management and for thread notification (i.e. to replace the bitmaps).
>> However, if you use an atomic list only for the free list, and keep
>> bitmaps for signaling, then performance is at least equal, often better.
>> Plus you get the added benefit of having a thread-safe API, i.e.
>> something that is truly lock-free.
>>
>> I did a small experiment to test / prove this. Last commit on branch:
>> https://github.com/c3d/recorder/commits/181122-xiao_guangdong_introduce-threaded-workqueue
>> Take with a grain of salt, microbenchmarks are always suspect ;-)
>>
>> The code in “thread_test.c” includes Xiao’s code with two variations,
>> plus some testing code lifted from the flight recorder library.
>> 1. The FREE_LIST variation (sl_test) is what I would like to propose.
>> 2. The BITMAP variation (bm_test) is the baseline
>> 3. The DOUBLE_LIST variation (ll_test) is the slow double-list approach
>>
>> To run it, you need to do “make opt-test”, then run “test_script”
>> which outputs a CSV file. The summary of my findings testing on
>> a ThinkPad, a Xeon machine and a MacBook is here:
>> https://imgur.com/a/4HmbB9K
>>
>> Overall, the proposed approach:
>>
>> - makes the API thread safe and lock free, addressing the one
>> drawback that Xiao was mentioning.
>>
>> - delivers up to 30% more requests on the Macbook, while being
>> “within noise” (sometimes marginally better) for the other two.
>> I suspect an optimization opportunity found by clang, because
>> the Macbook delivers really high numbers.
>>
>> - spends less time blocking when all threads are busy, which
>> accounts for the higher number of client loops.
>>
>> If you think that makes sense, then either Xiao can adapt the code
>> from the branch above, or I can send a follow-up patch.
> 
> Having a follow-up patch would be best I think.  Thanks for
> experimenting with this, it's always fun stuff. :)
> 

Yup, Christophe, please post the follow-up patches and add yourself
to the author list if you like. I am looking forward to it. :)

Thanks!
diff mbox series

Patch

diff --git a/include/qemu/threaded-workqueue.h b/include/qemu/threaded-workqueue.h
new file mode 100644
index 0000000000..e0ede496d0
--- /dev/null
+++ b/include/qemu/threaded-workqueue.h
@@ -0,0 +1,106 @@ 
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_THREADED_WORKQUEUE_H
+#define QEMU_THREADED_WORKQUEUE_H
+
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+
+/*
+ * This modules implements the lockless and efficient threaded workqueue.
+ *
+ * Three abstracted objects are used in this module:
+ * - Request.
+ *   It not only contains the data that the workqueue fetches out
+ *   to finish the request but also offers the space to save the result
+ *   after the workqueue handles the request.
+ *
+ *   It's flowed between user and workqueue. The user fills the request
+ *   data into it when it is owned by user. After it is submitted to the
+ *   workqueue, the workqueue fetched data out and save the result into
+ *   it after the request is handled.
+ *
+ *   All the requests are pre-allocated and carefully partitioned between
+ *   threads so there is no contention on the request, that make threads
+ *   be parallel as much as possible.
+ *
+ * - User, i.e, the submitter
+ *   It's the one fills the request and submits it to the workqueue,
+ *   the result will be collected after it is handled by the work queue.
+ *
+ *   The user can consecutively submit requests without waiting the previous
+ *   requests been handled.
+ *   It only supports one submitter, you should do serial submission by
+ *   yourself if you want more, e.g, use lock on you side.
+ *
+ * - Workqueue, i.e, thread
+ *   Each workqueue is represented by a running thread that fetches
+ *   the request submitted by the user, do the specified work and save
+ *   the result to the request.
+ */
+
+typedef struct Threads Threads;
+
+struct ThreadedWorkqueueOps {
+    /* constructor of the request */
+    int (*thread_request_init)(void *request);
+    /*  destructor of the request */
+    void (*thread_request_uninit)(void *request);
+
+    /* the handler of the request that is called by the thread */
+    void (*thread_request_handler)(void *request);
+    /* called by the user after the request has been handled */
+    void (*thread_request_done)(void *request);
+
+    size_t request_size;
+};
+typedef struct ThreadedWorkqueueOps ThreadedWorkqueueOps;
+
+/* the default number of requests that thread need handle */
+#define DEFAULT_THREAD_REQUEST_NR 4
+/* the max number of requests that thread need handle */
+#define MAX_THREAD_REQUEST_NR     (sizeof(uint64_t) * BITS_PER_BYTE)
+
+/*
+ * create a threaded queue. Other APIs will work on the Threads it returned
+ *
+ * @name: the identity of the workqueue which is used to construct the name
+ *    of threads only
+ * @threads_nr: the number of threads that the workqueue will create
+ * @thread_requests_nr: the number of requests that each single thread will
+ *    handle
+ * @ops: the handlers of the request
+ *
+ * Return NULL if it failed
+ */
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                                   unsigned int thread_requests_nr,
+                                   const ThreadedWorkqueueOps *ops);
+void threaded_workqueue_destroy(Threads *threads);
+
+/*
+ * find a free request where the user can store the data that is needed to
+ * finish the request
+ *
+ * If all requests are used up, return NULL
+ */
+void *threaded_workqueue_get_request(Threads *threads);
+/* submit the request and notify the thread */
+void threaded_workqueue_submit_request(Threads *threads, void *request);
+
+/*
+ * wait all threads to complete the request to make sure there is no
+ * previous request exists
+ */
+void threaded_workqueue_wait_for_requests(Threads *threads);
+#endif
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 0820923c18..f26dfe5182 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -50,5 +50,6 @@  util-obj-y += range.o
 util-obj-y += stats64.o
 util-obj-y += systemd.o
 util-obj-y += iova-tree.o
+util-obj-y += threaded-workqueue.o
 util-obj-$(CONFIG_LINUX) += vfio-helpers.o
 util-obj-$(CONFIG_OPENGL) += drm.o
diff --git a/util/threaded-workqueue.c b/util/threaded-workqueue.c
new file mode 100644
index 0000000000..2ab37cee8d
--- /dev/null
+++ b/util/threaded-workqueue.c
@@ -0,0 +1,463 @@ 
+/*
+ * Lockless and Efficient Threaded Workqueue Abstraction
+ *
+ * Author:
+ *   Xiao Guangrong <xiaoguangrong@tencent.com>
+ *
+ * Copyright(C) 2018 Tencent Corporation.
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/bitmap.h"
+#include "qemu/threaded-workqueue.h"
+
+#define SMP_CACHE_BYTES 64
+
+/*
+ * the request representation which contains the internally used mete data,
+ * it is the header of user-defined data.
+ *
+ * It should be aligned to the nature size of CPU.
+ */
+struct ThreadRequest {
+    /*
+     * the request has been handled by the thread and need the user
+     * to fetch result out.
+     */
+    uint8_t done;
+
+    /*
+     * the index to Thread::requests.
+     * Save it to the padding space although it can be calculated at runtime.
+     */
+    uint8_t request_index;
+
+    /* the index to Threads::per_thread_data */
+    unsigned int thread_index;
+} QEMU_ALIGNED(sizeof(unsigned long));
+typedef struct ThreadRequest ThreadRequest;
+
+struct ThreadLocal {
+    struct Threads *threads;
+
+    /* the index of the thread */
+    int self;
+
+    /* thread is useless and needs to exit */
+    bool quit;
+
+    QemuThread thread;
+
+    void *requests;
+
+   /*
+     * the bit in these two bitmaps indicates the index of the @requests
+     * respectively. If it's the same, the corresponding request is free
+     * and owned by the user, i.e, where the user fills a request. Otherwise,
+     * it is valid and owned by the thread, i.e, where the thread fetches
+     * the request and write the result.
+     */
+
+    /* after the user fills the request, the bit is flipped. */
+    uint64_t request_fill_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+    /* after handles the request, the thread flips the bit. */
+    uint64_t request_done_bitmap QEMU_ALIGNED(SMP_CACHE_BYTES);
+
+    /*
+     * the event used to wake up the thread whenever a valid request has
+     * been submitted
+     */
+    QemuEvent request_valid_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+
+    /*
+     * the event is notified whenever a request has been completed
+     * (i.e, become free), which is used to wake up the user
+     */
+    QemuEvent request_free_ev QEMU_ALIGNED(SMP_CACHE_BYTES);
+};
+typedef struct ThreadLocal ThreadLocal;
+
+/*
+ * the main data struct represents multithreads which is shared by
+ * all threads
+ */
+struct Threads {
+    /* the request header, ThreadRequest, is contained */
+    unsigned int request_size;
+    unsigned int thread_requests_nr;
+    unsigned int threads_nr;
+
+    /* the request is pushed to the thread with round-robin manner */
+    unsigned int current_thread_index;
+
+    const ThreadedWorkqueueOps *ops;
+
+    ThreadLocal per_thread_data[0];
+};
+typedef struct Threads Threads;
+
+static ThreadRequest *index_to_request(ThreadLocal *thread, int request_index)
+{
+    ThreadRequest *request;
+
+    request = thread->requests + request_index * thread->threads->request_size;
+    assert(request->request_index == request_index);
+    assert(request->thread_index == thread->self);
+    return request;
+}
+
+static int request_to_index(ThreadRequest *request)
+{
+    return request->request_index;
+}
+
+static int request_to_thread_index(ThreadRequest *request)
+{
+    return request->thread_index;
+}
+
+/*
+ * free request: the request is not used by any thread, however, it might
+ *   contain the result need the user to call thread_request_done()
+ *
+ * valid request: the request contains the request data and it's committed
+ *   to the thread, i,e. it's owned by thread.
+ */
+static uint64_t get_free_request_bitmap(Threads *threads, ThreadLocal *thread)
+{
+    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
+
+    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
+    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
+    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
+               threads->thread_requests_nr);
+
+    /*
+     * paired with smp_wmb() in mark_request_free() to make sure that we
+     * read request_done_bitmap before fetching the result out.
+     */
+    smp_rmb();
+
+    return result_bitmap;
+}
+
+static ThreadRequest
+*find_thread_free_request(Threads *threads, ThreadLocal *thread)
+{
+    uint64_t result_bitmap = get_free_request_bitmap(threads, thread);
+    int index;
+
+    index  = find_first_zero_bit(&result_bitmap, threads->thread_requests_nr);
+    if (index >= threads->thread_requests_nr) {
+        return NULL;
+    }
+
+    return index_to_request(thread, index);
+}
+
+static ThreadRequest *threads_find_free_request(Threads *threads)
+{
+    ThreadLocal *thread;
+    ThreadRequest *request;
+    int cur_thread, thread_index;
+
+    cur_thread = threads->current_thread_index % threads->threads_nr;
+    thread_index = cur_thread;
+    do {
+        thread = threads->per_thread_data + thread_index++;
+        request = find_thread_free_request(threads, thread);
+        if (request) {
+            break;
+        }
+        thread_index %= threads->threads_nr;
+    } while (thread_index != cur_thread);
+
+    return request;
+}
+
+/*
+ * the change bit operation combined with READ_ONCE and WRITE_ONCE which
+ * only works on single uint64_t width
+ */
+static void change_bit_once(long nr, uint64_t *addr)
+{
+    uint64_t value = atomic_rcu_read(addr) ^ BIT_MASK(nr);
+
+    atomic_rcu_set(addr, value);
+}
+
+static void mark_request_valid(Threads *threads, ThreadRequest *request)
+{
+    int thread_index = request_to_thread_index(request);
+    int request_index = request_to_index(request);
+    ThreadLocal *thread = threads->per_thread_data + thread_index;
+
+    /*
+     * paired with smp_rmb() in find_first_valid_request_index() to make
+     * sure the request has been filled before the bit is flipped that
+     * will make the request be visible to the thread
+     */
+    smp_wmb();
+
+    change_bit_once(request_index, &thread->request_fill_bitmap);
+    qemu_event_set(&thread->request_valid_ev);
+}
+
+static int thread_find_first_valid_request_index(ThreadLocal *thread)
+{
+    Threads *threads = thread->threads;
+    uint64_t request_fill_bitmap, request_done_bitmap, result_bitmap;
+    int index;
+
+    request_fill_bitmap = atomic_rcu_read(&thread->request_fill_bitmap);
+    request_done_bitmap = atomic_rcu_read(&thread->request_done_bitmap);
+    bitmap_xor(&result_bitmap, &request_fill_bitmap, &request_done_bitmap,
+               threads->thread_requests_nr);
+    /*
+     * paired with smp_wmb() in mark_request_valid() to make sure that
+     * we read request_fill_bitmap before fetch the request out.
+     */
+    smp_rmb();
+
+    index = find_first_bit(&result_bitmap, threads->thread_requests_nr);
+    return index >= threads->thread_requests_nr ? -1 : index;
+}
+
+static void mark_request_free(ThreadLocal *thread, ThreadRequest *request)
+{
+    int index = request_to_index(request);
+
+    /*
+     * smp_wmb() is implied in change_bit_atomic() that is paired with
+     * smp_rmb() in get_free_request_bitmap() to make sure the result
+     * has been saved before the bit is flipped.
+     */
+    change_bit_atomic(index, &thread->request_done_bitmap);
+    qemu_event_set(&thread->request_free_ev);
+}
+
+/* retry to see if there is available request before actually go to wait. */
+#define BUSY_WAIT_COUNT 1000
+
+static ThreadRequest *
+thread_busy_wait_for_request(ThreadLocal *thread)
+{
+    int index, count = 0;
+
+    for (count = 0; count < BUSY_WAIT_COUNT; count++) {
+        index = thread_find_first_valid_request_index(thread);
+        if (index >= 0) {
+            return index_to_request(thread, index);
+        }
+
+        cpu_relax();
+    }
+
+    return NULL;
+}
+
+static void *thread_run(void *opaque)
+{
+    ThreadLocal *self_data = (ThreadLocal *)opaque;
+    Threads *threads = self_data->threads;
+    void (*handler)(void *request) = threads->ops->thread_request_handler;
+    ThreadRequest *request;
+
+    for ( ; !atomic_read(&self_data->quit); ) {
+        qemu_event_reset(&self_data->request_valid_ev);
+
+        request = thread_busy_wait_for_request(self_data);
+        if (!request) {
+            qemu_event_wait(&self_data->request_valid_ev);
+            continue;
+        }
+
+        assert(!request->done);
+
+        handler(request + 1);
+        request->done = true;
+        mark_request_free(self_data, request);
+    }
+
+    return NULL;
+}
+
+static void uninit_thread_requests(ThreadLocal *thread, int free_nr)
+{
+    Threads *threads = thread->threads;
+    ThreadRequest *request = thread->requests;
+    int i;
+
+    for (i = 0; i < free_nr; i++) {
+        threads->ops->thread_request_uninit(request + 1);
+        request = (void *)request + threads->request_size;
+    }
+    g_free(thread->requests);
+}
+
+static int init_thread_requests(ThreadLocal *thread)
+{
+    Threads *threads = thread->threads;
+    ThreadRequest *request;
+    int ret, i, thread_reqs_size;
+
+    thread_reqs_size = threads->thread_requests_nr * threads->request_size;
+    thread_reqs_size = QEMU_ALIGN_UP(thread_reqs_size, SMP_CACHE_BYTES);
+    thread->requests = g_malloc0(thread_reqs_size);
+
+    request = thread->requests;
+    for (i = 0; i < threads->thread_requests_nr; i++) {
+        ret = threads->ops->thread_request_init(request + 1);
+        if (ret < 0) {
+            goto exit;
+        }
+
+        request->request_index = i;
+        request->thread_index = thread->self;
+        request = (void *)request + threads->request_size;
+    }
+    return 0;
+
+exit:
+    uninit_thread_requests(thread, i);
+    return -1;
+}
+
+static void uninit_thread_data(Threads *threads, int free_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    int i;
+
+    for (i = 0; i < free_nr; i++) {
+        thread_local[i].quit = true;
+        qemu_event_set(&thread_local[i].request_valid_ev);
+        qemu_thread_join(&thread_local[i].thread);
+        qemu_event_destroy(&thread_local[i].request_valid_ev);
+        qemu_event_destroy(&thread_local[i].request_free_ev);
+        uninit_thread_requests(&thread_local[i], threads->thread_requests_nr);
+    }
+}
+
+static int
+init_thread_data(Threads *threads, const char *thread_name, int thread_nr)
+{
+    ThreadLocal *thread_local = threads->per_thread_data;
+    char *name;
+    int i;
+
+    for (i = 0; i < thread_nr; i++) {
+        thread_local[i].threads = threads;
+        thread_local[i].self = i;
+
+        if (init_thread_requests(&thread_local[i]) < 0) {
+            goto exit;
+        }
+
+        qemu_event_init(&thread_local[i].request_free_ev, false);
+        qemu_event_init(&thread_local[i].request_valid_ev, false);
+
+        name = g_strdup_printf("%s/%d", thread_name, thread_local[i].self);
+        qemu_thread_create(&thread_local[i].thread, name,
+                           thread_run, &thread_local[i], QEMU_THREAD_JOINABLE);
+        g_free(name);
+    }
+    return 0;
+
+exit:
+    uninit_thread_data(threads, i);
+    return -1;
+}
+
+Threads *threaded_workqueue_create(const char *name, unsigned int threads_nr,
+                                   unsigned int thread_requests_nr,
+                                   const ThreadedWorkqueueOps *ops)
+{
+    Threads *threads;
+
+    if (threads_nr > MAX_THREAD_REQUEST_NR) {
+        return NULL;
+    }
+
+    threads = g_malloc0(sizeof(*threads) + threads_nr * sizeof(ThreadLocal));
+    threads->ops = ops;
+    threads->threads_nr = threads_nr;
+    threads->thread_requests_nr = thread_requests_nr;
+
+    QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(sizeof(ThreadRequest), sizeof(long)));
+    threads->request_size = threads->ops->request_size;
+    threads->request_size = QEMU_ALIGN_UP(threads->request_size, sizeof(long));
+    threads->request_size += sizeof(ThreadRequest);
+
+    if (init_thread_data(threads, name, threads_nr) < 0) {
+        g_free(threads);
+        return NULL;
+    }
+
+    return threads;
+}
+
+void threaded_workqueue_destroy(Threads *threads)
+{
+    uninit_thread_data(threads, threads->threads_nr);
+    g_free(threads);
+}
+
+static void request_done(Threads *threads, ThreadRequest *request)
+{
+    if (!request->done) {
+        return;
+    }
+
+    threads->ops->thread_request_done(request + 1);
+    request->done = false;
+}
+
+void *threaded_workqueue_get_request(Threads *threads)
+{
+    ThreadRequest *request;
+
+    request = threads_find_free_request(threads);
+    if (!request) {
+        return NULL;
+    }
+
+    request_done(threads, request);
+    return request + 1;
+}
+
+void threaded_workqueue_submit_request(Threads *threads, void *request)
+{
+    ThreadRequest *req = request - sizeof(ThreadRequest);
+    int thread_index = request_to_thread_index(request);
+
+    assert(!req->done);
+    mark_request_valid(threads, req);
+    threads->current_thread_index = thread_index  + 1;
+}
+
+void threaded_workqueue_wait_for_requests(Threads *threads)
+{
+    ThreadLocal *thread;
+    uint64_t result_bitmap;
+    int thread_index, index = 0;
+
+    for (thread_index = 0; thread_index < threads->threads_nr; thread_index++) {
+        thread = threads->per_thread_data + thread_index;
+        index = 0;
+retry:
+        qemu_event_reset(&thread->request_free_ev);
+        result_bitmap = get_free_request_bitmap(threads, thread);
+
+        for (; index < threads->thread_requests_nr; index++) {
+            if (test_bit(index, &result_bitmap)) {
+                qemu_event_wait(&thread->request_free_ev);
+                goto retry;
+            }
+
+            request_done(threads, index_to_request(thread, index));
+        }
+    }
+}