diff mbox series

[v2,05/17] thread-pool: Implement non-AIO (generic) pool support

Message ID 54947c3a1df713f5b69d8296938f3da41116ffe0.1724701542.git.maciej.szmigiero@oracle.com (mailing list archive)
State New, archived
Headers show
Series Multifd | expand

Commit Message

Maciej S. Szmigiero Aug. 27, 2024, 5:54 p.m. UTC
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

Migration code wants to manage device data sending threads in one place.

QEMU has an existing thread pool implementation, however it was limited
to queuing AIO operations only and essentially had a 1:1 mapping between
the current AioContext and the ThreadPool in use.

Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
too.

This brings a few new operations on a pool:
* thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
thread count in the pool.

* thread_pool_join() operation waits until all the submitted work requests
have finished.

* thread_pool_poll() lets the new thread and / or thread completion bottom
halves run (if they are indeed scheduled to be run).
It is useful for thread pool users that need to launch or terminate new
threads without returning to the QEMU main loop.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 include/block/thread-pool.h   | 10 ++++-
 tests/unit/test-thread-pool.c |  2 +-
 util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
 3 files changed, 76 insertions(+), 13 deletions(-)

Comments

Fabiano Rosas Sept. 2, 2024, 10:07 p.m. UTC | #1
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Migration code wants to manage device data sending threads in one place.
>
> QEMU has an existing thread pool implementation, however it was limited
> to queuing AIO operations only and essentially had a 1:1 mapping between
> the current AioContext and the ThreadPool in use.
>
> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
> too.
>
> This brings a few new operations on a pool:
> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
> thread count in the pool.
>
> * thread_pool_join() operation waits until all the submitted work requests
> have finished.
>
> * thread_pool_poll() lets the new thread and / or thread completion bottom
> halves run (if they are indeed scheduled to be run).
> It is useful for thread pool users that need to launch or terminate new
> threads without returning to the QEMU main loop.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>  include/block/thread-pool.h   | 10 ++++-
>  tests/unit/test-thread-pool.c |  2 +-
>  util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>  3 files changed, 76 insertions(+), 13 deletions(-)
>
> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
> index b484c4780ea6..1769496056cd 100644
> --- a/include/block/thread-pool.h
> +++ b/include/block/thread-pool.h
> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>                                     void *arg, GDestroyNotify arg_destroy,
>                                     BlockCompletionFunc *cb, void *opaque);
>  int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
> -void thread_pool_submit(ThreadPoolFunc *func,
> -                        void *arg, GDestroyNotify arg_destroy);
> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> +                               void *arg, GDestroyNotify arg_destroy,
> +                               BlockCompletionFunc *cb, void *opaque);

These kinds of changes (create wrappers, change signatures, etc), could
be in their own patch as it's just code motion that should not have
functional impact. The "no_requests" stuff would be better discussed in
a separate patch.

>  
> +void thread_pool_join(ThreadPool *pool);
> +void thread_pool_poll(ThreadPool *pool);
> +
> +void thread_pool_set_minmax_threads(ThreadPool *pool,
> +                                    int min_threads, int max_threads);
>  void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>  
>  #endif
> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
> index e4afb9e36292..469c0f7057b6 100644
> --- a/tests/unit/test-thread-pool.c
> +++ b/tests/unit/test-thread-pool.c
> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>  static void test_submit(void)
>  {
>      WorkerTestData data = { .n = 0 };
> -    thread_pool_submit(worker_cb, &data, NULL);
> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>      while (data.n == 0) {
>          aio_poll(ctx, true);
>      }
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 69a87ee79252..2bf3be875a51 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -60,6 +60,7 @@ struct ThreadPool {
>      QemuMutex lock;
>      QemuCond worker_stopped;
>      QemuCond request_cond;
> +    QemuCond no_requests_cond;
>      QEMUBH *new_thread_bh;
>  
>      /* The following variables are only accessed from one AioContext. */
> @@ -73,6 +74,7 @@ struct ThreadPool {
>      int pending_threads; /* threads created but not running yet */
>      int min_threads;
>      int max_threads;
> +    size_t requests_executing;

What's with size_t? Should this be a uint32_t instead?

>  };
>  
>  static void *worker_thread(void *opaque)
> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>          req = QTAILQ_FIRST(&pool->request_list);
>          QTAILQ_REMOVE(&pool->request_list, req, reqs);
>          req->state = THREAD_ACTIVE;
> +
> +        assert(pool->requests_executing < SIZE_MAX);
> +        pool->requests_executing++;
> +
>          qemu_mutex_unlock(&pool->lock);
>  
>          ret = req->func(req->arg);
> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>  
>          qemu_bh_schedule(pool->completion_bh);
>          qemu_mutex_lock(&pool->lock);
> +
> +        assert(pool->requests_executing > 0);
> +        pool->requests_executing--;
> +
> +        if (pool->requests_executing == 0 &&
> +            QTAILQ_EMPTY(&pool->request_list)) {
> +            qemu_cond_signal(&pool->no_requests_cond);
> +        }

An empty requests list and no request in flight means the worker will
now exit after the timeout, no? Can you just kick the worker out of the
wait and use pool->worker_stopped instead of the new condition variable?

>      }
>  
>      pool->cur_threads--;
> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>      .cancel_async       = thread_pool_cancel,
>  };
>  
> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> -                                   void *arg, GDestroyNotify arg_destroy,
> -                                   BlockCompletionFunc *cb, void *opaque)
> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> +                               void *arg, GDestroyNotify arg_destroy,
> +                               BlockCompletionFunc *cb, void *opaque)
>  {
>      ThreadPoolElement *req;
>      AioContext *ctx = qemu_get_current_aio_context();
> -    ThreadPool *pool = aio_get_thread_pool(ctx);
> +
> +    if (!pool) {
> +        pool = aio_get_thread_pool(ctx);
> +    }

I'd go for a separate implementation to really drive the point that this
new usage is different. See the code snippet below.

It seems we're a short step away from being able to use this
implementation in a general way. Is there something that can be done
with the 'common' field in the ThreadPoolElement?

========
static void thread_pool_submit_request(ThreadPool *pool, ThreadPoolElement *req)
{
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, req->arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_cond_signal(&pool->request_cond);
}

BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;
    AioContext *ctx = qemu_get_current_aio_context();
    ThreadPool *pool = aio_get_thread_pool(ctx);

    /* Assert that the thread submitting work is the same running the pool */
    assert(pool->ctx == qemu_get_current_aio_context());

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;

    thread_pool_submit_request(pool, req);
    return &req->common;
}

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    ThreadPoolElement *req;

    req = g_malloc(sizeof(ThreadPoolElement));
    req->func = func;
    req->arg = arg;

    thread_pool_submit_request(pool, req);
}
=================

>  
>      /* Assert that the thread submitting work is the same running the pool */
>      assert(pool->ctx == qemu_get_current_aio_context());
> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>      return &req->common;
>  }
>  
> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> +                                   void *arg, GDestroyNotify arg_destroy,
> +                                   BlockCompletionFunc *cb, void *opaque)
> +{
> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
> +}
> +
> +void thread_pool_poll(ThreadPool *pool)
> +{
> +    aio_bh_poll(pool->ctx);
> +}
> +
>  typedef struct ThreadPoolCo {
>      Coroutine *co;
>      int ret;
> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>      return tpc.ret;
>  }
>  
> -void thread_pool_submit(ThreadPoolFunc *func,
> -                        void *arg, GDestroyNotify arg_destroy)
> +void thread_pool_join(ThreadPool *pool)

This is misleading because it's about the requests, not the threads in
the pool. Compare with what thread_pool_free does:

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

