@@ -21,13 +21,14 @@
#include "block/aio.h"
enum {
- POOL_BATCH_SIZE = 64,
+ POOL_BATCH_SIZE = 16,
+ POOL_MAX_BATCHES = 32,
};
-/** Free list to speed up creation */
-static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
-static unsigned int release_pool_size;
-static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool);
+/** Free stack to speed up creation */
+static QSLIST_HEAD(, Coroutine) pool[POOL_MAX_BATCHES];
+static int pool_top;
+static __thread QSLIST_HEAD(, Coroutine) alloc_pool;
static __thread unsigned int alloc_pool_size;
static __thread Notifier coroutine_pool_cleanup_notifier;
@@ -49,20 +50,26 @@ Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
if (CONFIG_COROUTINE_POOL) {
co = QSLIST_FIRST(&alloc_pool);
if (!co) {
- if (release_pool_size > POOL_BATCH_SIZE) {
- /* Slow path; a good place to register the destructor, too. */
- if (!coroutine_pool_cleanup_notifier.notify) {
- coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
- qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
+ int top;
+
+ /* Slow path; a good place to register the destructor, too. */
+ if (!coroutine_pool_cleanup_notifier.notify) {
+ coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
+ qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
+ }
+
+ while ((top = atomic_read(&pool_top)) > 0) {
+ if (atomic_cmpxchg(&pool_top, top, top - 1) != top) {
+ continue;
}
- /* This is not exact; there could be a little skew between
- * release_pool_size and the actual size of release_pool. But
- * it is just a heuristic, it does not need to be perfect.
- */
- alloc_pool_size = atomic_xchg(&release_pool_size, 0);
- QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool);
+ QSLIST_MOVE_ATOMIC(&alloc_pool, &pool[top - 1]);
co = QSLIST_FIRST(&alloc_pool);
+
+ if (co) {
+ alloc_pool_size = POOL_BATCH_SIZE;
+ break;
+ }
}
}
if (co) {
@@ -86,16 +93,30 @@ static void coroutine_delete(Coroutine *co)
co->caller = NULL;
if (CONFIG_COROUTINE_POOL) {
- if (release_pool_size < POOL_BATCH_SIZE * 2) {
- QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
- atomic_inc(&release_pool_size);
- return;
- }
+ int top, value, old;
+
if (alloc_pool_size < POOL_BATCH_SIZE) {
QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next);
alloc_pool_size++;
return;
}
+
+ for (top = atomic_read(&pool_top); top < POOL_MAX_BATCHES; top++) {
+ QSLIST_REPLACE_ATOMIC(&pool[top], &alloc_pool);
+ if (!QSLIST_EMPTY(&alloc_pool)) {
+ continue;
+ }
+
+ value = top + 1;
+
+ do {
+ old = atomic_cmpxchg(&pool_top, top, value);
+ } while (old != top && (top = old) < value);
+
+ QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next);
+ alloc_pool_size = 1;
+ return;
+ }
}
qemu_coroutine_delete(co);
This patch replace the global coroutine queue with a lock-free stack of which the elements are coroutine queues. Threads can put coroutine queues into the stack or take queues from it and each coroutine queue has exactly POOL_BATCH_SIZE coroutines. Note that the stack is not strictly LIFO, but it's enough for buffer pool. Coroutines will be put into thread-local pools first while release. Now the fast pathes of both allocation and release are atomic-free, and there won't be too many coroutines remain in a single thread since POOL_BATCH_SIZE has been reduced to 16. In practice, I've run a VM with two block devices binding to two different iothreads, and run fio with iodepth 128 on each device. It maintains around 400 coroutines and has about 1% chance of calling to `qemu_coroutine_new` without this patch. And with this patch, it maintains no more than 273 coroutines and doesn't call `qemu_coroutine_new` after initial allocations. Signed-off-by: wanghonghao <wanghonghao@bytedance.com> --- util/qemu-coroutine.c | 63 ++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 21 deletions(-)