--- a/async.c
+++ b/async.c
@@ -51,12 +51,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
.cb = cb,
.opaque = opaque,
};
- qemu_mutex_lock(&ctx->list_lock);
+ qemu_lockcnt_lock(&ctx->list_lock);
bh->next = ctx->first_bh;
/* Make sure that the members are ready before putting bh into list */
smp_wmb();
ctx->first_bh = bh;
- qemu_mutex_unlock(&ctx->list_lock);
+ qemu_lockcnt_unlock(&ctx->list_lock);
return bh;
}
@@ -71,13 +71,11 @@ int aio_bh_poll(AioContext *ctx)
QEMUBH *bh, **bhp, *next;
int ret;
- ctx->walking_bh++;
+ qemu_lockcnt_inc(&ctx->list_lock);
ret = 0;
- for (bh = ctx->first_bh; bh; bh = next) {
- /* Make sure that fetching bh happens before accessing its members */
- smp_read_barrier_depends();
- next = bh->next;
+ for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
+ next = atomic_rcu_read(&bh->next);
/* The atomic_xchg is paired with the one in qemu_bh_schedule. The
* implicit memory barrier ensures that the callback sees all writes
* done by the scheduling thread. It also ensures that the scheduling
@@ -94,11 +92,8 @@ int aio_bh_poll(AioContext *ctx)
}
}
- ctx->walking_bh--;
-
/* remove deleted bhs */
- if (!ctx->walking_bh) {
- qemu_mutex_lock(&ctx->list_lock);
+ if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
bhp = &ctx->first_bh;
while (*bhp) {
bh = *bhp;
@@ -109,7 +104,7 @@ int aio_bh_poll(AioContext *ctx)
bhp = &bh->next;
}
}
- qemu_mutex_unlock(&ctx->list_lock);
+ qemu_lockcnt_unlock(&ctx->list_lock);
}
return ret;
@@ -165,7 +160,8 @@ aio_compute_timeout(AioContext *ctx)
int timeout = -1;
QEMUBH *bh;
- for (bh = ctx->first_bh; bh; bh = bh->next) {
+ for (bh = atomic_rcu_read(&ctx->first_bh); bh;
+ bh = atomic_rcu_read(&bh->next)) {
if (!bh->deleted && bh->scheduled) {
if (bh->idle) {
/* idle bottom halves will be polled at least
@@ -240,7 +236,8 @@ aio_ctx_finalize(GSource *source)
thread_pool_free(ctx->thread_pool);
- qemu_mutex_lock(&ctx->list_lock);
+ qemu_lockcnt_lock(&ctx->list_lock);
+ assert(!qemu_lockcnt_count(&ctx->list_lock));
while (ctx->first_bh) {
QEMUBH *next = ctx->first_bh->next;
@@ -250,12 +247,12 @@ aio_ctx_finalize(GSource *source)
g_free(ctx->first_bh);
ctx->first_bh = next;
}
- qemu_mutex_unlock(&ctx->list_lock);
+ qemu_lockcnt_unlock(&ctx->list_lock);
aio_set_event_notifier(ctx, &ctx->notifier, false, NULL);
event_notifier_cleanup(&ctx->notifier);
qemu_rec_mutex_destroy(&ctx->lock);
- qemu_mutex_destroy(&ctx->list_lock);
+ qemu_lockcnt_destroy(&ctx->list_lock);
timerlistgroup_deinit(&ctx->tlg);
}
@@ -551,7 +548,7 @@ AioContext *aio_context_new(Error **errp)
(EventNotifierHandler *)
event_notifier_dummy_cb);
ctx->thread_pool = NULL;
- qemu_mutex_init(&ctx->list_lock);
+ qemu_lockcnt_init(&ctx->list_lock);
qemu_rec_mutex_init(&ctx->lock);
timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -87,17 +87,15 @@ struct AioContext {
*/
uint32_t notify_me;
- /* lock to protect between bh's adders and deleter */
- QemuMutex list_lock;
+ /* A lock to protect between bh's adders and deleter, and to ensure
+ * that no callbacks are removed while we're walking and dispatching
+ * them.
+ */
+ QemuLockCnt list_lock;
/* Anchor of the list of Bottom Halves belonging to the context */
struct QEMUBH *first_bh;
- /* A simple lock used to protect the first_bh list, and ensure that
- * no callbacks are removed while we're walking and dispatching callbacks.
- */
- int walking_bh;
-
/* Used by aio_notify.
*
* "notified" is used to avoid expensive event_notifier_test_and_clear
This will make it possible to walk the list of bottom halves without
holding the AioContext lock---and in turn to call bottom half handlers
without holding the lock.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 async.c             | 31 ++++++++++++++-----------------
 include/block/aio.h | 12 +++++-------
 2 files changed, 19 insertions(+), 24 deletions(-)
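
For readers who have not seen QemuLockCnt before, the pattern this patch builds
on looks roughly like the sketch below. It is illustrative only and not part of
the patch: the Node type, list_head, and the walk_list()/insert_node() helpers
are hypothetical names, and only the qemu_lockcnt_*, atomic_rcu_read() and
smp_wmb() calls correspond to the APIs actually used in the diff above.

/* Minimal sketch of the QemuLockCnt reader/writer pattern (illustration
 * only).  Node, list_head, walk_list() and insert_node() are made up;
 * qemu_lockcnt_init(&list_lock) is assumed to have run during setup.
 */
#include "qemu/osdep.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"

typedef struct Node {
    struct Node *next;
    bool deleted;
} Node;

static QemuLockCnt list_lock;   /* counter of active walkers + a mutex */
static Node *list_head;

static void walk_list(void)
{
    Node *node;

    /* Walkers only bump the counter; they never take the mutex, so the
     * walk can run concurrently with insert_node().
     */
    qemu_lockcnt_inc(&list_lock);
    for (node = atomic_rcu_read(&list_head); node;
         node = atomic_rcu_read(&node->next)) {
        if (!node->deleted) {
            /* ... visit node ... */
        }
    }

    /* If this was the last walker, qemu_lockcnt_dec_and_lock() also
     * acquires the mutex and returns true; only then is it safe to
     * unlink and free nodes marked as deleted.
     */
    if (qemu_lockcnt_dec_and_lock(&list_lock)) {
        Node **prev = &list_head;
        while (*prev) {
            Node *cur = *prev;
            if (cur->deleted) {
                *prev = cur->next;
                g_free(cur);
            } else {
                prev = &cur->next;
            }
        }
        qemu_lockcnt_unlock(&list_lock);
    }
}

static void insert_node(Node *node)
{
    /* Writers serialize against each other and against the garbage
     * collection above by taking the mutex; walkers are not blocked.
     */
    qemu_lockcnt_lock(&list_lock);
    node->next = list_head;
    smp_wmb();                  /* publish a fully initialized node */
    list_head = node;
    qemu_lockcnt_unlock(&list_lock);
}

This mirrors the division of labour in the patch: aio_bh_poll() plays the role
of walk_list() (qemu_lockcnt_inc/qemu_lockcnt_dec_and_lock), aio_bh_new() plays
the role of insert_node() (qemu_lockcnt_lock/qemu_lockcnt_unlock), and deleted
bottom halves are only reclaimed by whichever thread drops the count to zero
and wins the lock.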