From patchwork Tue Feb 9 11:46:09 2016 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paolo Bonzini X-Patchwork-Id: 8260701 Return-Path: X-Original-To: patchwork-qemu-devel@patchwork.kernel.org Delivered-To: patchwork-parsemail@patchwork1.web.kernel.org Received: from mail.kernel.org (mail.kernel.org [198.145.29.136]) by patchwork1.web.kernel.org (Postfix) with ESMTP id A497B9F319 for ; Tue, 9 Feb 2016 12:03:38 +0000 (UTC) Received: from mail.kernel.org (localhost [127.0.0.1]) by mail.kernel.org (Postfix) with ESMTP id 6F9962025A for ; Tue, 9 Feb 2016 12:03:37 +0000 (UTC) Received: from lists.gnu.org (lists.gnu.org [208.118.235.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (No client certificate requested) by mail.kernel.org (Postfix) with ESMTPS id 097A82022D for ; Tue, 9 Feb 2016 12:03:36 +0000 (UTC) Received: from localhost ([::1]:54948 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aT715-0002lg-Ez for patchwork-qemu-devel@patchwork.kernel.org; Tue, 09 Feb 2016 07:03:35 -0500 Received: from eggs.gnu.org ([2001:4830:134:3::10]:37729) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aT6kh-0006f8-Gf for qemu-devel@nongnu.org; Tue, 09 Feb 2016 06:46:41 -0500 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1aT6kf-0005Ie-BI for qemu-devel@nongnu.org; Tue, 09 Feb 2016 06:46:39 -0500 Received: from mx1.redhat.com ([209.132.183.28]:56973) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1aT6ke-0005IY-St for qemu-devel@nongnu.org; Tue, 09 Feb 2016 06:46:37 -0500 Received: from int-mx09.intmail.prod.int.phx2.redhat.com (int-mx09.intmail.prod.int.phx2.redhat.com [10.5.11.22]) by mx1.redhat.com (Postfix) with ESMTPS id 80FC07AE88 for ; Tue, 9 Feb 2016 11:46:36 +0000 (UTC) Received: from donizetti.redhat.com (ovpn-112-47.ams2.redhat.com [10.36.112.47]) by int-mx09.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with ESMTP id u19BkELt002965; Tue, 9 Feb 2016 06:46:35 -0500 From: Paolo Bonzini To: qemu-devel@nongnu.org Date: Tue, 9 Feb 2016 12:46:09 +0100 Message-Id: <1455018374-4706-12-git-send-email-pbonzini@redhat.com> In-Reply-To: <1455018374-4706-1-git-send-email-pbonzini@redhat.com> References: <1455018374-4706-1-git-send-email-pbonzini@redhat.com> X-Scanned-By: MIMEDefang 2.68 on 10.5.11.22 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 3.x X-Received-From: 209.132.183.28 Cc: stefanha@redhat.com Subject: [Qemu-devel] [PATCH 11/16] qemu-thread: optimize QemuLockCnt with futexes on Linux X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+patchwork-qemu-devel=patchwork.kernel.org@nongnu.org Sender: qemu-devel-bounces+patchwork-qemu-devel=patchwork.kernel.org@nongnu.org X-Spam-Status: No, score=-6.9 required=5.0 tests=BAYES_00, RCVD_IN_DNSWL_HI, UNPARSEABLE_RELAY autolearn=unavailable version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on mail.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP This is complex, but I think it is reasonably documented in the source. Signed-off-by: Paolo Bonzini --- docs/lockcnt.txt | 9 +- include/qemu/futex.h | 36 ++++++ include/qemu/thread.h | 3 + trace-events | 10 ++ util/lockcnt.c | 282 +++++++++++++++++++++++++++++++++++++++++++++++ util/qemu-thread-posix.c | 25 +---- 6 files changed, 336 insertions(+), 29 deletions(-) create mode 100644 include/qemu/futex.h diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt index fc5d240..594764b 100644 --- a/docs/lockcnt.txt +++ b/docs/lockcnt.txt @@ -142,12 +142,11 @@ can also be more efficient in two ways: - it avoids taking the lock for many operations (for example incrementing the counter while it is non-zero); -- on some platforms, one could implement QemuLockCnt to hold the - lock and the mutex in a single word, making it no more expensive +- on some platforms, one can implement QemuLockCnt to hold the lock + and the mutex in a single word, making the fast path no more expensive than simply managing a counter using atomic operations (see - docs/atomics.txt). This is not implemented yet, but can be - very helpful if concurrent access to the data structure is - expected to be rare. + docs/atomics.txt). This can be very helpful if concurrent access to + the data structure is expected to be rare. Using the same mutex for frees and writes can still incur some small diff --git a/include/qemu/futex.h b/include/qemu/futex.h new file mode 100644 index 0000000..c3d1089 --- /dev/null +++ b/include/qemu/futex.h @@ -0,0 +1,36 @@ +/* + * Wrappers around Linux futex syscall + * + * Copyright Red Hat, Inc. 2015 + * + * Author: + * Paolo Bonzini + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include +#include + +#define futex(...) syscall(__NR_futex, __VA_ARGS__) + +static inline void futex_wake(void *f, int n) +{ + futex(f, FUTEX_WAKE, n, NULL, NULL, 0); +} + +static inline void futex_wait(void *f, unsigned val) +{ + while (futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { + switch (errno) { + case EWOULDBLOCK: + return; + case EINTR: + break; /* get out of switch and retry */ + default: + abort(); + } + } +} diff --git a/include/qemu/thread.h b/include/qemu/thread.h index 9fadca4..22d92d2 100644 --- a/include/qemu/thread.h +++ b/include/qemu/thread.h @@ -1,6 +1,7 @@ #ifndef __QEMU_THREAD_H #define __QEMU_THREAD_H 1 +#include "config-host.h" #include #include @@ -67,7 +68,9 @@ void qemu_thread_atexit_add(struct Notifier *notifier); void qemu_thread_atexit_remove(struct Notifier *notifier); struct QemuLockCnt { +#ifndef CONFIG_LINUX QemuMutex mutex; +#endif unsigned count; }; diff --git a/trace-events b/trace-events index c9ac144..daa091e 100644 --- a/trace-events +++ b/trace-events @@ -1434,6 +1434,16 @@ hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long c hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 +# util/lockcnt.c +lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d" +lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded" +lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d" +lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded" +lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d" +lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d" +lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d" +lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter" + # target-s390x/mmu_helper.c get_skeys_nonzero(int rc) "SKEY: Call to get_skeys unexpectedly returned %d" set_skeys_nonzero(int rc) "SKEY: Call to set_skeys unexpectedly returned %d" diff --git a/util/lockcnt.c b/util/lockcnt.c index 78ed1e4..71e8f8f 100644 --- a/util/lockcnt.c +++ b/util/lockcnt.c @@ -9,7 +9,288 @@ #include "qemu/osdep.h" #include "qemu/thread.h" #include "qemu/atomic.h" +#include "trace.h" +#ifdef CONFIG_LINUX +#include "qemu/futex.h" + +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter. + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok, + * this is not the most relaxing citation I could make...). It is similar + * to mutex2 in the paper. + */ + +#define QEMU_LOCKCNT_STATE_MASK 3 +#define QEMU_LOCKCNT_STATE_FREE 0 +#define QEMU_LOCKCNT_STATE_LOCKED 1 +#define QEMU_LOCKCNT_STATE_WAITING 2 + +#define QEMU_LOCKCNT_COUNT_STEP 4 +#define QEMU_LOCKCNT_COUNT_SHIFT 2 + +void qemu_lockcnt_init(QemuLockCnt *lockcnt) +{ + lockcnt->count = 0; +} + +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt) +{ +} + +/* *val is the current value of lockcnt->count. + * + * If the lock is free, try a cmpxchg from *val to new_if_free; return + * true and set *val to the old value found by the cmpxchg in + * lockcnt->count. + * + * If the lock is taken, wait for it to be released and return false + * *without trying again to take the lock*. Again, set *val to the + * new value of lockcnt->count. + * + * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED + * if calling this function a second time after it has returned + * false. + */ +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val, + int new_if_free, bool *waited) +{ + /* Fast path for when the lock is free. */ + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) { + int expected = *val; + + trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free); + *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free); + if (*val == expected) { + trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free); + *val = new_if_free; + return true; + } + } + + /* The slow path moves from locked to waiting if necessary, then + * does a futex wait. Both steps can be repeated ad nauseam, + * only getting out of the loop if we can have another shot at the + * fast path. Once we can, get out to compute the new destination + * value for the fast path. + */ + while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) { + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) { + int expected = *val; + int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING; + + trace_lockcnt_futex_wait_prepare(lockcnt, expected, new); + *val = atomic_cmpxchg(&lockcnt->count, expected, new); + if (*val == expected) { + *val = new; + } + continue; + } + + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) { + *waited = true; + trace_lockcnt_futex_wait(lockcnt, *val); + futex_wait(&lockcnt->count, *val); + *val = atomic_read(&lockcnt->count); + trace_lockcnt_futex_wait_resume(lockcnt, *val); + continue; + } + + abort(); + } + return false; +} + +static void lockcnt_wake(QemuLockCnt *lockcnt) +{ + trace_lockcnt_futex_wake(lockcnt); + futex_wake(&lockcnt->count, 1); +} + +void qemu_lockcnt_inc(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + bool waited = false; + + for (;;) { + if (val >= QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP); + if (val == expected) { + break; + } + } else { + /* The fast path is (0, unlocked)->(1, unlocked). */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP, + &waited)) { + break; + } + } + } + + /* If we were woken by another thread, we should also wake one because + * we are effectively releasing the lock that was given to us. This is + * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING + * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and + * wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_dec(QemuLockCnt *lockcnt) +{ + atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP); +} + +/* Decrement a counter, and return locked if it is decremented to zero. + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + for (;;) { + if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + int new = val - QEMU_LOCKCNT_COUNT_STEP; + val = atomic_cmpxchg(&lockcnt->count, val, new); + if (val == expected) { + break; + } + } + + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +/* If the counter is one, decrement it and return locked. Otherwise do + * nothing. + * + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) { + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +void qemu_lockcnt_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int step = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + /* The third argument is only used if the low bits of val are 0 + * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired + * state. + */ + while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) { + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + step = QEMU_LOCKCNT_STATE_WAITING; + } + } +} + +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = val & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) +{ + return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT; +} +#else void qemu_lockcnt_init(QemuLockCnt *lockcnt) { qemu_mutex_init(&lockcnt->mutex); @@ -111,3 +392,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) { return lockcnt->count; } +#endif diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c index 1aec83f..b1e3cdb 100644 --- a/util/qemu-thread-posix.c +++ b/util/qemu-thread-posix.c @@ -11,10 +11,6 @@ * */ #include "qemu/osdep.h" -#ifdef __linux__ -#include -#include -#endif #include "qemu/thread.h" #include "qemu/atomic.h" #include "qemu/notify.h" @@ -293,26 +289,7 @@ void qemu_sem_wait(QemuSemaphore *sem) } #ifdef __linux__ -#define futex(...) syscall(__NR_futex, __VA_ARGS__) - -static inline void futex_wake(QemuEvent *ev, int n) -{ - futex(ev, FUTEX_WAKE, n, NULL, NULL, 0); -} - -static inline void futex_wait(QemuEvent *ev, unsigned val) -{ - while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { - switch (errno) { - case EWOULDBLOCK: - return; - case EINTR: - break; /* get out of switch and retry */ - default: - abort(); - } - } -} +#include "qemu/futex.h" #else static inline void futex_wake(QemuEvent *ev, int n) {