>  {
> -    thread_pool_submit_aio(func, arg, arg_destroy, NULL, NULL);
> +    /* Assert that the thread waiting is the same running the pool */
> +    assert(pool->ctx == qemu_get_current_aio_context());
> +
> +    qemu_mutex_lock(&pool->lock);
> +
> +    if (pool->requests_executing > 0 ||
> +        !QTAILQ_EMPTY(&pool->request_list)) {
> +        qemu_cond_wait(&pool->no_requests_cond, &pool->lock);
> +    }
> +    assert(pool->requests_executing == 0 &&
> +           QTAILQ_EMPTY(&pool->request_list));
> +
> +    qemu_mutex_unlock(&pool->lock);
> +
> +    aio_bh_poll(pool->ctx);
> +
> +    assert(QLIST_EMPTY(&pool->head));
>  }
>  
> -void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> +void thread_pool_set_minmax_threads(ThreadPool *pool,
> +                                    int min_threads, int max_threads)
>  {
> +    assert(min_threads >= 0);
> +    assert(max_threads > 0);
> +    assert(max_threads >= min_threads);
> +
>      qemu_mutex_lock(&pool->lock);
>  
> -    pool->min_threads = ctx->thread_pool_min;
> -    pool->max_threads = ctx->thread_pool_max;
> +    pool->min_threads = min_threads;
> +    pool->max_threads = max_threads;
>  
>      /*
>       * We either have to:
> @@ -330,6 +379,12 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
>      qemu_mutex_unlock(&pool->lock);
>  }
>  
> +void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> +{
> +    thread_pool_set_minmax_threads(pool,
> +                                   ctx->thread_pool_min, ctx->thread_pool_max);
> +}
> +
>  static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
>  {
>      if (!ctx) {
> @@ -342,6 +397,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
>      qemu_mutex_init(&pool->lock);
>      qemu_cond_init(&pool->worker_stopped);
>      qemu_cond_init(&pool->request_cond);
> +    qemu_cond_init(&pool->no_requests_cond);
>      pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
>  
>      QLIST_INIT(&pool->head);
> @@ -382,6 +438,7 @@ void thread_pool_free(ThreadPool *pool)
>      qemu_mutex_unlock(&pool->lock);
>  
>      qemu_bh_delete(pool->completion_bh);
> +    qemu_cond_destroy(&pool->no_requests_cond);
>      qemu_cond_destroy(&pool->request_cond);
>      qemu_cond_destroy(&pool->worker_stopped);
>      qemu_mutex_destroy(&pool->lock);
Maciej S. Szmigiero Sept. 3, 2024, 12:02 p.m. UTC | #2
On 3.09.2024 00:07, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> 
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Migration code wants to manage device data sending threads in one place.
>>
>> QEMU has an existing thread pool implementation, however it was limited
>> to queuing AIO operations only and essentially had a 1:1 mapping between
>> the current AioContext and the ThreadPool in use.
>>
>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>> too.
>>
>> This brings a few new operations on a pool:
>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>> thread count in the pool.
>>
>> * thread_pool_join() operation waits until all the submitted work requests
>> have finished.
>>
>> * thread_pool_poll() lets the new thread and / or thread completion bottom
>> halves run (if they are indeed scheduled to be run).
>> It is useful for thread pool users that need to launch or terminate new
>> threads without returning to the QEMU main loop.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   include/block/thread-pool.h   | 10 ++++-
>>   tests/unit/test-thread-pool.c |  2 +-
>>   util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>>   3 files changed, 76 insertions(+), 13 deletions(-)
>>
>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>> index b484c4780ea6..1769496056cd 100644
>> --- a/include/block/thread-pool.h
>> +++ b/include/block/thread-pool.h
>> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>                                      void *arg, GDestroyNotify arg_destroy,
>>                                      BlockCompletionFunc *cb, void *opaque);
>>   int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>> -void thread_pool_submit(ThreadPoolFunc *func,
>> -                        void *arg, GDestroyNotify arg_destroy);
>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> +                               void *arg, GDestroyNotify arg_destroy,
>> +                               BlockCompletionFunc *cb, void *opaque);
> 
> These kinds of changes (create wrappers, change signatures, etc), could
> be in their own patch as it's just code motion that should not have
> functional impact. The "no_requests" stuff would be better discussed in
> a separate patch.

These changes *all* should have no functional impact on existing callers.

But I get your overall point, will try to separate these really trivial
parts.

>>   
>> +void thread_pool_join(ThreadPool *pool);
>> +void thread_pool_poll(ThreadPool *pool);
>> +
>> +void thread_pool_set_minmax_threads(ThreadPool *pool,
>> +                                    int min_threads, int max_threads);
>>   void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>>   
>>   #endif
>> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
>> index e4afb9e36292..469c0f7057b6 100644
>> --- a/tests/unit/test-thread-pool.c
>> +++ b/tests/unit/test-thread-pool.c
>> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>>   static void test_submit(void)
>>   {
>>       WorkerTestData data = { .n = 0 };
>> -    thread_pool_submit(worker_cb, &data, NULL);
>> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>>       while (data.n == 0) {
>>           aio_poll(ctx, true);
>>       }
>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>> index 69a87ee79252..2bf3be875a51 100644
>> --- a/util/thread-pool.c
>> +++ b/util/thread-pool.c
>> @@ -60,6 +60,7 @@ struct ThreadPool {
>>       QemuMutex lock;
>>       QemuCond worker_stopped;
>>       QemuCond request_cond;
>> +    QemuCond no_requests_cond;
>>       QEMUBH *new_thread_bh;
>>   
>>       /* The following variables are only accessed from one AioContext. */
>> @@ -73,6 +74,7 @@ struct ThreadPool {
>>       int pending_threads; /* threads created but not running yet */
>>       int min_threads;
>>       int max_threads;
>> +    size_t requests_executing;
> 
> What's with size_t? Should this be a uint32_t instead?

Sizes of objects are normally size_t, since otherwise bad
things happen if objects are bigger than 4 GiB.

Considering that the minimum object size is 1 byte, the
max count of distinct objects also needs a size_t to not
risk an overflow.

I think that while 2^32 requests executing seems unlikely,
saving 4 bytes is not worth the risk that someone will
find a vulnerability triggered by overflowing a 32-bit
variable (not necessarily in the migration code but in some
other thread pool user).

>>   };
>>   
>>   static void *worker_thread(void *opaque)
>> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>>           req = QTAILQ_FIRST(&pool->request_list);
>>           QTAILQ_REMOVE(&pool->request_list, req, reqs);
>>           req->state = THREAD_ACTIVE;
>> +
>> +        assert(pool->requests_executing < SIZE_MAX);
>> +        pool->requests_executing++;
>> +
>>           qemu_mutex_unlock(&pool->lock);
>>   
>>           ret = req->func(req->arg);
>> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>>   
>>           qemu_bh_schedule(pool->completion_bh);
>>           qemu_mutex_lock(&pool->lock);
>> +
>> +        assert(pool->requests_executing > 0);
>> +        pool->requests_executing--;
>> +
>> +        if (pool->requests_executing == 0 &&
>> +            QTAILQ_EMPTY(&pool->request_list)) {
>> +            qemu_cond_signal(&pool->no_requests_cond);
>> +        }
> 
> An empty requests list and no request in flight means the worker will
> now exit after the timeout, no? Can you just kick the worker out of the
> wait and use pool->worker_stopped instead of the new condition variable?

First, not all threads will terminate if either min_threads or max_threads
isn't 0.
It might be in the migration thread pool case but we are adding a
generic thread pool so it should be as universal as possible.
thread_pool_free() can get away with overwriting these values since
it is destroying the pool anyway.

Also, the *_join() (or whatever its final name will be) operation is
about waiting for all requests / work items to finish, not about waiting
for threads to terminate.
It's essentially a synchronization point for a thread pool, not a cleanup.

>>       }
>>   
>>       pool->cur_threads--;
>> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>>       .cancel_async       = thread_pool_cancel,
>>   };
>>   
>> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>> -                                   void *arg, GDestroyNotify arg_destroy,
>> -                                   BlockCompletionFunc *cb, void *opaque)
>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> +                               void *arg, GDestroyNotify arg_destroy,
>> +                               BlockCompletionFunc *cb, void *opaque)
>>   {
>>       ThreadPoolElement *req;
>>       AioContext *ctx = qemu_get_current_aio_context();
>> -    ThreadPool *pool = aio_get_thread_pool(ctx);
>> +
>> +    if (!pool) {
>> +        pool = aio_get_thread_pool(ctx);
>> +    }
> 
> I'd go for a separate implementation to really drive the point that this
> new usage is different. See the code snippet below.

I see your point there - will split these implementations.

> It seems we're a short step away to being able to use this
> implementation in a general way. Is there something that can be done
> with the 'common' field in the ThreadPoolElement?

The non-AIO request flow still needs the completion callback from BlockAIOCB
(and its argument pointer), so removing the "common" field from these requests
would need introducing two "flavors" of ThreadPoolElement.

Not sure the memory savings here are worth the increase in code complexity.

> ========
> static void thread_pool_submit_request(ThreadPool *pool, ThreadPoolElement *req)
> {
>      req->state = THREAD_QUEUED;
>      req->pool = pool;
> 
>      QLIST_INSERT_HEAD(&pool->head, req, all);
> 
>      trace_thread_pool_submit(pool, req, req->arg);
> 
>      qemu_mutex_lock(&pool->lock);
>      if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
>          spawn_thread(pool);
>      }
>      QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
>      qemu_mutex_unlock(&pool->lock);
>      qemu_cond_signal(&pool->request_cond);
> }
> 
> BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>                                     BlockCompletionFunc *cb, void *opaque)
> {
>      ThreadPoolElement *req;
>      AioContext *ctx = qemu_get_current_aio_context();
>      ThreadPool *pool = aio_get_thread_pool(ctx);
> 
>      /* Assert that the thread submitting work is the same running the pool */
>      assert(pool->ctx == qemu_get_current_aio_context());
> 
>      req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>      req->func = func;
>      req->arg = arg;
> 
>      thread_pool_submit_request(pool, req);
>      return &req->common;
> }
> 
> void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
> {
>      ThreadPoolElement *req;
> 
>      req = g_malloc(sizeof(ThreadPoolElement));
>      req->func = func;
>      req->arg = arg;
> 
>      thread_pool_submit_request(pool, req);
> }
> =================
> 
>>   
>>       /* Assert that the thread submitting work is the same running the pool */
>>       assert(pool->ctx == qemu_get_current_aio_context());
>> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>       return &req->common;
>>   }
>>   
>> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>> +                                   void *arg, GDestroyNotify arg_destroy,
>> +                                   BlockCompletionFunc *cb, void *opaque)
>> +{
>> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
>> +}
>> +
>> +void thread_pool_poll(ThreadPool *pool)
>> +{
>> +    aio_bh_poll(pool->ctx);
>> +}
>> +
>>   typedef struct ThreadPoolCo {
>>       Coroutine *co;
>>       int ret;
>> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>       return tpc.ret;
>>   }
>>   
>> -void thread_pool_submit(ThreadPoolFunc *func,
>> -                        void *arg, GDestroyNotify arg_destroy)
>> +void thread_pool_join(ThreadPool *pool)
> 
> This is misleading because it's about the requests, not the threads in
> the pool. Compare with what thread_pool_free does:
> 
>      /* Wait for worker threads to terminate */
>      pool->max_threads = 0;
>      qemu_cond_broadcast(&pool->request_cond);
>      while (pool->cur_threads > 0) {
>          qemu_cond_wait(&pool->worker_stopped, &pool->lock);
>      }
> 

I'm open to thread_pool_join() better naming proposals.

Thanks,
Maciej
Stefan Hajnoczi Sept. 3, 2024, 1:55 p.m. UTC | #3
On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
<mail@maciej.szmigiero.name> wrote:
>
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Migration code wants to manage device data sending threads in one place.
>
> QEMU has an existing thread pool implementation, however it was limited
> to queuing AIO operations only and essentially had a 1:1 mapping between
> the current AioContext and the ThreadPool in use.
>
> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
> too.
>
> This brings a few new operations on a pool:
> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
> thread count in the pool.
>
> * thread_pool_join() operation waits until all the submitted work requests
> have finished.
>
> * thread_pool_poll() lets the new thread and / or thread completion bottom
> halves run (if they are indeed scheduled to be run).
> It is useful for thread pool users that need to launch or terminate new
> threads without returning to the QEMU main loop.

Did you consider glib's GThreadPool?
https://docs.gtk.org/glib/struct.ThreadPool.html

QEMU's thread pool is integrated into the QEMU event loop. If your
goal is to bypass the QEMU event loop, then you may as well use the
glib API instead.

thread_pool_join() and thread_pool_poll() will lead to code that
blocks the event loop. QEMU's aio_poll() and nested event loops in
general are a source of hangs and re-entrancy bugs. I would prefer not
introducing these issues in the QEMU ThreadPool API.


