| Message ID | 20200222085030.1760640-5-stefanha@redhat.com (mailing list archive) |
|---|---|
| State | New, archived |
| Series | [PULL,01/31] virtio: increase virtqueue size for virtio-scsi and virtio-blk |
Hi

On Sat, Feb 22, 2020 at 9:51 AM Stefan Hajnoczi <stefanha@redhat.com> wrote:
>
> The ctx->first_bh list contains all created BHs, including those that
> are not scheduled. The list is iterated by the event loop and therefore
> has O(n) time complexity with respect to the number of created BHs.
>
> Rewrite BHs so that only scheduled or deleted BHs are enqueued.
> Only BHs that actually require action will be iterated.
>
> One semantic change is required: qemu_bh_delete() enqueues the BH and
> therefore invokes aio_notify(). The
> tests/test-aio.c:test_source_bh_delete_from_cb() test case assumed that
> g_main_context_iteration(NULL, false) returns false after
> qemu_bh_delete() but it now returns true for one iteration. Fix up the
> test case.
>
> This patch makes aio_compute_timeout() and aio_bh_poll() drop from a CPU
> profile reported by perf-top(1). Previously they combined to 9% CPU
> utilization when AioContext polling is commented out and the guest has 2
> virtio-blk,num-queues=1 and 99 virtio-blk,num-queues=32 devices.
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
> Message-id: 20200221093951.1414693-1-stefanha@redhat.com
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---

Current master migration-test fails with --enable-sanitizers: QEMU exits with status 1 when 0 is expected.

QTEST_QEMU_BINARY=x86_64-softmmu/qemu-system-x86_64 tests/qtest/migration-test -p /x86_64/migration/postcopy/recovery
tests/qtest/libqtest.c:140: kill_qemu() tried to terminate QEMU process but encountered exit status 1 (expected 0)

The leak is:

=================================================================
==2082571==ERROR: LeakSanitizer: detected memory leaks

Direct leak of 40 byte(s) in 1 object(s) allocated from:
    #0 0x7f25971dfc58 in __interceptor_malloc (/lib64/libasan.so.5+0x10dc58)
    #1 0x7f2596d08358 in g_malloc (/lib64/libglib-2.0.so.0+0x57358)
    #2 0x560970d006f8 in qemu_bh_new /home/elmarco/src/qemu/util/main-loop.c:532
    #3 0x5609704afa02 in migrate_fd_connect /home/elmarco/src/qemu/migration/migration.c:3407
    #4 0x5609704b6b6f in migration_channel_connect /home/elmarco/src/qemu/migration/channel.c:92
    #5 0x5609704b2bfb in socket_outgoing_migration /home/elmarco/src/qemu/migration/socket.c:108
    #6 0x560970b9bd6c in qio_task_complete /home/elmarco/src/qemu/io/task.c:196
    #7 0x560970b9aa97 in qio_task_thread_result /home/elmarco/src/qemu/io/task.c:111
    #8 0x7f2596cfee3a (/lib64/libglib-2.0.so.0+0x4de3a)

Any idea?

>  include/block/aio.h |  20 +++-
>  tests/test-aio.c    |   3 +-
>  util/async.c        | 237 ++++++++++++++++++++++++++------------------
>  3 files changed, 158 insertions(+), 102 deletions(-)
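For context on the lifecycle the new code expects: a QEMUBH created with qemu_bh_new() is only g_free()'d after qemu_bh_delete() has been called on it, either when aio_bh_poll() dequeues the deleted BH or at aio_ctx_finalize(), which asserts BH_DELETED. The LeakSanitizer report above therefore points at a BH allocated in migrate_fd_connect() that is apparently never deleted and drained. A minimal sketch of the expected pairing, for illustration only (MyState, my_cleanup_cb and my_start are made-up names, not migration code, and this only builds inside the QEMU tree):

#include "qemu/osdep.h"
#include "qemu/main-loop.h"

typedef struct MyState {
    QEMUBH *cleanup_bh;
} MyState;

static void my_cleanup_cb(void *opaque)
{
    MyState *s = opaque;

    /* ... deferred work ... */

    /* Deleting from the BH's own callback is fine: qemu_bh_delete() only
     * marks the BH deleted and re-enqueues it; aio_bh_poll() frees it on a
     * later iteration. */
    qemu_bh_delete(s->cleanup_bh);
    s->cleanup_bh = NULL;
}

static void my_start(MyState *s)
{
    s->cleanup_bh = qemu_bh_new(my_cleanup_cb, s);
    qemu_bh_schedule(s->cleanup_bh);
}

Deleting a BH from its own callback, as above, is exactly the case covered by the test-aio.c change in the patch: qemu_bh_delete() now re-enqueues the BH, so the event loop reports one more pending iteration before the BH is finally freed.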
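The core of the patch below is that the scheduled/idle/deleted bools become a single flags word updated with one atomic read-modify-write, and a BH is inserted into the pending list only by the thread that flips BH_PENDING from 0 to 1. A rough standalone model of just that flag protocol, using C11 <stdatomic.h> in place of QEMU's atomic_fetch_or()/atomic_fetch_and() wrappers (ToyBH and the toy_* functions are made up for illustration; the list handling itself is omitted):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

enum {
    BH_PENDING   = 1 << 0,  /* already on the pending list */
    BH_SCHEDULED = 1 << 1,  /* callback should be invoked */
    BH_IDLE      = 1 << 4,  /* low-priority (idle) schedule */
};

typedef struct {
    atomic_uint flags;
} ToyBH;

/* Returns true when the caller must insert the BH into the pending list:
 * only the thread that flips BH_PENDING from 0 to 1 inserts, so scheduling
 * the same BH many times before the next poll enqueues it only once. */
static bool toy_enqueue(ToyBH *bh, unsigned new_flags)
{
    unsigned old = atomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
    return !(old & BH_PENDING);
}

/* Called by the poller after unlinking the BH from the list: clears the
 * transient bits and returns the flags that were set, so the poller can
 * decide whether to run the callback and whether to free the BH. */
static unsigned toy_dequeue(ToyBH *bh)
{
    return atomic_fetch_and(&bh->flags,
                            ~(unsigned)(BH_PENDING | BH_SCHEDULED | BH_IDLE));
}

int main(void)
{
    ToyBH bh;
    unsigned flags;

    atomic_init(&bh.flags, 0);

    printf("first schedule inserts:  %d\n", toy_enqueue(&bh, BH_SCHEDULED)); /* 1 */
    printf("second schedule inserts: %d\n", toy_enqueue(&bh, BH_SCHEDULED)); /* 0 */

    flags = toy_dequeue(&bh);
    printf("dequeued, scheduled=%d\n", !!(flags & BH_SCHEDULED));            /* 1 */
    printf("schedule after poll inserts: %d\n",
           toy_enqueue(&bh, BH_SCHEDULED));                                  /* 1 */
    return 0;
}

In the patch, the insertion that toy_enqueue() merely signals is QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next), and aio_bh_dequeue() is the counterpart of toy_dequeue().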
diff --git a/include/block/aio.h b/include/block/aio.h
index 7ba9bd7874..1a2ce9ca26 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -51,6 +51,19 @@ struct ThreadPool;
 struct LinuxAioState;
 struct LuringState;
 
+/*
+ * Each aio_bh_poll() call carves off a slice of the BH list, so that newly
+ * scheduled BHs are not processed until the next aio_bh_poll() call. All
+ * active aio_bh_poll() calls chain their slices together in a list, so that
+ * nested aio_bh_poll() calls process all scheduled bottom halves.
+ */
+typedef QSLIST_HEAD(, QEMUBH) BHList;
+typedef struct BHListSlice BHListSlice;
+struct BHListSlice {
+    BHList bh_list;
+    QSIMPLEQ_ENTRY(BHListSlice) next;
+};
+
 struct AioContext {
     GSource source;
 
@@ -91,8 +104,11 @@ struct AioContext {
      */
     QemuLockCnt list_lock;
 
-    /* Anchor of the list of Bottom Halves belonging to the context */
-    struct QEMUBH *first_bh;
+    /* Bottom Halves pending aio_bh_poll() processing */
+    BHList bh_list;
+
+    /* Chained BH list slices for each nested aio_bh_poll() call */
+    QSIMPLEQ_HEAD(, BHListSlice) bh_slice_list;
 
     /* Used by aio_notify.
      *
diff --git a/tests/test-aio.c b/tests/test-aio.c
index 86fb73b3d5..8a46078463 100644
--- a/tests/test-aio.c
+++ b/tests/test-aio.c
@@ -615,7 +615,8 @@ static void test_source_bh_delete_from_cb(void)
     g_assert_cmpint(data1.n, ==, data1.max);
     g_assert(data1.bh == NULL);
 
-    g_assert(!g_main_context_iteration(NULL, false));
+    assert(g_main_context_iteration(NULL, false));
+    assert(!g_main_context_iteration(NULL, false));
 }
 
 static void test_source_bh_delete_from_cb_many(void)
diff --git a/util/async.c b/util/async.c
index c192a24a61..b94518b948 100644
--- a/util/async.c
+++ b/util/async.c
@@ -29,6 +29,7 @@
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 #include "qemu/atomic.h"
+#include "qemu/rcu_queue.h"
 #include "block/raw-aio.h"
 #include "qemu/coroutine_int.h"
 #include "trace.h"
@@ -36,16 +37,76 @@
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
 
+/* QEMUBH::flags values */
+enum {
+    /* Already enqueued and waiting for aio_bh_poll() */
+    BH_PENDING = (1 << 0),
+
+    /* Invoke the callback */
+    BH_SCHEDULED = (1 << 1),
+
+    /* Delete without invoking callback */
+    BH_DELETED = (1 << 2),
+
+    /* Delete after invoking callback */
+    BH_ONESHOT = (1 << 3),
+
+    /* Schedule periodically when the event loop is idle */
+    BH_IDLE = (1 << 4),
+};
+
 struct QEMUBH {
     AioContext *ctx;
     QEMUBHFunc *cb;
     void *opaque;
-    QEMUBH *next;
-    bool scheduled;
-    bool idle;
-    bool deleted;
+    QSLIST_ENTRY(QEMUBH) next;
+    unsigned flags;
 };
 
+/* Called concurrently from any thread */
+static void aio_bh_enqueue(QEMUBH *bh, unsigned new_flags)
+{
+    AioContext *ctx = bh->ctx;
+    unsigned old_flags;
+
+    /*
+     * The memory barrier implicit in atomic_fetch_or makes sure that:
+     * 1. idle & any writes needed by the callback are done before the
+     *    locations are read in the aio_bh_poll.
+     * 2. ctx is loaded before the callback has a chance to execute and bh
+     *    could be freed.
+     */
+    old_flags = atomic_fetch_or(&bh->flags, BH_PENDING | new_flags);
+    if (!(old_flags & BH_PENDING)) {
+        QSLIST_INSERT_HEAD_ATOMIC(&ctx->bh_list, bh, next);
+    }
+
+    aio_notify(ctx);
+}
+
+/* Only called from aio_bh_poll() and aio_ctx_finalize() */
+static QEMUBH *aio_bh_dequeue(BHList *head, unsigned *flags)
+{
+    QEMUBH *bh = QSLIST_FIRST_RCU(head);
+
+    if (!bh) {
+        return NULL;
+    }
+
+    QSLIST_REMOVE_HEAD(head, next);
+
+    /*
+     * The atomic_and is paired with aio_bh_enqueue(). The implicit memory
+     * barrier ensures that the callback sees all writes done by the scheduling
+     * thread. It also ensures that the scheduling thread sees the cleared
+     * flag before bh->cb has run, and thus will call aio_notify again if
+     * necessary.
+     */
+    *flags = atomic_fetch_and(&bh->flags,
+                              ~(BH_PENDING | BH_SCHEDULED | BH_IDLE));
+    return bh;
+}
+
 void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
 {
     QEMUBH *bh;
@@ -55,15 +116,7 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_lockcnt_lock(&ctx->list_lock);
-    bh->next = ctx->first_bh;
-    bh->scheduled = 1;
-    bh->deleted = 1;
-    /* Make sure that the members are ready before putting bh into list */
-    smp_wmb();
-    ctx->first_bh = bh;
-    qemu_lockcnt_unlock(&ctx->list_lock);
-    aio_notify(ctx);
+    aio_bh_enqueue(bh, BH_SCHEDULED | BH_ONESHOT);
 }
 
 QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
@@ -75,12 +128,6 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
         .cb = cb,
         .opaque = opaque,
     };
-    qemu_lockcnt_lock(&ctx->list_lock);
-    bh->next = ctx->first_bh;
-    /* Make sure that the members are ready before putting bh into list */
-    smp_wmb();
-    ctx->first_bh = bh;
-    qemu_lockcnt_unlock(&ctx->list_lock);
     return bh;
 }
 
@@ -89,91 +136,56 @@ void aio_bh_call(QEMUBH *bh)
     bh->cb(bh->opaque);
 }
 
-/* Multiple occurrences of aio_bh_poll cannot be called concurrently.
- * The count in ctx->list_lock is incremented before the call, and is
- * not affected by the call.
- */
+/* Multiple occurrences of aio_bh_poll cannot be called concurrently. */
 int aio_bh_poll(AioContext *ctx)
 {
-    QEMUBH *bh, **bhp, *next;
-    int ret;
-    bool deleted = false;
-
-    ret = 0;
-    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
-        next = atomic_rcu_read(&bh->next);
-        /* The atomic_xchg is paired with the one in qemu_bh_schedule. The
-         * implicit memory barrier ensures that the callback sees all writes
-         * done by the scheduling thread. It also ensures that the scheduling
-         * thread sees the zero before bh->cb has run, and thus will call
-         * aio_notify again if necessary.
-         */
-        if (atomic_xchg(&bh->scheduled, 0)) {
+    BHListSlice slice;
+    BHListSlice *s;
+    int ret = 0;
+
+    QSLIST_MOVE_ATOMIC(&slice.bh_list, &ctx->bh_list);
+    QSIMPLEQ_INSERT_TAIL(&ctx->bh_slice_list, &slice, next);
+
+    while ((s = QSIMPLEQ_FIRST(&ctx->bh_slice_list))) {
+        QEMUBH *bh;
+        unsigned flags;
+
+        bh = aio_bh_dequeue(&s->bh_list, &flags);
+        if (!bh) {
+            QSIMPLEQ_REMOVE_HEAD(&ctx->bh_slice_list, next);
+            continue;
+        }
+
+        if ((flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
             /* Idle BHs don't count as progress */
-            if (!bh->idle) {
+            if (!(flags & BH_IDLE)) {
                 ret = 1;
             }
-            bh->idle = 0;
             aio_bh_call(bh);
         }
-        if (bh->deleted) {
-            deleted = true;
+        if (flags & (BH_DELETED | BH_ONESHOT)) {
+            g_free(bh);
         }
     }
 
-    /* remove deleted bhs */
-    if (!deleted) {
-        return ret;
-    }
-
-    if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
-        bhp = &ctx->first_bh;
-        while (*bhp) {
-            bh = *bhp;
-            if (bh->deleted && !bh->scheduled) {
-                *bhp = bh->next;
-                g_free(bh);
-            } else {
-                bhp = &bh->next;
-            }
-        }
-        qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
-    }
     return ret;
 }
 
 void qemu_bh_schedule_idle(QEMUBH *bh)
 {
-    bh->idle = 1;
-    /* Make sure that idle & any writes needed by the callback are done
-     * before the locations are read in the aio_bh_poll.
-     */
-    atomic_mb_set(&bh->scheduled, 1);
+    aio_bh_enqueue(bh, BH_SCHEDULED | BH_IDLE);
 }
 
 void qemu_bh_schedule(QEMUBH *bh)
 {
-    AioContext *ctx;
-
-    ctx = bh->ctx;
-    bh->idle = 0;
-    /* The memory barrier implicit in atomic_xchg makes sure that:
-     * 1. idle & any writes needed by the callback are done before the
-     *    locations are read in the aio_bh_poll.
-     * 2. ctx is loaded before scheduled is set and the callback has a chance
-     *    to execute.
-     */
-    if (atomic_xchg(&bh->scheduled, 1) == 0) {
-        aio_notify(ctx);
-    }
+    aio_bh_enqueue(bh, BH_SCHEDULED);
 }
 
-
 /* This func is async.
  */
 void qemu_bh_cancel(QEMUBH *bh)
 {
-    atomic_mb_set(&bh->scheduled, 0);
+    atomic_and(&bh->flags, ~BH_SCHEDULED);
 }
 
 /* This func is async.The bottom half will do the delete action at the finial
@@ -181,21 +193,16 @@ void qemu_bh_cancel(QEMUBH *bh)
  */
 void qemu_bh_delete(QEMUBH *bh)
 {
-    bh->scheduled = 0;
-    bh->deleted = 1;
+    aio_bh_enqueue(bh, BH_DELETED);
 }
 
-int64_t
-aio_compute_timeout(AioContext *ctx)
+static int64_t aio_compute_bh_timeout(BHList *head, int timeout)
 {
-    int64_t deadline;
-    int timeout = -1;
     QEMUBH *bh;
 
-    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
-         bh = atomic_rcu_read(&bh->next)) {
-        if (bh->scheduled) {
-            if (bh->idle) {
+    QSLIST_FOREACH_RCU(bh, head, next) {
+        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
+            if (bh->flags & BH_IDLE) {
                 /* idle bottom halves will be polled at least
                  * every 10ms */
                 timeout = 10000000;
@@ -207,6 +214,28 @@ aio_compute_timeout(AioContext *ctx)
         }
     }
 
+    return timeout;
+}
+
+int64_t
+aio_compute_timeout(AioContext *ctx)
+{
+    BHListSlice *s;
+    int64_t deadline;
+    int timeout = -1;
+
+    timeout = aio_compute_bh_timeout(&ctx->bh_list, timeout);
+    if (timeout == 0) {
+        return 0;
+    }
+
+    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
+        timeout = aio_compute_bh_timeout(&s->bh_list, timeout);
+        if (timeout == 0) {
+            return 0;
+        }
+    }
+
     deadline = timerlistgroup_deadline_ns(&ctx->tlg);
     if (deadline == 0) {
         return 0;
@@ -237,15 +266,24 @@ aio_ctx_check(GSource *source)
 {
     AioContext *ctx = (AioContext *) source;
     QEMUBH *bh;
+    BHListSlice *s;
 
     atomic_and(&ctx->notify_me, ~1);
     aio_notify_accept(ctx);
 
-    for (bh = ctx->first_bh; bh; bh = bh->next) {
-        if (bh->scheduled) {
+    QSLIST_FOREACH_RCU(bh, &ctx->bh_list, next) {
+        if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
             return true;
         }
     }
+
+    QSIMPLEQ_FOREACH(s, &ctx->bh_slice_list, next) {
+        QSLIST_FOREACH_RCU(bh, &s->bh_list, next) {
+            if ((bh->flags & (BH_SCHEDULED | BH_DELETED)) == BH_SCHEDULED) {
+                return true;
+            }
+        }
+    }
     return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
 }
 
@@ -265,6 +303,8 @@ static void
 aio_ctx_finalize(GSource *source)
 {
     AioContext *ctx = (AioContext *) source;
+    QEMUBH *bh;
+    unsigned flags;
 
     thread_pool_free(ctx->thread_pool);
 
@@ -287,18 +327,15 @@ aio_ctx_finalize(GSource *source)
     assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
     qemu_bh_delete(ctx->co_schedule_bh);
 
-    qemu_lockcnt_lock(&ctx->list_lock);
-    assert(!qemu_lockcnt_count(&ctx->list_lock));
-    while (ctx->first_bh) {
-        QEMUBH *next = ctx->first_bh->next;
+    /* There must be no aio_bh_poll() calls going on */
+    assert(QSIMPLEQ_EMPTY(&ctx->bh_slice_list));
 
+    while ((bh = aio_bh_dequeue(&ctx->bh_list, &flags))) {
         /* qemu_bh_delete() must have been called on BHs in this AioContext */
-        assert(ctx->first_bh->deleted);
+        assert(flags & BH_DELETED);
 
-        g_free(ctx->first_bh);
-        ctx->first_bh = next;
+        g_free(bh);
     }
-    qemu_lockcnt_unlock(&ctx->list_lock);
 
     aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
     event_notifier_cleanup(&ctx->notifier);
@@ -445,6 +482,8 @@ AioContext *aio_context_new(Error **errp)
     AioContext *ctx;
 
     ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+    QSLIST_INIT(&ctx->bh_list);
+    QSIMPLEQ_INIT(&ctx->bh_slice_list);
     aio_context_setup(ctx);
 
     ret = event_notifier_init(&ctx->notifier, false);
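The BHList/BHListSlice comment at the top of the patch is the other key piece: aio_bh_poll() atomically steals the current pending list into an on-stack slice, so BHs scheduled while callbacks run land on a fresh ctx->bh_list and wait for the next poll, while a nested aio_bh_poll() (for example from a callback that runs a nested event loop) appends its own slice to the chain and the drain loop keeps taking from the head. A rough standalone model of that drain loop with plain singly-linked lists, no atomics and no QEMU types (Ctx, Item, Slice and the toy_* names are made up for illustration):

#include <stdio.h>

typedef struct Item {
    struct Item *next;
    void (*cb)(void *opaque);
    void *opaque;
} Item;

typedef struct Slice {
    Item *items;            /* the portion of the pending list carved off */
    struct Slice *next;
} Slice;

typedef struct Ctx {
    Item *pending;          /* items scheduled since the last poll */
    Slice *slices;          /* slices of all active toy_poll() calls */
} Ctx;

static void toy_schedule(Ctx *ctx, Item *it)
{
    it->next = ctx->pending;    /* the real code pushes with an atomic op */
    ctx->pending = it;
}

static int toy_poll(Ctx *ctx)
{
    Slice slice = { .items = ctx->pending, .next = NULL };
    Slice **tail = &ctx->slices;
    int ran = 0;

    ctx->pending = NULL;        /* steal the list, like QSLIST_MOVE_ATOMIC */

    /* chain our slice at the tail so nested calls see it too */
    while (*tail) {
        tail = &(*tail)->next;
    }
    *tail = &slice;

    /* drain from the head slice; items scheduled by callbacks went to
     * ctx->pending and are only seen by the next toy_poll() call */
    while (ctx->slices) {
        Slice *s = ctx->slices;
        Item *it = s->items;

        if (!it) {
            ctx->slices = s->next;   /* slice exhausted, drop it */
            continue;
        }
        s->items = it->next;
        it->cb(it->opaque);
        ran++;
    }
    return ran;
}

static Ctx ctx;
static Item second;

static void say(void *opaque)
{
    printf("%s\n", (const char *)opaque);
}

static void reschedule(void *opaque)
{
    (void)opaque;
    printf("first\n");
    second = (Item){ .next = NULL, .cb = say, .opaque = (void *)"second" };
    toy_schedule(&ctx, &second);    /* deferred to the next poll */
}

int main(void)
{
    Item first = { .next = NULL, .cb = reschedule, .opaque = NULL };

    toy_schedule(&ctx, &first);
    printf("poll #1 ran %d item(s)\n", toy_poll(&ctx));   /* runs "first" */
    printf("poll #2 ran %d item(s)\n", toy_poll(&ctx));   /* runs "second" */
    return 0;
}

Running it prints "first" during poll #1 and "second" only during poll #2, which is the deferral the header comment in the patch describes.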