
[bpf-next,v1,11/22] rqspinlock: Add deadlock detection and recovery

Message ID 20250107140004.2732830-12-memxor@gmail.com (mailing list archive)
State Changes Requested
Delegated to: BPF
Series Resilient Queued Spin Lock

Checks

Context Check Description
bpf/vmtest-bpf-next-VM_Test-0 success Logs for Lint
bpf/vmtest-bpf-next-VM_Test-1 success Logs for ShellCheck
bpf/vmtest-bpf-next-VM_Test-2 success Logs for Unittests
bpf/vmtest-bpf-next-VM_Test-3 success Logs for Validate matrix.py
bpf/vmtest-bpf-next-VM_Test-5 success Logs for aarch64-gcc / build-release
bpf/vmtest-bpf-next-VM_Test-4 success Logs for aarch64-gcc / build / build for aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-9 success Logs for aarch64-gcc / test (test_verifier, false, 360) / test_verifier on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-6 success Logs for aarch64-gcc / test (test_maps, false, 360) / test_maps on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-10 success Logs for aarch64-gcc / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-11 success Logs for aarch64-gcc / veristat-meta
bpf/vmtest-bpf-next-VM_Test-12 fail Logs for s390x-gcc / build / build for s390x with gcc
bpf/vmtest-bpf-next-VM_Test-13 success Logs for s390x-gcc / build-release
bpf/vmtest-bpf-next-VM_Test-14 success Logs for s390x-gcc / test
bpf/vmtest-bpf-next-VM_Test-15 success Logs for s390x-gcc / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-16 success Logs for s390x-gcc / veristat-meta
bpf/vmtest-bpf-next-VM_Test-17 success Logs for set-matrix
bpf/vmtest-bpf-next-VM_Test-18 success Logs for x86_64-gcc / build / build for x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-19 success Logs for x86_64-gcc / build-release
bpf/vmtest-bpf-next-VM_Test-20 success Logs for x86_64-gcc / test (test_maps, false, 360) / test_maps on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-23 success Logs for x86_64-gcc / test (test_progs_no_alu32_parallel, true, 30) / test_progs_no_alu32_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-24 success Logs for x86_64-gcc / test (test_progs_parallel, true, 30) / test_progs_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-25 success Logs for x86_64-gcc / test (test_verifier, false, 360) / test_verifier on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-26 success Logs for x86_64-gcc / veristat-kernel / x86_64-gcc veristat_kernel
bpf/vmtest-bpf-next-VM_Test-27 success Logs for x86_64-gcc / veristat-meta / x86_64-gcc veristat_meta
bpf/vmtest-bpf-next-VM_Test-28 success Logs for x86_64-llvm-17 / build / build for x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-29 success Logs for x86_64-llvm-17 / build-release / build for x86_64 with llvm-17-O2
bpf/vmtest-bpf-next-VM_Test-30 success Logs for x86_64-llvm-17 / test (test_maps, false, 360) / test_maps on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-33 success Logs for x86_64-llvm-17 / test (test_verifier, false, 360) / test_verifier on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-34 success Logs for x86_64-llvm-17 / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-35 success Logs for x86_64-llvm-17 / veristat-meta
bpf/vmtest-bpf-next-VM_Test-36 success Logs for x86_64-llvm-18 / build / build for x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-37 success Logs for x86_64-llvm-18 / build-release / build for x86_64 with llvm-18-O2
bpf/vmtest-bpf-next-VM_Test-38 success Logs for x86_64-llvm-18 / test (test_maps, false, 360) / test_maps on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-42 success Logs for x86_64-llvm-18 / test (test_verifier, false, 360) / test_verifier on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-43 success Logs for x86_64-llvm-18 / veristat-kernel
bpf/vmtest-bpf-next-VM_Test-44 success Logs for x86_64-llvm-18 / veristat-meta
bpf/vmtest-bpf-next-PR fail PR summary
bpf/vmtest-bpf-next-VM_Test-7 fail Logs for aarch64-gcc / test (test_progs, false, 360) / test_progs on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-8 fail Logs for aarch64-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-21 fail Logs for x86_64-gcc / test (test_progs, false, 360) / test_progs on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-22 fail Logs for x86_64-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-31 fail Logs for x86_64-llvm-17 / test (test_progs, false, 360) / test_progs on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-32 fail Logs for x86_64-llvm-17 / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with llvm-17
bpf/vmtest-bpf-next-VM_Test-39 fail Logs for x86_64-llvm-18 / test (test_progs, false, 360) / test_progs on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-40 fail Logs for x86_64-llvm-18 / test (test_progs_cpuv4, false, 360) / test_progs_cpuv4 on x86_64 with llvm-18
bpf/vmtest-bpf-next-VM_Test-41 fail Logs for x86_64-llvm-18 / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with llvm-18
netdev/series_format fail Series longer than 15 patches
netdev/tree_selection success Clearly marked for bpf-next, async
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1 this patch: 1
netdev/build_tools success Errors and warnings before: 0 (+0) this patch: 0 (+0)
netdev/cc_maintainers warning 6 maintainers not CCed: longman@redhat.com will@kernel.org linux-arch@vger.kernel.org arnd@arndb.de boqun.feng@gmail.com mingo@redhat.com
netdev/build_clang success Errors and warnings before: 1 this patch: 1
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1 this patch: 1
netdev/checkpatch warning CHECK: extern prototypes should be avoided in .h files WARNING: line length of 105 exceeds 80 columns WARNING: line length of 81 exceeds 80 columns WARNING: line length of 82 exceeds 80 columns WARNING: line length of 83 exceeds 80 columns WARNING: line length of 84 exceeds 80 columns WARNING: line length of 85 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns WARNING: line length of 87 exceeds 80 columns WARNING: line length of 88 exceeds 80 columns WARNING: line length of 91 exceeds 80 columns WARNING: line length of 93 exceeds 80 columns WARNING: memory barrier without comment
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 2 this patch: 2
netdev/source_inline success Was 0 now: 0

Commit Message

Kumar Kartikeya Dwivedi Jan. 7, 2025, 1:59 p.m. UTC
While the timeout logic guarantees the waiter's forward progress, the
time until a stalling waiter unblocks can still be long. The default
timeout of 0.5 seconds can be excessively long for some use cases, and
custom timeouts may make recovery even slower.

Introduce logic to detect common cases of deadlocks and perform quicker
recovery. This is done by dividing the time from entry into the locking
slow path until the timeout into intervals of 1 ms. Then, after each
interval elapses, deadlock detection is performed, while also polling
the lock word to ensure we can quickly break out of the detection logic
and proceed with lock acquisition.
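
Condensed, the per-interval logic added to check_timeout() in this patch
looks roughly as follows (a sketch of the flow, not a verbatim excerpt):

	u64 time = ktime_get_mono_fast_ns();

	if (time > ts->timeout_end)
		return -ETIMEDOUT;		/* hard timeout (default 0.5s) */

	/* Has a millisecond elapsed since the last deadlock check? */
	if (ts->cur + NSEC_PER_MSEC < time) {
		ts->cur = time;
		return check_deadlock(lock, mask, ts);	/* AA, then ABBA */
	}
	return 0;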

A 'held_locks' table is maintained per-CPU, where the most recent entry
(at the bottom of the table) denotes the lock being waited for or just
taken, and the entries before it denote locks that are already held. The
current CPU's table can thus be inspected to detect AA deadlocks, and the
tables of other CPUs to discover ABBA situations: when some other CPU
holds the lock being taken on the current CPU while itself waiting on a
lock the current CPU holds, a deadlock is detected. The ABBA search can
take a long time, so the lock word is polled on each loop iteration via
the is_lock_released check, allowing detection to be cut short and lock
acquisition to proceed.
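
As an illustration (not part of the patch), an AA deadlock shows up
directly in the acquiring CPU's own table:

	/*
	 * CPU N, after taking lock A and then attempting to take A again:
	 *
	 *   rqspinlock_held_locks.locks[0] = A   <- already held
	 *   rqspinlock_held_locks.locks[1] = A   <- current acquisition attempt
	 *   rqspinlock_held_locks.cnt      = 2
	 *
	 * check_deadlock_AA() scans locks[0..cnt-2] for the lock currently
	 * being acquired and returns -EDEADLK on a match.
	 */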

The 'spin' member of the rqspinlock_timeout struct is set to 0 so that
the first deadlock check triggers immediately, allowing faster recovery.

Note: Extending the lock word by 4 bytes to record the owner CPU would
allow faster ABBA detection, since it is typically the owner that
participates in an ABBA situation. However, to keep compatibility with
existing lock words in the kernel (struct qspinlock), and given that
deadlocks are rare events triggered by bugs, we choose to favor
compatibility over faster detection.

Signed-off-by: Kumar Kartikeya Dwivedi <memxor@gmail.com>
---
 include/asm-generic/rqspinlock.h |  56 +++++++++-
 kernel/locking/rqspinlock.c      | 178 ++++++++++++++++++++++++++++---
 2 files changed, 220 insertions(+), 14 deletions(-)

Comments

Waiman Long Jan. 8, 2025, 4:06 p.m. UTC | #1
On 1/7/25 8:59 AM, Kumar Kartikeya Dwivedi wrote:
> While the timeout logic guarantees the waiter's forward progress, the
> time until a stalling waiter unblocks can still be long. The default
> timeout of 0.5 seconds can be excessively long for some use cases, and
> custom timeouts may make recovery even slower.
>
> Introduce logic to detect common cases of deadlocks and perform quicker
> recovery. This is done by dividing the time from entry into the locking
> slow path until the timeout into intervals of 1 ms. Then, after each
> interval elapses, deadlock detection is performed, while also polling
> the lock word to ensure we can quickly break out of the detection logic
> and proceed with lock acquisition.
>
> A 'held_locks' table is maintained per-CPU, where the most recent entry
> (at the bottom of the table) denotes the lock being waited for or just
> taken, and the entries before it denote locks that are already held. The
> current CPU's table can thus be inspected to detect AA deadlocks, and the
> tables of other CPUs to discover ABBA situations: when some other CPU
> holds the lock being taken on the current CPU while itself waiting on a
> lock the current CPU holds, a deadlock is detected. The ABBA search can
> take a long time, so the lock word is polled on each loop iteration via
> the is_lock_released check, allowing detection to be cut short and lock
> acquisition to proceed.
>
> The 'spin' member of the rqspinlock_timeout struct is set to 0 so that
> the first deadlock check triggers immediately, allowing faster recovery.
>
> Note: Extending the lock word by 4 bytes to record the owner CPU would
> allow faster ABBA detection, since it is typically the owner that
> participates in an ABBA situation. However, to keep compatibility with
> existing lock words in the kernel (struct qspinlock), and given that
> deadlocks are rare events triggered by bugs, we choose to favor
> compatibility over faster detection.
>
> Signed-off-by: Kumar Kartikeya Dwivedi <memxor@gmail.com>
> ---
>   include/asm-generic/rqspinlock.h |  56 +++++++++-
>   kernel/locking/rqspinlock.c      | 178 ++++++++++++++++++++++++++++---
>   2 files changed, 220 insertions(+), 14 deletions(-)
>
> diff --git a/include/asm-generic/rqspinlock.h b/include/asm-generic/rqspinlock.h
> index 5c996a82e75f..c7e33ccc57a6 100644
> --- a/include/asm-generic/rqspinlock.h
> +++ b/include/asm-generic/rqspinlock.h
> @@ -11,14 +11,68 @@
>   
>   #include <linux/types.h>
>   #include <vdso/time64.h>
> +#include <linux/percpu.h>
>   
>   struct qspinlock;
>   
> +extern int resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val, u64 timeout);
> +
>   /*
>    * Default timeout for waiting loops is 0.5 seconds
>    */
>   #define RES_DEF_TIMEOUT (NSEC_PER_SEC / 2)
>   
> -extern int resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val, u64 timeout);
> +#define RES_NR_HELD 32
> +
> +struct rqspinlock_held {
> +	int cnt;
> +	void *locks[RES_NR_HELD];
> +};
> +
> +DECLARE_PER_CPU_ALIGNED(struct rqspinlock_held, rqspinlock_held_locks);
> +
> +static __always_inline void grab_held_lock_entry(void *lock)
> +{
> +	int cnt = this_cpu_inc_return(rqspinlock_held_locks.cnt);
> +
> +	if (unlikely(cnt > RES_NR_HELD)) {
> +		/* Still keep the inc so we decrement later. */
> +		return;
> +	}
> +
> +	/*
> +	 * Implied compiler barrier in per-CPU operations; otherwise we can have
> +	 * the compiler reorder inc with write to table, allowing interrupts to
> +	 * overwrite and erase our write to the table (as on interrupt exit it
> +	 * will be reset to NULL).
> +	 */
> +	this_cpu_write(rqspinlock_held_locks.locks[cnt - 1], lock);
> +}
> +
> +/*
> + * It is possible to run into misdetection scenarios of AA deadlocks on the same
> + * CPU, and missed ABBA deadlocks on remote CPUs when this function pops entries
> + * out of order (due to lock A, lock B, unlock A, unlock B) pattern. The correct
> + * logic to preserve right entries in the table would be to walk the array of
> + * held locks and swap and clear out-of-order entries, but that's too
> + * complicated and we don't have a compelling use case for out of order unlocking.
Maybe we can pass in the lock and print a warning if out-of-order unlock 
is being done.
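
Something along those lines might look like the following (an
illustrative sketch only, not code from the patch):

	static __always_inline void release_held_lock_entry(void *lock)
	{
		struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);

		if (unlikely(rqh->cnt > RES_NR_HELD))
			goto dec;
		/* Complain if the caller is not releasing its most recent entry. */
		WARN_ON_ONCE(rqh->locks[rqh->cnt - 1] != lock);
		smp_store_release(&rqh->locks[rqh->cnt - 1], NULL);
	dec:
		this_cpu_dec(rqspinlock_held_locks.cnt);
	}
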
> + *
> + * Therefore, we simply don't support such cases and keep the logic simple here.
> + */
> +static __always_inline void release_held_lock_entry(void)
> +{
> +	struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
> +
> +	if (unlikely(rqh->cnt > RES_NR_HELD))
> +		goto dec;
> +	smp_store_release(&rqh->locks[rqh->cnt - 1], NULL);
> +	/*
> +	 * Overwrite of NULL should appear before our decrement of the count to
> +	 * other CPUs, otherwise we have the issue of a stale non-NULL entry being
> +	 * visible in the array, leading to misdetection during deadlock detection.
> +	 */
> +dec:
> +	this_cpu_dec(rqspinlock_held_locks.cnt);
AFAIU, smp_store_release() only guarantees memory ordering before it, 
not after. That shouldn't be a problem if the decrement is observed 
before clearing the entry as that non-NULL entry won't be checked anyway.
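
In other words (an illustrative diagram, not part of the patch or the
reply), the remote detector reads the count before scanning the entries:

	/*
	 *   releasing CPU                 detecting CPU (check_deadlock_ABBA)
	 *   -------------                 -----------------------------------
	 *   locks[cnt - 1] = NULL         cnt' = READ_ONCE(rqh_cpu->cnt)
	 *   cnt--                         scan READ_ONCE(locks[0 .. cnt'-1])
	 *
	 * If the decrement becomes visible before the NULL store, the stale
	 * slot lies outside the [0, cnt') range the detector scans, so it
	 * cannot cause a misdetection.
	 */
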
> +}
>   
>   #endif /* __ASM_GENERIC_RQSPINLOCK_H */
> diff --git a/kernel/locking/rqspinlock.c b/kernel/locking/rqspinlock.c
> index b63f92bd43b1..b7c86127d288 100644
> --- a/kernel/locking/rqspinlock.c
> +++ b/kernel/locking/rqspinlock.c
> @@ -30,6 +30,7 @@
>    * Include queued spinlock definitions and statistics code
>    */
>   #include "qspinlock.h"
> +#include "rqspinlock.h"
>   #include "qspinlock_stat.h"
>   
>   /*
> @@ -74,16 +75,141 @@
>   struct rqspinlock_timeout {
>   	u64 timeout_end;
>   	u64 duration;
> +	u64 cur;
>   	u16 spin;
>   };
>   
>   #define RES_TIMEOUT_VAL	2
>   
> -static noinline int check_timeout(struct rqspinlock_timeout *ts)
> +DEFINE_PER_CPU_ALIGNED(struct rqspinlock_held, rqspinlock_held_locks);
> +
> +static bool is_lock_released(struct qspinlock *lock, u32 mask, struct rqspinlock_timeout *ts)
> +{
> +	if (!(atomic_read_acquire(&lock->val) & (mask)))
> +		return true;
> +	return false;
> +}
> +
> +static noinline int check_deadlock_AA(struct qspinlock *lock, u32 mask,
> +				      struct rqspinlock_timeout *ts)
> +{
> +	struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
> +	int cnt = min(RES_NR_HELD, rqh->cnt);
> +
> +	/*
> +	 * Return an error if we hold the lock we are attempting to acquire.
> +	 * We'll iterate over max 32 locks; no need to do is_lock_released.
> +	 */
> +	for (int i = 0; i < cnt - 1; i++) {
> +		if (rqh->locks[i] == lock)
> +			return -EDEADLK;
> +	}
> +	return 0;
> +}
> +
> +static noinline int check_deadlock_ABBA(struct qspinlock *lock, u32 mask,
> +					struct rqspinlock_timeout *ts)
> +{

I think you should note that the ABBA check here is not exhaustive. It 
only covers the most common case, and there are corner cases that will 
be missed.
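
For example, a cycle spanning more than two CPUs would not be flagged by
the pairwise check and would only recover via the timeout (illustration,
not from the patch):

	/*
	 * Hypothetical three-way cycle the pairwise ABBA check cannot see:
	 *
	 *   CPU 0: holds A, attempting B
	 *   CPU 1: holds B, attempting C
	 *   CPU 2: holds C, attempting A
	 *
	 * From CPU 0, the holder of B (CPU 1) is found, but CPU 1's most
	 * recent entry is C, which CPU 0 does not hold, so the check is
	 * inconclusive; the same holds from CPU 1 and CPU 2, leaving the
	 * cycle to time out.
	 */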

Cheers,
Longman

Patch

diff --git a/include/asm-generic/rqspinlock.h b/include/asm-generic/rqspinlock.h
index 5c996a82e75f..c7e33ccc57a6 100644
--- a/include/asm-generic/rqspinlock.h
+++ b/include/asm-generic/rqspinlock.h
@@ -11,14 +11,68 @@ 
 
 #include <linux/types.h>
 #include <vdso/time64.h>
+#include <linux/percpu.h>
 
 struct qspinlock;
 
+extern int resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val, u64 timeout);
+
 /*
  * Default timeout for waiting loops is 0.5 seconds
  */
 #define RES_DEF_TIMEOUT (NSEC_PER_SEC / 2)
 
-extern int resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 val, u64 timeout);
+#define RES_NR_HELD 32
+
+struct rqspinlock_held {
+	int cnt;
+	void *locks[RES_NR_HELD];
+};
+
+DECLARE_PER_CPU_ALIGNED(struct rqspinlock_held, rqspinlock_held_locks);
+
+static __always_inline void grab_held_lock_entry(void *lock)
+{
+	int cnt = this_cpu_inc_return(rqspinlock_held_locks.cnt);
+
+	if (unlikely(cnt > RES_NR_HELD)) {
+		/* Still keep the inc so we decrement later. */
+		return;
+	}
+
+	/*
+	 * Implied compiler barrier in per-CPU operations; otherwise we can have
+	 * the compiler reorder inc with write to table, allowing interrupts to
+	 * overwrite and erase our write to the table (as on interrupt exit it
+	 * will be reset to NULL).
+	 */
+	this_cpu_write(rqspinlock_held_locks.locks[cnt - 1], lock);
+}
+
+/*
+ * It is possible to run into misdetection scenarios of AA deadlocks on the same
+ * CPU, and missed ABBA deadlocks on remote CPUs when this function pops entries
+ * out of order (due to lock A, lock B, unlock A, unlock B) pattern. The correct
+ * logic to preserve right entries in the table would be to walk the array of
+ * held locks and swap and clear out-of-order entries, but that's too
+ * complicated and we don't have a compelling use case for out of order unlocking.
+ *
+ * Therefore, we simply don't support such cases and keep the logic simple here.
+ */
+static __always_inline void release_held_lock_entry(void)
+{
+	struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
+
+	if (unlikely(rqh->cnt > RES_NR_HELD))
+		goto dec;
+	smp_store_release(&rqh->locks[rqh->cnt - 1], NULL);
+	/*
+	 * Overwrite of NULL should appear before our decrement of the count to
+	 * other CPUs, otherwise we have the issue of a stale non-NULL entry being
+	 * visible in the array, leading to misdetection during deadlock detection.
+	 */
+dec:
+	this_cpu_dec(rqspinlock_held_locks.cnt);
+}
 
 #endif /* __ASM_GENERIC_RQSPINLOCK_H */
diff --git a/kernel/locking/rqspinlock.c b/kernel/locking/rqspinlock.c
index b63f92bd43b1..b7c86127d288 100644
--- a/kernel/locking/rqspinlock.c
+++ b/kernel/locking/rqspinlock.c
@@ -30,6 +30,7 @@ 
  * Include queued spinlock definitions and statistics code
  */
 #include "qspinlock.h"
+#include "rqspinlock.h"
 #include "qspinlock_stat.h"
 
 /*
@@ -74,16 +75,141 @@ 
 struct rqspinlock_timeout {
 	u64 timeout_end;
 	u64 duration;
+	u64 cur;
 	u16 spin;
 };
 
 #define RES_TIMEOUT_VAL	2
 
-static noinline int check_timeout(struct rqspinlock_timeout *ts)
+DEFINE_PER_CPU_ALIGNED(struct rqspinlock_held, rqspinlock_held_locks);
+
+static bool is_lock_released(struct qspinlock *lock, u32 mask, struct rqspinlock_timeout *ts)
+{
+	if (!(atomic_read_acquire(&lock->val) & (mask)))
+		return true;
+	return false;
+}
+
+static noinline int check_deadlock_AA(struct qspinlock *lock, u32 mask,
+				      struct rqspinlock_timeout *ts)
+{
+	struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
+	int cnt = min(RES_NR_HELD, rqh->cnt);
+
+	/*
+	 * Return an error if we hold the lock we are attempting to acquire.
+	 * We'll iterate over max 32 locks; no need to do is_lock_released.
+	 */
+	for (int i = 0; i < cnt - 1; i++) {
+		if (rqh->locks[i] == lock)
+			return -EDEADLK;
+	}
+	return 0;
+}
+
+static noinline int check_deadlock_ABBA(struct qspinlock *lock, u32 mask,
+					struct rqspinlock_timeout *ts)
+{
+	struct rqspinlock_held *rqh = this_cpu_ptr(&rqspinlock_held_locks);
+	int rqh_cnt = min(RES_NR_HELD, rqh->cnt);
+	void *remote_lock;
+	int cpu;
+
+	/*
+	 * Find the CPU holding the lock that we want to acquire. If there is a
+	 * deadlock scenario, we will read a stable set on the remote CPU and
+	 * find the target. This would be a constant time operation instead of
+	 * O(NR_CPUS) if we could determine the owning CPU from a lock value, but
+	 * that requires increasing the size of the lock word.
+	 */
+	for_each_possible_cpu(cpu) {
+		struct rqspinlock_held *rqh_cpu = per_cpu_ptr(&rqspinlock_held_locks, cpu);
+		int real_cnt = READ_ONCE(rqh_cpu->cnt);
+		int cnt = min(RES_NR_HELD, real_cnt);
+
+		/*
+		 * Let's ensure to break out of this loop if the lock is available for
+		 * us to potentially acquire.
+		 */
+		if (is_lock_released(lock, mask, ts))
+			return 0;
+
+		/*
+		 * Skip ourselves, and CPUs whose count is less than 2, as they need at
+		 * least one held lock and one acquisition attempt (reflected as top
+		 * most entry) to participate in an ABBA deadlock.
+		 *
+		 * If cnt is more than RES_NR_HELD, it means the current lock being
+		 * acquired won't appear in the table, and other locks in the table are
+		 * already held, so we can't determine ABBA.
+		 */
+		if (cpu == smp_processor_id() || real_cnt < 2 || real_cnt > RES_NR_HELD)
+			continue;
+
+		/*
+		 * Obtain the entry at the top, this corresponds to the lock the
+		 * remote CPU is attempting to acquire in a deadlock situation,
+		 * and would be one of the locks we hold on the current CPU.
+		 */
+		remote_lock = READ_ONCE(rqh_cpu->locks[cnt - 1]);
+		/*
+		 * If it is NULL, we've raced and cannot determine a deadlock
+		 * conclusively, skip this CPU.
+		 */
+		if (!remote_lock)
+			continue;
+		/*
+		 * Find if the lock we're attempting to acquire is held by this CPU.
+		 * Don't consider the topmost entry, as that must be the latest lock
+		 * being held or acquired.  For a deadlock, the target CPU must also
+		 * attempt to acquire a lock we hold, so for this search only 'cnt - 1'
+		 * entries are important.
+		 */
+		for (int i = 0; i < cnt - 1; i++) {
+			if (READ_ONCE(rqh_cpu->locks[i]) != lock)
+				continue;
+			/*
+			 * We found our lock as held on the remote CPU.  Is the
+			 * acquisition attempt on the remote CPU for a lock held
+			 * by us?  If so, we have a deadlock situation, and need
+			 * to recover.
+			 */
+			for (int i = 0; i < rqh_cnt - 1; i++) {
+				if (rqh->locks[i] == remote_lock)
+					return -EDEADLK;
+			}
+			/*
+			 * Inconclusive; retry again later.
+			 */
+			return 0;
+		}
+	}
+	return 0;
+}
+
+static noinline int check_deadlock(struct qspinlock *lock, u32 mask,
+				   struct rqspinlock_timeout *ts)
+{
+	int ret;
+
+	ret = check_deadlock_AA(lock, mask, ts);
+	if (ret)
+		return ret;
+	ret = check_deadlock_ABBA(lock, mask, ts);
+	if (ret)
+		return ret;
+
+	return 0;
+}
+
+static noinline int check_timeout(struct qspinlock *lock, u32 mask,
+				  struct rqspinlock_timeout *ts)
 {
 	u64 time = ktime_get_mono_fast_ns();
+	u64 prev = ts->cur;
 
 	if (!ts->timeout_end) {
+		ts->cur = time;
 		ts->timeout_end = time + ts->duration;
 		return 0;
 	}
@@ -91,20 +217,30 @@  static noinline int check_timeout(struct rqspinlock_timeout *ts)
 	if (time > ts->timeout_end)
 		return -ETIMEDOUT;
 
+	/*
+	 * A millisecond interval passed from last time? Trigger deadlock
+	 * checks.
+	 */
+	if (prev + NSEC_PER_MSEC < time) {
+		ts->cur = time;
+		return check_deadlock(lock, mask, ts);
+	}
+
 	return 0;
 }
 
-#define RES_CHECK_TIMEOUT(ts, ret)                    \
-	({                                            \
-		if (!((ts).spin++ & 0xffff))          \
-			(ret) = check_timeout(&(ts)); \
-		(ret);                                \
+#define RES_CHECK_TIMEOUT(ts, ret, mask)                              \
+	({                                                            \
+		if (!((ts).spin++ & 0xffff))                          \
+			(ret) = check_timeout((lock), (mask), &(ts)); \
+		(ret);                                                \
 	})
 
 /*
  * Initialize the 'duration' member with the chosen timeout.
+ * Set spin member to 0 to trigger AA/ABBA checks immediately.
  */
-#define RES_INIT_TIMEOUT(ts, _timeout) ({ (ts).spin = 1; (ts).duration = _timeout; })
+#define RES_INIT_TIMEOUT(ts, _timeout) ({ (ts).spin = 0; (ts).duration = _timeout; })
 
 /*
  * We only need to reset 'timeout_end', 'spin' will just wrap around as necessary.
@@ -192,6 +328,11 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 		goto queue;
 	}
 
+	/*
+	 * Grab an entry in the held locks array, to enable deadlock detection.
+	 */
+	grab_held_lock_entry(lock);
+
 	/*
 	 * We're pending, wait for the owner to go away.
 	 *
@@ -205,7 +346,7 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 	 */
 	if (val & _Q_LOCKED_MASK) {
 		RES_RESET_TIMEOUT(ts);
-		smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, ret));
+		smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, ret, _Q_LOCKED_MASK));
 	}
 
 	if (ret) {
@@ -220,7 +361,7 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 		 */
 		clear_pending(lock);
 		lockevent_inc(rqspinlock_lock_timeout);
-		return ret;
+		goto err_release_entry;
 	}
 
 	/*
@@ -238,6 +379,11 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 	 */
 queue:
 	lockevent_inc(lock_slowpath);
+	/*
+	 * Grab deadlock detection entry for the queue path.
+	 */
+	grab_held_lock_entry(lock);
+
 	node = this_cpu_ptr(&qnodes[0].mcs);
 	idx = node->count++;
 	tail = encode_tail(smp_processor_id(), idx);
@@ -257,9 +403,9 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 		lockevent_inc(lock_no_node);
 		RES_RESET_TIMEOUT(ts);
 		while (!queued_spin_trylock(lock)) {
-			if (RES_CHECK_TIMEOUT(ts, ret)) {
+			if (RES_CHECK_TIMEOUT(ts, ret, ~0u)) {
 				lockevent_inc(rqspinlock_lock_timeout);
-				break;
+				goto err_release_node;
 			}
 			cpu_relax();
 		}
@@ -350,7 +496,7 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 	 */
 	RES_RESET_TIMEOUT(ts);
 	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK) ||
-				       RES_CHECK_TIMEOUT(ts, ret));
+				       RES_CHECK_TIMEOUT(ts, ret, _Q_LOCKED_PENDING_MASK));
 
 waitq_timeout:
 	if (ret) {
@@ -375,7 +521,7 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 			WRITE_ONCE(next->locked, RES_TIMEOUT_VAL);
 		}
 		lockevent_inc(rqspinlock_lock_timeout);
-		goto release;
+		goto err_release_node;
 	}
 
 	/*
@@ -422,5 +568,11 @@  int __lockfunc resilient_queued_spin_lock_slowpath(struct qspinlock *lock, u32 v
 	 */
 	__this_cpu_dec(qnodes[0].mcs.count);
 	return ret;
+err_release_node:
+	trace_contention_end(lock, ret);
+	__this_cpu_dec(qnodes[0].mcs.count);
+err_release_entry:
+	release_held_lock_entry();
+	return ret;
 }
 EXPORT_SYMBOL(resilient_queued_spin_lock_slowpath);