>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>  include/block/thread-pool.h   | 10 ++++-
>  tests/unit/test-thread-pool.c |  2 +-
>  util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>  3 files changed, 76 insertions(+), 13 deletions(-)
>
> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
> index b484c4780ea6..1769496056cd 100644
> --- a/include/block/thread-pool.h
> +++ b/include/block/thread-pool.h
> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>                                     void *arg, GDestroyNotify arg_destroy,
>                                     BlockCompletionFunc *cb, void *opaque);
>  int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
> -void thread_pool_submit(ThreadPoolFunc *func,
> -                        void *arg, GDestroyNotify arg_destroy);
> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> +                               void *arg, GDestroyNotify arg_destroy,
> +                               BlockCompletionFunc *cb, void *opaque);
>
> +void thread_pool_join(ThreadPool *pool);
> +void thread_pool_poll(ThreadPool *pool);
> +
> +void thread_pool_set_minmax_threads(ThreadPool *pool,
> +                                    int min_threads, int max_threads);
>  void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>
>  #endif
> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
> index e4afb9e36292..469c0f7057b6 100644
> --- a/tests/unit/test-thread-pool.c
> +++ b/tests/unit/test-thread-pool.c
> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>  static void test_submit(void)
>  {
>      WorkerTestData data = { .n = 0 };
> -    thread_pool_submit(worker_cb, &data, NULL);
> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>      while (data.n == 0) {
>          aio_poll(ctx, true);
>      }
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 69a87ee79252..2bf3be875a51 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -60,6 +60,7 @@ struct ThreadPool {
>      QemuMutex lock;
>      QemuCond worker_stopped;
>      QemuCond request_cond;
> +    QemuCond no_requests_cond;
>      QEMUBH *new_thread_bh;
>
>      /* The following variables are only accessed from one AioContext. */
> @@ -73,6 +74,7 @@ struct ThreadPool {
>      int pending_threads; /* threads created but not running yet */
>      int min_threads;
>      int max_threads;
> +    size_t requests_executing;
>  };
>
>  static void *worker_thread(void *opaque)
> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>          req = QTAILQ_FIRST(&pool->request_list);
>          QTAILQ_REMOVE(&pool->request_list, req, reqs);
>          req->state = THREAD_ACTIVE;
> +
> +        assert(pool->requests_executing < SIZE_MAX);
> +        pool->requests_executing++;
> +
>          qemu_mutex_unlock(&pool->lock);
>
>          ret = req->func(req->arg);
> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>
>          qemu_bh_schedule(pool->completion_bh);
>          qemu_mutex_lock(&pool->lock);
> +
> +        assert(pool->requests_executing > 0);
> +        pool->requests_executing--;
> +
> +        if (pool->requests_executing == 0 &&
> +            QTAILQ_EMPTY(&pool->request_list)) {
> +            qemu_cond_signal(&pool->no_requests_cond);
> +        }
>      }
>
>      pool->cur_threads--;
> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>      .cancel_async       = thread_pool_cancel,
>  };
>
> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> -                                   void *arg, GDestroyNotify arg_destroy,
> -                                   BlockCompletionFunc *cb, void *opaque)
> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> +                               void *arg, GDestroyNotify arg_destroy,
> +                               BlockCompletionFunc *cb, void *opaque)
>  {
>      ThreadPoolElement *req;
>      AioContext *ctx = qemu_get_current_aio_context();
> -    ThreadPool *pool = aio_get_thread_pool(ctx);
> +
> +    if (!pool) {
> +        pool = aio_get_thread_pool(ctx);
> +    }
>
>      /* Assert that the thread submitting work is the same running the pool */
>      assert(pool->ctx == qemu_get_current_aio_context());
> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>      return &req->common;
>  }
>
> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
> +                                   void *arg, GDestroyNotify arg_destroy,
> +                                   BlockCompletionFunc *cb, void *opaque)
> +{
> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
> +}
> +
> +void thread_pool_poll(ThreadPool *pool)
> +{
> +    aio_bh_poll(pool->ctx);
> +}
> +
>  typedef struct ThreadPoolCo {
>      Coroutine *co;
>      int ret;
> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>      return tpc.ret;
>  }
>
> -void thread_pool_submit(ThreadPoolFunc *func,
> -                        void *arg, GDestroyNotify arg_destroy)
> +void thread_pool_join(ThreadPool *pool)
>  {
> -    thread_pool_submit_aio(func, arg, arg_destroy, NULL, NULL);
> +    /* Assert that the thread waiting is the same running the pool */
> +    assert(pool->ctx == qemu_get_current_aio_context());
> +
> +    qemu_mutex_lock(&pool->lock);
> +
> +    if (pool->requests_executing > 0 ||
> +        !QTAILQ_EMPTY(&pool->request_list)) {
> +        qemu_cond_wait(&pool->no_requests_cond, &pool->lock);
> +    }
> +    assert(pool->requests_executing == 0 &&
> +           QTAILQ_EMPTY(&pool->request_list));
> +
> +    qemu_mutex_unlock(&pool->lock);
> +
> +    aio_bh_poll(pool->ctx);
> +
> +    assert(QLIST_EMPTY(&pool->head));
>  }
>
> -void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> +void thread_pool_set_minmax_threads(ThreadPool *pool,
> +                                    int min_threads, int max_threads)
>  {
> +    assert(min_threads >= 0);
> +    assert(max_threads > 0);
> +    assert(max_threads >= min_threads);
> +
>      qemu_mutex_lock(&pool->lock);
>
> -    pool->min_threads = ctx->thread_pool_min;
> -    pool->max_threads = ctx->thread_pool_max;
> +    pool->min_threads = min_threads;
> +    pool->max_threads = max_threads;
>
>      /*
>       * We either have to:
> @@ -330,6 +379,12 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
>      qemu_mutex_unlock(&pool->lock);
>  }
>
> +void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
> +{
> +    thread_pool_set_minmax_threads(pool,
> +                                   ctx->thread_pool_min, ctx->thread_pool_max);
> +}
> +
>  static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
>  {
>      if (!ctx) {
> @@ -342,6 +397,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
>      qemu_mutex_init(&pool->lock);
>      qemu_cond_init(&pool->worker_stopped);
>      qemu_cond_init(&pool->request_cond);
> +    qemu_cond_init(&pool->no_requests_cond);
>      pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
>
>      QLIST_INIT(&pool->head);
> @@ -382,6 +438,7 @@ void thread_pool_free(ThreadPool *pool)
>      qemu_mutex_unlock(&pool->lock);
>
>      qemu_bh_delete(pool->completion_bh);
> +    qemu_cond_destroy(&pool->no_requests_cond);
>      qemu_cond_destroy(&pool->request_cond);
>      qemu_cond_destroy(&pool->worker_stopped);
>      qemu_mutex_destroy(&pool->lock);
>
Fabiano Rosas Sept. 3, 2024, 2:26 p.m. UTC | #4
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> On 3.09.2024 00:07, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>> 
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Migration code wants to manage device data sending threads in one place.
>>>
>>> QEMU has an existing thread pool implementation, however it was limited
>>> to queuing AIO operations only and essentially had a 1:1 mapping between
>>> the current AioContext and the ThreadPool in use.
>>>
>>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>>> too.
>>>
>>> This brings a few new operations on a pool:
>>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>>> thread count in the pool.
>>>
>>> * thread_pool_join() operation waits until all the submitted work requests
>>> have finished.
>>>
>>> * thread_pool_poll() lets the new thread and / or thread completion bottom
>>> halves run (if they are indeed scheduled to be run).
>>> It is useful for thread pool users that need to launch or terminate new
>>> threads without returning to the QEMU main loop.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   include/block/thread-pool.h   | 10 ++++-
>>>   tests/unit/test-thread-pool.c |  2 +-
>>>   util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>>>   3 files changed, 76 insertions(+), 13 deletions(-)
>>>
>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>> index b484c4780ea6..1769496056cd 100644
>>> --- a/include/block/thread-pool.h
>>> +++ b/include/block/thread-pool.h
>>> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>                                      void *arg, GDestroyNotify arg_destroy,
>>>                                      BlockCompletionFunc *cb, void *opaque);
>>>   int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>> -                        void *arg, GDestroyNotify arg_destroy);
>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> +                               void *arg, GDestroyNotify arg_destroy,
>>> +                               BlockCompletionFunc *cb, void *opaque);
>> 
>> These kinds of changes (create wrappers, change signatures, etc), could
>> be in their own patch as it's just code motion that should not have
>> functional impact. The "no_requests" stuff would be better discussed in
>> a separate patch.
>
> These changes *all* should have no functional impact on existing callers.
>
> But I get your overall point, will try to separate these really trivial
> parts.

Yeah, I guess I meant that one set of changes has a larger potential for
introducing a bug while the other is clearly harmless.

>
>>>   
>>> +void thread_pool_join(ThreadPool *pool);
>>> +void thread_pool_poll(ThreadPool *pool);
>>> +
>>> +void thread_pool_set_minmax_threads(ThreadPool *pool,
>>> +                                    int min_threads, int max_threads);
>>>   void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>>>   
>>>   #endif
>>> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
>>> index e4afb9e36292..469c0f7057b6 100644
>>> --- a/tests/unit/test-thread-pool.c
>>> +++ b/tests/unit/test-thread-pool.c
>>> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>>>   static void test_submit(void)
>>>   {
>>>       WorkerTestData data = { .n = 0 };
>>> -    thread_pool_submit(worker_cb, &data, NULL);
>>> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>>>       while (data.n == 0) {
>>>           aio_poll(ctx, true);
>>>       }
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 69a87ee79252..2bf3be875a51 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -60,6 +60,7 @@ struct ThreadPool {
>>>       QemuMutex lock;
>>>       QemuCond worker_stopped;
>>>       QemuCond request_cond;
>>> +    QemuCond no_requests_cond;
>>>       QEMUBH *new_thread_bh;
>>>   
>>>       /* The following variables are only accessed from one AioContext. */
>>> @@ -73,6 +74,7 @@ struct ThreadPool {
>>>       int pending_threads; /* threads created but not running yet */
>>>       int min_threads;
>>>       int max_threads;
>>> +    size_t requests_executing;
>> 
>> What's with size_t? Should this be a uint32_t instead?
>
> Sizes of objects are normally size_t, since otherwise bad
> things happen if objects are bigger than 4 GiB.

Ok, but requests_executing is not the size of an object. It's the number
of objects in a linked list that satisfy a certain predicate. There are
no address space size considerations here.

>
> Considering that the minimum object size is 1 byte the
> max count of distinct objects also needs a size_t to not
> risk an overflow.

I'm not sure I get you, there's no overflow since you're bounds checking
with the assert. Or is this a more abstract line of thought about how
many ThreadPoolElements can be present in memory at a time and you'd
like a type that's certain to fit the theoretical amount of objects?

>
> I think that while 2^32 requests executing seems unlikely
> saving 4 bytes seems not worth worrying that someone will
> find a vulnerability triggered by overflowing a 32-bit
> variable (not necessary in the migration code but in some
> other thread pool user).
>
>>>   };
>>>   
>>>   static void *worker_thread(void *opaque)
>>> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>>>           req = QTAILQ_FIRST(&pool->request_list);
>>>           QTAILQ_REMOVE(&pool->request_list, req, reqs);
>>>           req->state = THREAD_ACTIVE;
>>> +
>>> +        assert(pool->requests_executing < SIZE_MAX);
>>> +        pool->requests_executing++;
>>> +
>>>           qemu_mutex_unlock(&pool->lock);
>>>   
>>>           ret = req->func(req->arg);
>>> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>>>   
>>>           qemu_bh_schedule(pool->completion_bh);
>>>           qemu_mutex_lock(&pool->lock);
>>> +
>>> +        assert(pool->requests_executing > 0);
>>> +        pool->requests_executing--;
>>> +
>>> +        if (pool->requests_executing == 0 &&
>>> +            QTAILQ_EMPTY(&pool->request_list)) {
>>> +            qemu_cond_signal(&pool->no_requests_cond);
>>> +        }
>> 
>> An empty requests list and no request in flight means the worker will
>> now exit after the timeout, no? Can you just kick the worker out of the
>> wait and use pool->worker_stopped instead of the new condition variable?
>
> First, all threads won't terminate if either min_threads or max_threads
> isn't 0.

Ah I overlooked the break condition, nevermind.

> It might be in the migration thread pool case but we are adding a
> generic thread pool so it should be as universal as possible.
> thread_pool_free() can get away with overwriting these values since
> it is destroying the pool anyway.
>
> Also, the *_join() (or whatever its final name will be) operation is
> about waiting for all requests / work items to finish, not about waiting
> for threads to terminate.

Right, but the idea was to piggyback on the thread termination to infer
(the obvious) requests service termination. We cannot do that, as you've
explained, fine.

> It's essentially a synchronization point for a thread pool, not a cleanup.
>
>>>       }
>>>   
>>>       pool->cur_threads--;
>>> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>>>       .cancel_async       = thread_pool_cancel,
>>>   };
>>>   
>>> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>> -                                   void *arg, GDestroyNotify arg_destroy,
>>> -                                   BlockCompletionFunc *cb, void *opaque)
>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> +                               void *arg, GDestroyNotify arg_destroy,
>>> +                               BlockCompletionFunc *cb, void *opaque)
>>>   {
>>>       ThreadPoolElement *req;
>>>       AioContext *ctx = qemu_get_current_aio_context();
>>> -    ThreadPool *pool = aio_get_thread_pool(ctx);
>>> +
>>> +    if (!pool) {
>>> +        pool = aio_get_thread_pool(ctx);
>>> +    }
>> 
>> I'd go for a separate implementation to really drive the point that this
>> new usage is different. See the code snippet below.
>
> I see your point there - will split these implementations.
>
>> It seems we're a short step away to being able to use this
>> implementation in a general way. Is there something that can be done
>> with the 'common' field in the ThreadPoolElement?
>
> The non-AIO request flow still need the completion callback from BlockAIOCB
> (and its argument pointer) so removing the "common" field from these requests
> would need introducing two "flavors" of ThreadPoolElement.
>
> Not sure memory saving here are worth the increase in code complexity.

I'm not asking that of you, but I think it should be done
eventually. The QEMU block layer is very particular and I wouldn't want
the use-cases for the thread-pool to get confused. But I can't see a way
out right now, so let's postpone this, see if anyone else has comments.

>
>> ========
>> static void thread_pool_submit_request(ThreadPool *pool, ThreadPoolElement *req)
>> {
>>      req->state = THREAD_QUEUED;
>>      req->pool = pool;
>> 
>>      QLIST_INSERT_HEAD(&pool->head, req, all);
>> 
>>      trace_thread_pool_submit(pool, req, req->arg);
>> 
>>      qemu_mutex_lock(&pool->lock);
>>      if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
>>          spawn_thread(pool);
>>      }
>>      QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
>>      qemu_mutex_unlock(&pool->lock);
>>      qemu_cond_signal(&pool->request_cond);
>> }
>> 
>> BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>                                     BlockCompletionFunc *cb, void *opaque)
>> {
>>      ThreadPoolElement *req;
>>      AioContext *ctx = qemu_get_current_aio_context();
>>      ThreadPool *pool = aio_get_thread_pool(ctx);
>> 
>>      /* Assert that the thread submitting work is the same running the pool */
>>      assert(pool->ctx == qemu_get_current_aio_context());
>> 
>>      req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>>      req->func = func;
>>      req->arg = arg;
>> 
>>      thread_pool_submit_request(pool, req);
>>      return &req->common;
>> }
>> 
>> void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
>> {
>>      ThreadPoolElement *req;
>> 
>>      req = g_malloc(sizeof(ThreadPoolElement));
>>      req->func = func;
>>      req->arg = arg;
>> 
>>      thread_pool_submit_request(pool, req);
>> }
>> =================
>> 
>>>   
>>>       /* Assert that the thread submitting work is the same running the pool */
>>>       assert(pool->ctx == qemu_get_current_aio_context());
>>> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>       return &req->common;
>>>   }
>>>   
>>> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>> +                                   void *arg, GDestroyNotify arg_destroy,
>>> +                                   BlockCompletionFunc *cb, void *opaque)
>>> +{
>>> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
>>> +}
>>> +
>>> +void thread_pool_poll(ThreadPool *pool)
>>> +{
>>> +    aio_bh_poll(pool->ctx);
>>> +}
>>> +
>>>   typedef struct ThreadPoolCo {
>>>       Coroutine *co;
>>>       int ret;
>>> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>>       return tpc.ret;
>>>   }
>>>   
>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>> -                        void *arg, GDestroyNotify arg_destroy)
>>> +void thread_pool_join(ThreadPool *pool)
>> 
>> This is misleading because it's about the requests, not the threads in
>> the pool. Compare with what thread_pool_free does:
>> 
>>      /* Wait for worker threads to terminate */
>>      pool->max_threads = 0;
>>      qemu_cond_broadcast(&pool->request_cond);
>>      while (pool->cur_threads > 0) {
>>          qemu_cond_wait(&pool->worker_stopped, &pool->lock);
>>      }
>> 
>
> I'm open to thread_pool_join() better naming proposals.

thread_pool_wait() might be better.

>
> Thanks,
> Maciej
Maciej S. Szmigiero Sept. 3, 2024, 4:54 p.m. UTC | #5
On 3.09.2024 15:55, Stefan Hajnoczi wrote:
> On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
> <mail@maciej.szmigiero.name> wrote:
>>
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Migration code wants to manage device data sending threads in one place.
>>
>> QEMU has an existing thread pool implementation, however it was limited
>> to queuing AIO operations only and essentially had a 1:1 mapping between
>> the current AioContext and the ThreadPool in use.
>>
>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>> too.
>>
>> This brings a few new operations on a pool:
>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>> thread count in the pool.
>>
>> * thread_pool_join() operation waits until all the submitted work requests
>> have finished.
>>
>> * thread_pool_poll() lets the new thread and / or thread completion bottom
>> halves run (if they are indeed scheduled to be run).
>> It is useful for thread pool users that need to launch or terminate new
>> threads without returning to the QEMU main loop.
> 
> Did you consider glib's GThreadPool?
> https://docs.gtk.org/glib/struct.ThreadPool.html
> 
> QEMU's thread pool is integrated into the QEMU event loop. If your
> goal is to bypass the QEMU event loop, then you may as well use the
> glib API instead.
> 
> thread_pool_join() and thread_pool_poll() will lead to code that
> blocks the event loop. QEMU's aio_poll() and nested event loops in
> general are a source of hangs and re-entrancy bugs. I would prefer not
> introducing these issues in the QEMU ThreadPool API.
> 

Unfortunately, the problem with the migration code is that it is
synchronous - it does not return to the main event loop until the
migration is done.

So the only way to handle things that need a working event loop is to
pump it manually from inside the migration code.

The reason why I used the QEMU thread pool in the first place in this
patch set version is because Peter asked me to do so during the review
of its previous iteration [1].

Peter also asked me previously to move to QEMU synchronization
primitives from using the Glib ones in the early version of this
patch set [2].

I personally would rather use something common to many applications,
well tested and with more pairs of eyes looking at it than
re-invent things in QEMU.

Looking at GThreadPool it seems that it lacks the ability to wait until
all queued work has finished, so this would need to be open-coded
in the migration code.

@Peter, what's your opinion on using Glib's thread pool instead of
QEMU's one, considering the above things?

Thanks,
Maciej

[1]: https://lore.kernel.org/qemu-devel/ZniFH14DT6ycjbrL@x1n/ point 5: "Worker thread model"
[2]: https://lore.kernel.org/qemu-devel/Zi_9SyJy__8wJTou@x1n/
Maciej S. Szmigiero Sept. 3, 2024, 6:14 p.m. UTC | #6
On 3.09.2024 16:26, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> 
>> On 3.09.2024 00:07, Fabiano Rosas wrote:
>>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>>
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> Migration code wants to manage device data sending threads in one place.
>>>>
>>>> QEMU has an existing thread pool implementation, however it was limited
>>>> to queuing AIO operations only and essentially had a 1:1 mapping between
>>>> the current AioContext and the ThreadPool in use.
>>>>
>>>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>>>> too.
>>>>
>>>> This brings a few new operations on a pool:
>>>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>>>> thread count in the pool.
>>>>
>>>> * thread_pool_join() operation waits until all the submitted work requests
>>>> have finished.
>>>>
>>>> * thread_pool_poll() lets the new thread and / or thread completion bottom
>>>> halves run (if they are indeed scheduled to be run).
>>>> It is useful for thread pool users that need to launch or terminate new
>>>> threads without returning to the QEMU main loop.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>>    include/block/thread-pool.h   | 10 ++++-
>>>>    tests/unit/test-thread-pool.c |  2 +-
>>>>    util/thread-pool.c            | 77 ++++++++++++++++++++++++++++++-----
>>>>    3 files changed, 76 insertions(+), 13 deletions(-)
>>>>
>>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>>> index b484c4780ea6..1769496056cd 100644
>>>> --- a/include/block/thread-pool.h
>>>> +++ b/include/block/thread-pool.h
>>>> @@ -37,9 +37,15 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>>                                       void *arg, GDestroyNotify arg_destroy,
>>>>                                       BlockCompletionFunc *cb, void *opaque);
>>>>    int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>>> -                        void *arg, GDestroyNotify arg_destroy);
>>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>>> +                               void *arg, GDestroyNotify arg_destroy,
>>>> +                               BlockCompletionFunc *cb, void *opaque);
>>>
>>> These kinds of changes (create wrappers, change signatures, etc), could
>>> be in their own patch as it's just code motion that should not have
>>> functional impact. The "no_requests" stuff would be better discussed in
>>> a separate patch.
>>
>> These changes *all* should have no functional impact on existing callers.
>>
>> But I get your overall point, will try to separate these really trivial
>> parts.
> 
> Yeah, I guess I meant that one set of changes has a larger potential for
> introducing a bug while the other is clearly harmless.

I understand.

>>
>>>>    
>>>> +void thread_pool_join(ThreadPool *pool);
>>>> +void thread_pool_poll(ThreadPool *pool);
>>>> +
>>>> +void thread_pool_set_minmax_threads(ThreadPool *pool,
>>>> +                                    int min_threads, int max_threads);
>>>>    void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
>>>>    
>>>>    #endif
>>>> diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
>>>> index e4afb9e36292..469c0f7057b6 100644
>>>> --- a/tests/unit/test-thread-pool.c
>>>> +++ b/tests/unit/test-thread-pool.c
>>>> @@ -46,7 +46,7 @@ static void done_cb(void *opaque, int ret)
>>>>    static void test_submit(void)
>>>>    {
>>>>        WorkerTestData data = { .n = 0 };
>>>> -    thread_pool_submit(worker_cb, &data, NULL);
>>>> +    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
>>>>        while (data.n == 0) {
>>>>            aio_poll(ctx, true);
>>>>        }
>>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>>> index 69a87ee79252..2bf3be875a51 100644
>>>> --- a/util/thread-pool.c
>>>> +++ b/util/thread-pool.c
>>>> @@ -60,6 +60,7 @@ struct ThreadPool {
>>>>        QemuMutex lock;
>>>>        QemuCond worker_stopped;
>>>>        QemuCond request_cond;
>>>> +    QemuCond no_requests_cond;
>>>>        QEMUBH *new_thread_bh;
>>>>    
>>>>        /* The following variables are only accessed from one AioContext. */
>>>> @@ -73,6 +74,7 @@ struct ThreadPool {
>>>>        int pending_threads; /* threads created but not running yet */
>>>>        int min_threads;
>>>>        int max_threads;
>>>> +    size_t requests_executing;
>>>
>>> What's with size_t? Should this be a uint32_t instead?
>>
>> Sizes of objects are normally size_t, since otherwise bad
>> things happen if objects are bigger than 4 GiB.
> 
> Ok, but requests_executing is not the size of an object. It's the number
> of objects in a linked list that satisfy a certain predicate. There are
> no address space size considerations here.

Max object count = Address space size / Min object size

If min object size = 1 then Max object count = Address space size

>>
>> Considering that the minimum object size is 1 byte the
>> max count of distinct objects also needs a size_t to not
>> risk an overflow.
> 
> I'm not sure I get you, there's no overflow since you're bounds checking
> with the assert. Or is this a more abstract line of thought about how
> many ThreadPoolElements can be present in memory at a time and you'd
> like a type that's certain to fit the theoretical amount of objects?

It's more of a theoretical thing (not to introduce an unnecessary
limit) but you are right that assert() would cover any possible
issues due to counter overflow, so I can change it to uint32_t
indeed.

>>
>> I think that while 2^32 requests executing seems unlikely
>> saving 4 bytes seems not worth worrying that someone will
>> find a vulnerability triggered by overflowing a 32-bit
>> variable (not necessary in the migration code but in some
>> other thread pool user).
>>
>>>>    };
>>>>    
>>>>    static void *worker_thread(void *opaque)
>>>> @@ -107,6 +109,10 @@ static void *worker_thread(void *opaque)
>>>>            req = QTAILQ_FIRST(&pool->request_list);
>>>>            QTAILQ_REMOVE(&pool->request_list, req, reqs);
>>>>            req->state = THREAD_ACTIVE;
>>>> +
>>>> +        assert(pool->requests_executing < SIZE_MAX);
>>>> +        pool->requests_executing++;
>>>> +
>>>>            qemu_mutex_unlock(&pool->lock);
>>>>    
>>>>            ret = req->func(req->arg);
>>>> @@ -118,6 +124,14 @@ static void *worker_thread(void *opaque)
>>>>    
>>>>            qemu_bh_schedule(pool->completion_bh);
>>>>            qemu_mutex_lock(&pool->lock);
>>>> +
>>>> +        assert(pool->requests_executing > 0);
>>>> +        pool->requests_executing--;
>>>> +
>>>> +        if (pool->requests_executing == 0 &&
>>>> +            QTAILQ_EMPTY(&pool->request_list)) {
>>>> +            qemu_cond_signal(&pool->no_requests_cond);
>>>> +        }
>>>
>>> An empty requests list and no request in flight means the worker will
>>> now exit after the timeout, no? Can you just kick the worker out of the
>>> wait and use pool->worker_stopped instead of the new condition variable?
>>
>> First, all threads won't terminate if either min_threads or max_threads
>> isn't 0.
> 
> Ah I overlooked the break condition, nevermind.
> 
>> It might be in the migration thread pool case but we are adding a
>> generic thread pool so it should be as universal as possible.
>> thread_pool_free() can get away with overwriting these values since
>> it is destroying the pool anyway.
>>
>> Also, the *_join() (or whatever its final name will be) operation is
>> about waiting for all requests / work items to finish, not about waiting
>> for threads to terminate.
> 
> Right, but the idea was to piggyback on the thread termination to infer
> (the obvious) requests service termination. We cannot do that, as you've
> explained, fine.
> 
>> It's essentially a synchronization point for a thread pool, not a cleanup.
>>
>>>>        }
>>>>    
>>>>        pool->cur_threads--;
>>>> @@ -243,13 +257,16 @@ static const AIOCBInfo thread_pool_aiocb_info = {
>>>>        .cancel_async       = thread_pool_cancel,
>>>>    };
>>>>    
>>>> -BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>> -                                   void *arg, GDestroyNotify arg_destroy,
>>>> -                                   BlockCompletionFunc *cb, void *opaque)
>>>> +BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>>> +                               void *arg, GDestroyNotify arg_destroy,
>>>> +                               BlockCompletionFunc *cb, void *opaque)
>>>>    {
>>>>        ThreadPoolElement *req;
>>>>        AioContext *ctx = qemu_get_current_aio_context();
>>>> -    ThreadPool *pool = aio_get_thread_pool(ctx);
>>>> +
>>>> +    if (!pool) {
>>>> +        pool = aio_get_thread_pool(ctx);
>>>> +    }
>>>
>>> I'd go for a separate implementation to really drive the point that this
>>> new usage is different. See the code snippet below.
>>
>> I see your point there - will split these implementations.
>>
>>> It seems we're a short step away to being able to use this
>>> implementation in a general way. Is there something that can be done
>>> with the 'common' field in the ThreadPoolElement?
>>
>> The non-AIO request flow still need the completion callback from BlockAIOCB
>> (and its argument pointer) so removing the "common" field from these requests
>> would need introducing two "flavors" of ThreadPoolElement.
>>
>> Not sure memory saving here are worth the increase in code complexity.
> 
> I'm not asking that of you, but I think it should be done
> eventually. The QEMU block layer is very particular and I wouldn't want
> the use-cases for the thread-pool to get confused. But I can't see a way
> out right now, so let's postpone this, see if anyone else has comments.

I understand.

>>
>>> ========
>>> static void thread_pool_submit_request(ThreadPool *pool, ThreadPoolElement *req)
>>> {
>>>       req->state = THREAD_QUEUED;
>>>       req->pool = pool;
>>>
>>>       QLIST_INSERT_HEAD(&pool->head, req, all);
>>>
>>>       trace_thread_pool_submit(pool, req, req->arg);
>>>
>>>       qemu_mutex_lock(&pool->lock);
>>>       if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
>>>           spawn_thread(pool);
>>>       }
>>>       QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
>>>       qemu_mutex_unlock(&pool->lock);
>>>       qemu_cond_signal(&pool->request_cond);
>>> }
>>>
>>> BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>>                                      BlockCompletionFunc *cb, void *opaque)
>>> {
>>>       ThreadPoolElement *req;
>>>       AioContext *ctx = qemu_get_current_aio_context();
>>>       ThreadPool *pool = aio_get_thread_pool(ctx);
>>>
>>>       /* Assert that the thread submitting work is the same running the pool */
>>>       assert(pool->ctx == qemu_get_current_aio_context());
>>>
>>>       req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
>>>       req->func = func;
>>>       req->arg = arg;
>>>
>>>       thread_pool_submit_request(pool, req);
>>>       return &req->common;
>>> }
>>>
>>> void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
>>> {
>>>       ThreadPoolElement *req;
>>>
>>>       req = g_malloc(sizeof(ThreadPoolElement));
>>>       req->func = func;
>>>       req->arg = arg;
>>>
>>>       thread_pool_submit_request(pool, req);
>>> }
>>> =================
>>>
>>>>    
>>>>        /* Assert that the thread submitting work is the same running the pool */
>>>>        assert(pool->ctx == qemu_get_current_aio_context());
>>>> @@ -275,6 +292,18 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>>        return &req->common;
>>>>    }
>>>>    
>>>> +BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
>>>> +                                   void *arg, GDestroyNotify arg_destroy,
>>>> +                                   BlockCompletionFunc *cb, void *opaque)
>>>> +{
>>>> +    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
>>>> +}
>>>> +
>>>> +void thread_pool_poll(ThreadPool *pool)
>>>> +{
>>>> +    aio_bh_poll(pool->ctx);
>>>> +}
>>>> +
>>>>    typedef struct ThreadPoolCo {
>>>>        Coroutine *co;
>>>>        int ret;
>>>> @@ -297,18 +326,38 @@ int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
>>>>        return tpc.ret;
>>>>    }
>>>>    
>>>> -void thread_pool_submit(ThreadPoolFunc *func,
>>>> -                        void *arg, GDestroyNotify arg_destroy)
>>>> +void thread_pool_join(ThreadPool *pool)
>>>
>>> This is misleading because it's about the requests, not the threads in
>>> the pool. Compare with what thread_pool_free does:
>>>
>>>       /* Wait for worker threads to terminate */
>>>       pool->max_threads = 0;
>>>       qemu_cond_broadcast(&pool->request_cond);
>>>       while (pool->cur_threads > 0) {
>>>           qemu_cond_wait(&pool->worker_stopped, &pool->lock);
>>>       }
>>>
>>
>> I'm open to thread_pool_join() better naming proposals.
> 
> thread_pool_wait() might be better.

Ack.

Thanks,
Maciej
Stefan Hajnoczi Sept. 3, 2024, 7:04 p.m. UTC | #7
On Tue, 3 Sept 2024 at 12:54, Maciej S. Szmigiero
<mail@maciej.szmigiero.name> wrote:
>
> On 3.09.2024 15:55, Stefan Hajnoczi wrote:
> > On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
> > <mail@maciej.szmigiero.name> wrote:
> >>
> >> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> >>
> >> Migration code wants to manage device data sending threads in one place.
> >>
> >> QEMU has an existing thread pool implementation, however it was limited
> >> to queuing AIO operations only and essentially had a 1:1 mapping between
> >> the current AioContext and the ThreadPool in use.
> >>
> >> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
> >> too.
> >>
> >> This brings a few new operations on a pool:
> >> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
> >> thread count in the pool.
> >>
> >> * thread_pool_join() operation waits until all the submitted work requests
> >> have finished.
> >>
> >> * thread_pool_poll() lets the new thread and / or thread completion bottom
> >> halves run (if they are indeed scheduled to be run).
> >> It is useful for thread pool users that need to launch or terminate new
> >> threads without returning to the QEMU main loop.
> >
> > Did you consider glib's GThreadPool?
> > https://docs.gtk.org/glib/struct.ThreadPool.html
> >
> > QEMU's thread pool is integrated into the QEMU event loop. If your
> > goal is to bypass the QEMU event loop, then you may as well use the
> > glib API instead.
> >
> > thread_pool_join() and thread_pool_poll() will lead to code that
> > blocks the event loop. QEMU's aio_poll() and nested event loops in
> > general are a source of hangs and re-entrancy bugs. I would prefer not
> > introducing these issues in the QEMU ThreadPool API.
> >
>
> Unfortunately, the problem with the migration code is that it is
> synchronous - it does not return to the main event loop until the
> migration is done.
>
> So the only way to handle things that need working event loop is to
> pump it manually from inside the migration code.
>
> The reason why I used the QEMU thread pool in the first place in this
> patch set version is because Peter asked me to do so during the review
> of its previous iteration [1].
>
> Peter also asked me previously to move to QEMU synchronization
> primitives from using the Glib ones in the early version of this
> patch set [2].
>
> I personally would rather use something common to many applications,
> well tested and with more pairs of eyes looking at it rather than
> re-invent things in QEMU.
>
> Looking at GThreadPool it seems that it lacks ability to wait until
> all queued work has finished, so this would need to be open-coded
> in the migration code.
>
> @Peter, what's your opinion on using Glib's thread pool instead of
> QEMU's one, considering the above things?

I'll add a bit more about my thinking:

Using QEMU's event-driven model is usually preferred because it makes
integrating with the rest of QEMU easy and avoids having lots of
single-purpose threads that are hard to observe/manage (e.g. through
the QMP monitor).

When there is a genuine need to spawn a thread and write synchronous
code (e.g. a blocking ioctl(2) call or something CPU-intensive), then
it's okay to do that. Use QEMUBH, EventNotifier, or other QEMU APIs to
synchronize between event loop threads and special-purpose synchronous
threads.

I haven't looked at the patch series enough to have an opinion about
whether this use case needs a special-purpose thread or not. I am
assuming it really needs to be a special-purpose thread. Peter and you
could discuss that further if you want.

I agree with Peter's request to use QEMU's synchronization primitives.
They do not depend on the event loop so they can be used outside the
event loop.

The issue I'm raising with this patch is that adding new join()/poll()
APIs that shouldn't be called from the event loop is bug-prone. It
will make the QEMU ThreadPool code harder to understand and maintain
because now there are two different contexts where different subsets
of this API can be used and mixing them leads to problems. To me the
non-event loop case is beyond the scope of QEMU's ThreadPool. I have
CCed Paolo, who wrote the thread pool in its current form in case he
wants to participate in the discussion.

Using glib's ThreadPool solves the issue while still reusing an
existing thread pool implementation. Waiting for all work to complete
can be done using QemuSemaphore.

Thanks,
Stefan

> Thanks,
> Maciej
>
> [1]: https://lore.kernel.org/qemu-devel/ZniFH14DT6ycjbrL@x1n/ point 5: "Worker thread model"
> [2]: https://lore.kernel.org/qemu-devel/Zi_9SyJy__8wJTou@x1n/
>
Peter Xu Sept. 9, 2024, 4:45 p.m. UTC | #8
Hi, Stefan, Maciej,

Sorry to be slow on responding.

On Tue, Sep 03, 2024 at 03:04:54PM -0400, Stefan Hajnoczi wrote:
> On Tue, 3 Sept 2024 at 12:54, Maciej S. Szmigiero
> <mail@maciej.szmigiero.name> wrote:
> >
> > On 3.09.2024 15:55, Stefan Hajnoczi wrote:
> > > On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
> > > <mail@maciej.szmigiero.name> wrote:
> > >>
> > >> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> > >>
> > >> Migration code wants to manage device data sending threads in one place.
> > >>
> > >> QEMU has an existing thread pool implementation, however it was limited
> > >> to queuing AIO operations only and essentially had a 1:1 mapping between
> > >> the current AioContext and the ThreadPool in use.
> > >>
> > >> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
> > >> too.
> > >>
> > >> This brings a few new operations on a pool:
> > >> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
> > >> thread count in the pool.
> > >>
> > >> * thread_pool_join() operation waits until all the submitted work requests
> > >> have finished.
> > >>
> > >> * thread_pool_poll() lets the new thread and / or thread completion bottom
> > >> halves run (if they are indeed scheduled to be run).
> > >> It is useful for thread pool users that need to launch or terminate new
> > >> threads without returning to the QEMU main loop.
> > >
> > > Did you consider glib's GThreadPool?
> > > https://docs.gtk.org/glib/struct.ThreadPool.html
> > >
> > > QEMU's thread pool is integrated into the QEMU event loop. If your
> > > goal is to bypass the QEMU event loop, then you may as well use the
> > > glib API instead.
> > >
> > > thread_pool_join() and thread_pool_poll() will lead to code that
> > > blocks the event loop. QEMU's aio_poll() and nested event loops in
> > > general are a source of hangs and re-entrancy bugs. I would prefer not
> > > introducing these issues in the QEMU ThreadPool API.
> > >
> >
> > Unfortunately, the problem with the migration code is that it is
> > synchronous - it does not return to the main event loop until the
> > migration is done.
> >
> > So the only way to handle things that need working event loop is to
> > pump it manually from inside the migration code.
> >
> > The reason why I used the QEMU thread pool in the first place in this
> > patch set version is because Peter asked me to do so during the review
> > of its previous iteration [1].
> >
> > Peter also asked me previously to move to QEMU synchronization
> > primitives from using the Glib ones in the early version of this
> > patch set [2].
> >
> > I personally would rather use something common to many applications,
> > well tested and with more pairs of eyes looking at it rather to
> > re-invent things in QEMU.
> >
> > Looking at GThreadPool it seems that it lacks ability to wait until
> > all queued work have finished, so this would need to be open-coded
> > in the migration code.
> >
> > @Peter, what's your opinion on using Glib's thread pool instead of
> > QEMU's one, considering the above things?
> 
> I'll add a bit more about my thinking:
> 
> Using QEMU's event-driven model is usually preferred because it makes
> integrating with the rest of QEMU easy and avoids having lots of
> single-purpose threads that are hard to observe/manage (e.g. through
> the QMP monitor).
> 
> When there is a genuine need to spawn a thread and write synchronous
> code (e.g. a blocking ioctl(2) call or something CPU-intensive), then

Right, AFAIU this is the current use case for VFIO, and anything beyond in
migration context, where we want to use genuine threads with no need to
integrate with the main event loop.

Currently the VFIO workfn should read() the VFIO fd in a blocked way, then
dump them to multifd threads (further dump to migration channels), during
which it can wait on a semaphore.

> it's okay to do that. Use QEMUBH, EventNotifier, or other QEMU APIs to
> synchronize between event loop threads and special-purpose synchronous
> threads.
> 
> I haven't looked at the patch series enough to have an opinion about
> whether this use case needs a special-purpose thread or not. I am
> assuming it really needs to be a special-purpose thread. Peter and you
> could discuss that further if you want.
> 
> I agree with Peter's request to use QEMU's synchronization primitives.
> They do not depend on the event loop so they can be used outside the
> event loop.
> 
> The issue I'm raising with this patch is that adding new join()/poll()
> APIs that shouldn't be called from the event loop is bug-prone. It
> will make the QEMU ThreadPool code harder to understand and maintain
> because now there are two different contexts where different subsets
> of this API can be used and mixing them leads to problems. To me the
> non-event loop case is beyond the scope of QEMU's ThreadPool. I have
> CCed Paolo, who wrote the thread pool in its current form in case he
> wants to participate in the discussion.
> 
> Using glib's ThreadPool solves the issue while still reusing an
> existing thread pool implementation. Waiting for all work to complete
> can be done using QemuSemaphore.

Right.  It's a pity that g_thread_pool_unprocessed() only monitors
unqueuing of tasks, and looks like there's no g_thread_pool_flush().

Indeed the current thread pool is very aio-centric, and if we worry about
misuse of the APIs we can switch to glib's threadpool.  Sorry Maciej, looks
like I routed you to a direction that I didn't see the side effects..

I think the fundamental request from my side (on behalf of migration) is we
should avoid a specific vmstate handler managing threads on its own.  E.g.,
any future devices (vdpa, vcpu, etc.) that may also be able to offload
save() processes concurrently to threads (just like what VFIO can already
do right now) should share the same pool of threads.  As long as that can
be achieved I am ok.

Thanks,
Maciej S. Szmigiero Sept. 9, 2024, 6:38 p.m. UTC | #9
On 9.09.2024 18:45, Peter Xu wrote:
> Hi, Stefan, Maciej,
> 
> Sorry to be slow on responding.
> 
> On Tue, Sep 03, 2024 at 03:04:54PM -0400, Stefan Hajnoczi wrote:
>> On Tue, 3 Sept 2024 at 12:54, Maciej S. Szmigiero
>> <mail@maciej.szmigiero.name> wrote:
>>>
>>> On 3.09.2024 15:55, Stefan Hajnoczi wrote:
>>>> On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
>>>> <mail@maciej.szmigiero.name> wrote:
>>>>>
>>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>>
>>>>> Migration code wants to manage device data sending threads in one place.
>>>>>
>>>>> QEMU has an existing thread pool implementation, however it was limited
>>>>> to queuing AIO operations only and essentially had a 1:1 mapping between
>>>>> the current AioContext and the ThreadPool in use.
>>>>>
>>>>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>>>>> too.
>>>>>
>>>>> This brings a few new operations on a pool:
>>>>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>>>>> thread count in the pool.
>>>>>
>>>>> * thread_pool_join() operation waits until all the submitted work requests
>>>>> have finished.
>>>>>
>>>>> * thread_pool_poll() lets the new thread and / or thread completion bottom
>>>>> halves run (if they are indeed scheduled to be run).
>>>>> It is useful for thread pool users that need to launch or terminate new
>>>>> threads without returning to the QEMU main loop.
>>>>
>>>> Did you consider glib's GThreadPool?
>>>> https://docs.gtk.org/glib/struct.ThreadPool.html
>>>>
>>>> QEMU's thread pool is integrated into the QEMU event loop. If your
>>>> goal is to bypass the QEMU event loop, then you may as well use the
>>>> glib API instead.
>>>>
>>>> thread_pool_join() and thread_pool_poll() will lead to code that
>>>> blocks the event loop. QEMU's aio_poll() and nested event loops in
>>>> general are a source of hangs and re-entrancy bugs. I would prefer not
>>>> introducing these issues in the QEMU ThreadPool API.
>>>>
>>>
>>> Unfortunately, the problem with the migration code is that it is
>>> synchronous - it does not return to the main event loop until the
>>> migration is done.
>>>
>>> So the only way to handle things that need working event loop is to
>>> pump it manually from inside the migration code.
>>>
>>> The reason why I used the QEMU thread pool in the first place in this
>>> patch set version is because Peter asked me to do so during the review
>>> of its previous iteration [1].
>>>
>>> Peter also asked me previously to move to QEMU synchronization
>>> primitives from using the Glib ones in the early version of this
>>> patch set [2].
>>>
>>> I personally would rather use something common to many applications,
>>> well tested and with more pairs of eyes looking at it rather to
>>> re-invent things in QEMU.
>>>
>>> Looking at GThreadPool it seems that it lacks ability to wait until
>>> all queued work have finished, so this would need to be open-coded
>>> in the migration code.
>>>
>>> @Peter, what's your opinion on using Glib's thread pool instead of
>>> QEMU's one, considering the above things?
>>
>> I'll add a bit more about my thinking:
>>
>> Using QEMU's event-driven model is usually preferred because it makes
>> integrating with the rest of QEMU easy and avoids having lots of
>> single-purpose threads that are hard to observe/manage (e.g. through
>> the QMP monitor).
>>
>> When there is a genuine need to spawn a thread and write synchronous
>> code (e.g. a blocking ioctl(2) call or something CPU-intensive), then
> 
> Right, AFAIU this is the current use case for VFIO, and anything beyond in
> migration context, where we want to use genuine threads with no need to
> integrate with the main even loop.
> 
> Currently the VFIO workfn should read() the VFIO fd in a blocked way, then
> dump them to multifd threads (further dump to migration channels), during
> which it can wait on a semaphore.
> 
>> it's okay to do that. Use QEMUBH, EventNotifier, or other QEMU APIs to
>> synchronize between event loop threads and special-purpose synchronous
>> threads.
>>
>> I haven't looked at the patch series enough to have an opinion about
>> whether this use case needs a special-purpose thread or not. I am
>> assuming it really needs to be a special-purpose thread. Peter and you
>> could discuss that further if you want.
>>
>> I agree with Peter's request to use QEMU's synchronization primitives.
>> They do not depend on the event loop so they can be used outside the
>> event loop.
>>
>> The issue I'm raising with this patch is that adding new join()/poll()
>> APIs that shouldn't be called from the event loop is bug-prone. It
>> will make the QEMU ThreadPool code harder to understand and maintain
>> because now there are two different contexts where different subsets
>> of this API can be used and mixing them leads to problems. To me the
>> non-event loop case is beyond the scope of QEMU's ThreadPool. I have
>> CCed Paolo, who wrote the thread pool in its current form in case he
>> wants to participate in the discussion.
>>
>> Using glib's ThreadPool solves the issue while still reusing an
>> existing thread pool implementation. Waiting for all work to complete
>> can be done using QemuSemaphore.
> 
> Right.  It's a pity that g_thread_pool_unprocessed() only monitors
> unqueuing of tasks, and looks like there's no g_thread_pool_flush().
> 
> Indeed the current thread poll is very aio-centric, and if we worry about
> misuse of the APIs we can switch to glib's threadpool.  Sorry Maciej, looks
> like I routed you to a direction that I didn't see the side effects..
> 
> I think the fundamental request from my side (on behalf of migration) is we
> should avoid a specific vmstate handler managing threads on its own.  E.g.,
> any future devices (vdpa, vcpu, etc.) that may also be able to offload
> save() processes concurrently to threads (just like what VFIO can already
> do right now) should share the same pool of threads.  As long as that can
> be achieved I am ok.

So, to be clear - do you still prefer using the (extended) QEMU's thread pool
or rather prefer switching to the Glib thread pool instead (with
thread_pool_wait() equivalent reimplemented inside QEMU since Glib lacks it)?
  
> Thanks,
> 

Thanks,
Maciej
Peter Xu Sept. 9, 2024, 7:12 p.m. UTC | #10
On Mon, Sep 09, 2024 at 08:38:45PM +0200, Maciej S. Szmigiero wrote:
> On 9.09.2024 18:45, Peter Xu wrote:
> > Hi, Stefan, Maciej,
> > 
> > Sorry to be slow on responding.
> > 
> > On Tue, Sep 03, 2024 at 03:04:54PM -0400, Stefan Hajnoczi wrote:
> > > On Tue, 3 Sept 2024 at 12:54, Maciej S. Szmigiero
> > > <mail@maciej.szmigiero.name> wrote:
> > > > 
> > > > On 3.09.2024 15:55, Stefan Hajnoczi wrote:
> > > > > On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
> > > > > <mail@maciej.szmigiero.name> wrote:
> > > > > > 
> > > > > > From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> > > > > > 
> > > > > > Migration code wants to manage device data sending threads in one place.
> > > > > > 
> > > > > > QEMU has an existing thread pool implementation, however it was limited
> > > > > > to queuing AIO operations only and essentially had a 1:1 mapping between
> > > > > > the current AioContext and the ThreadPool in use.
> > > > > > 
> > > > > > Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
> > > > > > too.
> > > > > > 
> > > > > > This brings a few new operations on a pool:
> > > > > > * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
> > > > > > thread count in the pool.
> > > > > > 
> > > > > > * thread_pool_join() operation waits until all the submitted work requests
> > > > > > have finished.
> > > > > > 
> > > > > > * thread_pool_poll() lets the new thread and / or thread completion bottom
> > > > > > halves run (if they are indeed scheduled to be run).
> > > > > > It is useful for thread pool users that need to launch or terminate new
> > > > > > threads without returning to the QEMU main loop.
> > > > > 
> > > > > Did you consider glib's GThreadPool?
> > > > > https://docs.gtk.org/glib/struct.ThreadPool.html
> > > > > 
> > > > > QEMU's thread pool is integrated into the QEMU event loop. If your
> > > > > goal is to bypass the QEMU event loop, then you may as well use the
> > > > > glib API instead.
> > > > > 
> > > > > thread_pool_join() and thread_pool_poll() will lead to code that
> > > > > blocks the event loop. QEMU's aio_poll() and nested event loops in
> > > > > general are a source of hangs and re-entrancy bugs. I would prefer not
> > > > > introducing these issues in the QEMU ThreadPool API.
> > > > > 
> > > > 
> > > > Unfortunately, the problem with the migration code is that it is
> > > > synchronous - it does not return to the main event loop until the
> > > > migration is done.
> > > > 
> > > > So the only way to handle things that need working event loop is to
> > > > pump it manually from inside the migration code.
> > > > 
> > > > The reason why I used the QEMU thread pool in the first place in this
> > > > patch set version is because Peter asked me to do so during the review
> > > > of its previous iteration [1].
> > > > 
> > > > Peter also asked me previously to move to QEMU synchronization
> > > > primitives from using the Glib ones in the early version of this
> > > > patch set [2].
> > > > 
> > > > I personally would rather use something common to many applications,
> > > > well tested and with more pairs of eyes looking at it rather to
> > > > re-invent things in QEMU.
> > > > 
> > > > Looking at GThreadPool it seems that it lacks ability to wait until
> > > > all queued work have finished, so this would need to be open-coded
> > > > in the migration code.
> > > > 
> > > > @Peter, what's your opinion on using Glib's thread pool instead of
> > > > QEMU's one, considering the above things?
> > > 
> > > I'll add a bit more about my thinking:
> > > 
> > > Using QEMU's event-driven model is usually preferred because it makes
> > > integrating with the rest of QEMU easy and avoids having lots of
> > > single-purpose threads that are hard to observe/manage (e.g. through
> > > the QMP monitor).
> > > 
> > > When there is a genuine need to spawn a thread and write synchronous
> > > code (e.g. a blocking ioctl(2) call or something CPU-intensive), then
> > 
> > Right, AFAIU this is the current use case for VFIO, and anything beyond in
> > migration context, where we want to use genuine threads with no need to
> > integrate with the main even loop.
> > 
> > Currently the VFIO workfn should read() the VFIO fd in a blocked way, then
> > dump them to multifd threads (further dump to migration channels), during
> > which it can wait on a semaphore.
> > 
> > > it's okay to do that. Use QEMUBH, EventNotifier, or other QEMU APIs to
> > > synchronize between event loop threads and special-purpose synchronous
> > > threads.
> > > 
> > > I haven't looked at the patch series enough to have an opinion about
> > > whether this use case needs a special-purpose thread or not. I am
> > > assuming it really needs to be a special-purpose thread. Peter and you
> > > could discuss that further if you want.
> > > 
> > > I agree with Peter's request to use QEMU's synchronization primitives.
> > > They do not depend on the event loop so they can be used outside the
> > > event loop.
> > > 
> > > The issue I'm raising with this patch is that adding new join()/poll()
> > > APIs that shouldn't be called from the event loop is bug-prone. It
> > > will make the QEMU ThreadPool code harder to understand and maintain
> > > because now there are two different contexts where different subsets
> > > of this API can be used and mixing them leads to problems. To me the
> > > non-event loop case is beyond the scope of QEMU's ThreadPool. I have
> > > CCed Paolo, who wrote the thread pool in its current form in case he
> > > wants to participate in the discussion.
> > > 
> > > Using glib's ThreadPool solves the issue while still reusing an
> > > existing thread pool implementation. Waiting for all work to complete
> > > can be done using QemuSemaphore.
> > 
> > Right.  It's a pity that g_thread_pool_unprocessed() only monitors
> > unqueuing of tasks, and looks like there's no g_thread_pool_flush().
> > 
> > Indeed the current thread poll is very aio-centric, and if we worry about
> > misuse of the APIs we can switch to glib's threadpool.  Sorry Maciej, looks
> > like I routed you to a direction that I didn't see the side effects..
> > 
> > I think the fundamental request from my side (on behalf of migration) is we
> > should avoid a specific vmstate handler managing threads on its own.  E.g.,
> > any future devices (vdpa, vcpu, etc.) that may also be able to offload
> > save() processes concurrently to threads (just like what VFIO can already
> > do right now) should share the same pool of threads.  As long as that can
> > be achieved I am ok.
> 
> So, to be clear - do you still prefer using the (extended) QEMU's thread pool
> or rather prefer switching to the Glib thread pool instead (with
> thread_pool_wait() equivalent reimplemented inside QEMU since Glib lacks it)?

After reading Stefan's comment, I prefer the latter.

I wonder whether we should rename the current ThreadPool to AioThreadPool
or similar, so that it'll be crystal clear we want it to stick to aio
context.  Then the new pool can be the raw thread pool (and I also wonder
whether at some point the aio thread pool can still reuse the raw thread
pool to some degree).

And yes, it'll be nice if we can wrap glib's with wait() semantics.

Thanks,
Maciej S. Szmigiero Sept. 9, 2024, 7:16 p.m. UTC | #11
On 9.09.2024 21:12, Peter Xu wrote:
> On Mon, Sep 09, 2024 at 08:38:45PM +0200, Maciej S. Szmigiero wrote:
>> On 9.09.2024 18:45, Peter Xu wrote:
>>> Hi, Stefan, Maciej,
>>>
>>> Sorry to be slow on responding.
>>>
>>> On Tue, Sep 03, 2024 at 03:04:54PM -0400, Stefan Hajnoczi wrote:
>>>> On Tue, 3 Sept 2024 at 12:54, Maciej S. Szmigiero
>>>> <mail@maciej.szmigiero.name> wrote:
>>>>>
>>>>> On 3.09.2024 15:55, Stefan Hajnoczi wrote:
>>>>>> On Tue, 27 Aug 2024 at 13:58, Maciej S. Szmigiero
>>>>>> <mail@maciej.szmigiero.name> wrote:
>>>>>>>
>>>>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>>>>
>>>>>>> Migration code wants to manage device data sending threads in one place.
>>>>>>>
>>>>>>> QEMU has an existing thread pool implementation, however it was limited
>>>>>>> to queuing AIO operations only and essentially had a 1:1 mapping between
>>>>>>> the current AioContext and the ThreadPool in use.
>>>>>>>
>>>>>>> Implement what is necessary to queue generic (non-AIO) work on a ThreadPool
>>>>>>> too.
>>>>>>>
>>>>>>> This brings a few new operations on a pool:
>>>>>>> * thread_pool_set_minmax_threads() explicitly sets the minimum and maximum
>>>>>>> thread count in the pool.
>>>>>>>
>>>>>>> * thread_pool_join() operation waits until all the submitted work requests
>>>>>>> have finished.
>>>>>>>
>>>>>>> * thread_pool_poll() lets the new thread and / or thread completion bottom
>>>>>>> halves run (if they are indeed scheduled to be run).
>>>>>>> It is useful for thread pool users that need to launch or terminate new
>>>>>>> threads without returning to the QEMU main loop.
>>>>>>
>>>>>> Did you consider glib's GThreadPool?
>>>>>> https://docs.gtk.org/glib/struct.ThreadPool.html
>>>>>>
>>>>>> QEMU's thread pool is integrated into the QEMU event loop. If your
>>>>>> goal is to bypass the QEMU event loop, then you may as well use the
>>>>>> glib API instead.
>>>>>>
>>>>>> thread_pool_join() and thread_pool_poll() will lead to code that
>>>>>> blocks the event loop. QEMU's aio_poll() and nested event loops in
>>>>>> general are a source of hangs and re-entrancy bugs. I would prefer not
>>>>>> introducing these issues in the QEMU ThreadPool API.
>>>>>>
>>>>>
>>>>> Unfortunately, the problem with the migration code is that it is
>>>>> synchronous - it does not return to the main event loop until the
>>>>> migration is done.
>>>>>
>>>>> So the only way to handle things that need working event loop is to
>>>>> pump it manually from inside the migration code.
>>>>>
>>>>> The reason why I used the QEMU thread pool in the first place in this
>>>>> patch set version is because Peter asked me to do so during the review
>>>>> of its previous iteration [1].
>>>>>
>>>>> Peter also asked me previously to move to QEMU synchronization
>>>>> primitives from using the Glib ones in the early version of this
>>>>> patch set [2].
>>>>>
>>>>> I personally would rather use something common to many applications,
>>>>> well tested and with more pairs of eyes looking at it rather to
>>>>> re-invent things in QEMU.
>>>>>
>>>>> Looking at GThreadPool it seems that it lacks ability to wait until
>>>>> all queued work have finished, so this would need to be open-coded
>>>>> in the migration code.
>>>>>
>>>>> @Peter, what's your opinion on using Glib's thread pool instead of
>>>>> QEMU's one, considering the above things?
>>>>
>>>> I'll add a bit more about my thinking:
>>>>
>>>> Using QEMU's event-driven model is usually preferred because it makes
>>>> integrating with the rest of QEMU easy and avoids having lots of
>>>> single-purpose threads that are hard to observe/manage (e.g. through
>>>> the QMP monitor).
>>>>
>>>> When there is a genuine need to spawn a thread and write synchronous
>>>> code (e.g. a blocking ioctl(2) call or something CPU-intensive), then
>>>
>>> Right, AFAIU this is the current use case for VFIO, and anything beyond in
>>> migration context, where we want to use genuine threads with no need to
>>> integrate with the main even loop.
>>>
>>> Currently the VFIO workfn should read() the VFIO fd in a blocked way, then
>>> dump them to multifd threads (further dump to migration channels), during
>>> which it can wait on a semaphore.
>>>
>>>> it's okay to do that. Use QEMUBH, EventNotifier, or other QEMU APIs to
>>>> synchronize between event loop threads and special-purpose synchronous
>>>> threads.
>>>>
>>>> I haven't looked at the patch series enough to have an opinion about
>>>> whether this use case needs a special-purpose thread or not. I am
>>>> assuming it really needs to be a special-purpose thread. Peter and you
>>>> could discuss that further if you want.
>>>>
>>>> I agree with Peter's request to use QEMU's synchronization primitives.
>>>> They do not depend on the event loop so they can be used outside the
>>>> event loop.
>>>>
>>>> The issue I'm raising with this patch is that adding new join()/poll()
>>>> APIs that shouldn't be called from the event loop is bug-prone. It
>>>> will make the QEMU ThreadPool code harder to understand and maintain
>>>> because now there are two different contexts where different subsets
>>>> of this API can be used and mixing them leads to problems. To me the
>>>> non-event loop case is beyond the scope of QEMU's ThreadPool. I have
>>>> CCed Paolo, who wrote the thread pool in its current form in case he
>>>> wants to participate in the discussion.
>>>>
>>>> Using glib's ThreadPool solves the issue while still reusing an
>>>> existing thread pool implementation. Waiting for all work to complete
>>>> can be done using QemuSemaphore.
>>>
>>> Right.  It's a pity that g_thread_pool_unprocessed() only monitors
>>> unqueuing of tasks, and looks like there's no g_thread_pool_flush().
>>>
>>> Indeed the current thread poll is very aio-centric, and if we worry about
>>> misuse of the APIs we can switch to glib's threadpool.  Sorry Maciej, looks
>>> like I routed you to a direction that I didn't see the side effects..
>>>
>>> I think the fundamental request from my side (on behalf of migration) is we
>>> should avoid a specific vmstate handler managing threads on its own.  E.g.,
>>> any future devices (vdpa, vcpu, etc.) that may also be able to offload
>>> save() processes concurrently to threads (just like what VFIO can already
>>> do right now) should share the same pool of threads.  As long as that can
>>> be achieved I am ok.
>>
>> So, to be clear - do you still prefer using the (extended) QEMU's thread pool
>> or rather prefer switching to the Glib thread pool instead (with
>> thread_pool_wait() equivalent reimplemented inside QEMU since Glib lacks it)?
> 
> After reading Stefan's comment, I prefer the latter.
> 
> I wonder whether we should rename the current ThreadPool to AioThreadPool
> or similar, so that it'll be crystal clear we want it to stick to aio
> context.  Then the new pool can be the raw thread pool (and I also wonder
> whether at some point the aio thread pool can still reuse the raw thread
> pool to some degree).
> 
> And yes, it'll be nice we can wrap glib's with a wait() semantics.

So, if I understand your design correctly, you want to basically wrap
the Glib's GThreadPool into some QEMU GenericThreadPool and then use the
later in multifd code, right?
  
> Thanks,
> 

Thanks,
Maciej
Peter Xu Sept. 9, 2024, 7:24 p.m. UTC | #12
On Mon, Sep 09, 2024 at 09:16:32PM +0200, Maciej S. Szmigiero wrote:
> So, if I understand your design correctly, you want to basically wrap
> the Glib's GThreadPool into some QEMU GenericThreadPool and then use the
> later in multifd code, right?

Yes.  I didn't have an explicit picture yet in mind, but what you said
makes sense to me.
diff mbox series

Patch

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index b484c4780ea6..1769496056cd 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -37,9 +37,15 @@  BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
                                    void *arg, GDestroyNotify arg_destroy,
                                    BlockCompletionFunc *cb, void *opaque);
 int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
-void thread_pool_submit(ThreadPoolFunc *func,
-                        void *arg, GDestroyNotify arg_destroy);
+BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+                               void *arg, GDestroyNotify arg_destroy,
+                               BlockCompletionFunc *cb, void *opaque);
 
+void thread_pool_join(ThreadPool *pool);
+void thread_pool_poll(ThreadPool *pool);
+
+void thread_pool_set_minmax_threads(ThreadPool *pool,
+                                    int min_threads, int max_threads);
 void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
 
 #endif
diff --git a/tests/unit/test-thread-pool.c b/tests/unit/test-thread-pool.c
index e4afb9e36292..469c0f7057b6 100644
--- a/tests/unit/test-thread-pool.c
+++ b/tests/unit/test-thread-pool.c
@@ -46,7 +46,7 @@  static void done_cb(void *opaque, int ret)
 static void test_submit(void)
 {
     WorkerTestData data = { .n = 0 };
-    thread_pool_submit(worker_cb, &data, NULL);
+    thread_pool_submit(NULL, worker_cb, &data, NULL, NULL, NULL);
     while (data.n == 0) {
         aio_poll(ctx, true);
     }
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 69a87ee79252..2bf3be875a51 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -60,6 +60,7 @@  struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuCond request_cond;
+    QemuCond no_requests_cond;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -73,6 +74,7 @@  struct ThreadPool {
     int pending_threads; /* threads created but not running yet */
     int min_threads;
     int max_threads;
+    size_t requests_executing;
 };
 
 static void *worker_thread(void *opaque)
@@ -107,6 +109,10 @@  static void *worker_thread(void *opaque)
         req = QTAILQ_FIRST(&pool->request_list);
         QTAILQ_REMOVE(&pool->request_list, req, reqs);
         req->state = THREAD_ACTIVE;
+
+        assert(pool->requests_executing < SIZE_MAX);
+        pool->requests_executing++;
+
         qemu_mutex_unlock(&pool->lock);
 
         ret = req->func(req->arg);
@@ -118,6 +124,14 @@  static void *worker_thread(void *opaque)
 
         qemu_bh_schedule(pool->completion_bh);
         qemu_mutex_lock(&pool->lock);
+
+        assert(pool->requests_executing > 0);
+        pool->requests_executing--;
+
+        if (pool->requests_executing == 0 &&
+            QTAILQ_EMPTY(&pool->request_list)) {
+            qemu_cond_signal(&pool->no_requests_cond);
+        }
     }
 
     pool->cur_threads--;
@@ -243,13 +257,16 @@  static const AIOCBInfo thread_pool_aiocb_info = {
     .cancel_async       = thread_pool_cancel,
 };
 
-BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
-                                   void *arg, GDestroyNotify arg_destroy,
-                                   BlockCompletionFunc *cb, void *opaque)
+BlockAIOCB *thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+                               void *arg, GDestroyNotify arg_destroy,
+                               BlockCompletionFunc *cb, void *opaque)
 {
     ThreadPoolElement *req;
     AioContext *ctx = qemu_get_current_aio_context();
-    ThreadPool *pool = aio_get_thread_pool(ctx);
+
+    if (!pool) {
+        pool = aio_get_thread_pool(ctx);
+    }
 
     /* Assert that the thread submitting work is the same running the pool */
     assert(pool->ctx == qemu_get_current_aio_context());
@@ -275,6 +292,18 @@  BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
     return &req->common;
 }
 
+BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
+                                   void *arg, GDestroyNotify arg_destroy,
+                                   BlockCompletionFunc *cb, void *opaque)
+{
+    return thread_pool_submit(NULL, func, arg, arg_destroy, cb, opaque);
+}
+
+void thread_pool_poll(ThreadPool *pool)
+{
+    aio_bh_poll(pool->ctx);
+}
+
 typedef struct ThreadPoolCo {
     Coroutine *co;
     int ret;
@@ -297,18 +326,38 @@  int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
     return tpc.ret;
 }
 
-void thread_pool_submit(ThreadPoolFunc *func,
-                        void *arg, GDestroyNotify arg_destroy)
+void thread_pool_join(ThreadPool *pool)
 {
-    thread_pool_submit_aio(func, arg, arg_destroy, NULL, NULL);
+    /* Assert that the thread waiting is the same running the pool */
+    assert(pool->ctx == qemu_get_current_aio_context());
+
+    qemu_mutex_lock(&pool->lock);
+
+    while (pool->requests_executing > 0 ||
+           !QTAILQ_EMPTY(&pool->request_list)) {
+        qemu_cond_wait(&pool->no_requests_cond, &pool->lock);
+    }
+    assert(pool->requests_executing == 0 &&
+           QTAILQ_EMPTY(&pool->request_list));
+
+    qemu_mutex_unlock(&pool->lock);
+
+    aio_bh_poll(pool->ctx);
+
+    assert(QLIST_EMPTY(&pool->head));
 }
 
-void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+void thread_pool_set_minmax_threads(ThreadPool *pool,
+                                    int min_threads, int max_threads)
 {
+    assert(min_threads >= 0);
+    assert(max_threads > 0);
+    assert(max_threads >= min_threads);
+
     qemu_mutex_lock(&pool->lock);
 
-    pool->min_threads = ctx->thread_pool_min;
-    pool->max_threads = ctx->thread_pool_max;
+    pool->min_threads = min_threads;
+    pool->max_threads = max_threads;
 
     /*
      * We either have to:
@@ -330,6 +379,12 @@  void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_unlock(&pool->lock);
 }
 
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
+{
+    thread_pool_set_minmax_threads(pool,
+                                   ctx->thread_pool_min, ctx->thread_pool_max);
+}
+
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
     if (!ctx) {
@@ -342,6 +397,7 @@  static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_cond_init(&pool->request_cond);
+    qemu_cond_init(&pool->no_requests_cond);
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
@@ -382,6 +438,7 @@  void thread_pool_free(ThreadPool *pool)
     qemu_mutex_unlock(&pool->lock);
 
     qemu_bh_delete(pool->completion_bh);
+    qemu_cond_destroy(&pool->no_requests_cond);
     qemu_cond_destroy(&pool->request_cond);
     qemu_cond_destroy(&pool->worker_stopped);
     qemu_mutex_destroy(&pool->lock);