diff mbox series

[RFC,1/1] hpref: Hazard Pointers with Reference Counter

Message ID 20240921164210.256278-1-mathieu.desnoyers@efficios.com (mailing list archive)
State Superseded
Headers show
Series [RFC,1/1] hpref: Hazard Pointers with Reference Counter | expand

Commit Message

Mathieu Desnoyers Sept. 21, 2024, 4:42 p.m. UTC
Boqun Feng's patch series and LPC talk gave me a few ideas I wanted to
try. I figured we could improve the concept of reference counters by
adding a hazard-pointer protected fast-path to them.

This API combines hazard pointers and reference counters.
It uses hazard pointers as fast-paths, and falls back to reference
counters either explicitly when the reader expects to hold the object
for a long time, or when no hazard pointer slots are available.

This prototype is implemented in userspace within the liburcu project,
and depends on librseq for per-cpu data structure allocation, indexing,
and updates.

- The top of this liburcu feature branch can be found at:
  https://github.com/compudj/userspace-rcu-dev/tree/hpref
  (it's implemented within liburcu for convenience, but it does
  not actually use RCU).

- librseq can be found at:
  https://git.kernel.org/pub/scm/libs/librseq/librseq.git/

This leverages the fact that both synchronization mechanisms aim to
guarantee existence of objects, and those existence guarantees can be
chained. Each mechanism achieves its purpose in a different way with
different tradeoffs. The hazard pointers are faster to read and scale
better than reference counters, but they consume more memory than a
per-object reference counter.

The fall-back to reference counter allows bounding the number of
hazard pointer slots to a fixed size for the entire system:
nr_cpus * N, where N=8 as it fills a single 64 bytes cache line on
64-bit architectures.

Porting it to the Linux kernel should be straightforward. We might
want to pick heavily contented reference counts such as the mm_struct
mm_count field as a starting point to see if it provides significant
performance gains.

The hpref read-side performs well even compared to RCU in my benchmarks:

    Results:

    CPU(s):                  16
      On-line CPU(s) list:   0-15
    Vendor ID:               AuthenticAMD
      Model name:            AMD Ryzen 7 PRO 6850U with Radeon Graphics

    8 readers, 1 writer, 10s

    test_urcu_mb         (smp_mb)             nr_reads    829829784 nr_writes     18057836 nr_ops    847887620
    test_hpref_benchmark (smp_mb)             nr_reads   2040376076 nr_writes      2993246 nr_ops   2043369322
    test_hpref_benchmark (barrier/membarrier) nr_reads  10609164704 nr_writes      2208432 nr_ops  10611373136
    test_urcu_bp         (barrier/membarrier) nr_reads  20242102863 nr_writes       599484 nr_ops  20242702347
    test_urcu            (barrier/membarrier) nr_reads  20714490759 nr_writes       782045 nr_ops  20715272804
    test_urcu_qsbr                            nr_reads  40774708959 nr_writes      3512904 nr_ops  40778221863

References:

[1]: M. M. Michael, "Hazard pointers: safe memory reclamation for
     lock-free objects," in IEEE Transactions on Parallel and
     Distributed Systems, vol. 15, no. 6, pp. 491-504, June 2004

Link: https://lore.kernel.org/lkml/j3scdl5iymjlxavomgc6u5ndg3svhab6ga23dr36o4f5mt333w@7xslvq6b6hmv/
Link: https://lpc.events/event/18/contributions/1731/
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: I6369064a0e1a1f9632394df31ff41c76905d17e3
Cc: "Paul E. McKenney" <paulmck@kernel.org>
Cc: Will Deacon <will@kernel.org>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Boqun Feng <boqun.feng@gmail.com>
Cc: Alan Stern <stern@rowland.harvard.edu>
Cc: John Stultz <jstultz@google.com>
To: Neeraj Upadhyay <Neeraj.Upadhyay@amd.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
Cc: Boqun Feng <boqun.feng@gmail.com>
Cc: Frederic Weisbecker <frederic@kernel.org>
Cc: Joel Fernandes <joel@joelfernandes.org>
Cc: Josh Triplett <josh@joshtriplett.org>
Cc: Uladzislau Rezki <urezki@gmail.com>
Cc: Steven Rostedt <rostedt@goodmis.org>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
Cc: Zqiang <qiang.zhang1211@gmail.com>
Cc: Ingo Molnar <mingo@redhat.com>
Cc: Waiman Long <longman@redhat.com>
Cc: Mark Rutland <mark.rutland@arm.com>
Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: Vlastimil Babka <vbabka@suse.cz>
Cc: maged.michael@gmail.com
Cc: Mateusz Guzik <mjguzik@gmail.com>
Cc: rcu@vger.kernel.org
Cc: linux-mm@kvack.org
Cc: lkmm@lists.linux.dev
---
 include/urcu/hpref.h | 222 +++++++++++++++++++++++++++++++++++++++++++
 src/Makefile.am      |   6 +-
 src/hpref.c          |  78 +++++++++++++++
 3 files changed, 305 insertions(+), 1 deletion(-)
 create mode 100644 include/urcu/hpref.h
 create mode 100644 src/hpref.c

Comments

Lai Jiangshan Sept. 21, 2024, 9:07 p.m. UTC | #1
> +/*
> + * hpref_hp_get: Obtain a reference to a stable object, protected either
> + *               by hazard pointer (fast-path) or using reference
> + *               counter as fall-back.
> + */
> +static inline
> +bool hpref_hp_get(struct hpref_node **node_p, struct hpref_ctx *ctx)
> +{
> +       int cpu = rseq_current_cpu_raw();
> +       struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
> +       struct hpref_slot *slot = &cpu_slots->slots[cpu_slots->current_slot];
> +       bool use_refcount = false;
> +       struct hpref_node *node, *node2;
> +       unsigned int next_slot;
> +
> +retry:
> +       node = uatomic_load(node_p, CMM_RELAXED);
> +       if (!node)
> +               return false;
> +       /* Use rseq to try setting current slot hp. Store B. */
> +       if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
> +                               (intptr_t *) &slot->node, (intptr_t) NULL,
> +                               (intptr_t) node, cpu)) {
> +               slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];

Can @cpu be possibly changed? if it can, it seems @cpu and @cpu_slots
should be updated first.

> +               use_refcount = true;
> +               /*
> +                * This may busy-wait for another reader using the
> +                * emergency slot to transition to refcount.
> +                */
> +               caa_cpu_relax();
> +               goto retry;
> +       }
Mathieu Desnoyers Sept. 22, 2024, 7:47 a.m. UTC | #2
On 2024-09-21 23:07, Lai Jiangshan wrote:
>> +/*
>> + * hpref_hp_get: Obtain a reference to a stable object, protected either
>> + *               by hazard pointer (fast-path) or using reference
>> + *               counter as fall-back.
>> + */
>> +static inline
>> +bool hpref_hp_get(struct hpref_node **node_p, struct hpref_ctx *ctx)
>> +{
>> +       int cpu = rseq_current_cpu_raw();
>> +       struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
>> +       struct hpref_slot *slot = &cpu_slots->slots[cpu_slots->current_slot];
>> +       bool use_refcount = false;
>> +       struct hpref_node *node, *node2;
>> +       unsigned int next_slot;
>> +
>> +retry:
>> +       node = uatomic_load(node_p, CMM_RELAXED);
>> +       if (!node)
>> +               return false;
>> +       /* Use rseq to try setting current slot hp. Store B. */
>> +       if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
>> +                               (intptr_t *) &slot->node, (intptr_t) NULL,
>> +                               (intptr_t) node, cpu)) {
>> +               slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
> 
> Can @cpu be possibly changed? if it can, it seems @cpu and @cpu_slots
> should be updated first.

Indeed, if migration happens between rseq_current_cpu_raw() and
execution of rseq_load_cbne_store__ptr(), it will cause this
second operation to fail. This in turn could cause the reader
to retry for a long time (possibly forever) until it finally
migrates back to the original CPU. As you suggest, we should
re-load cpu and cpu_slots after failure here. More specifically,
we should re-load those when the rseq c.s. fails with -1, which
means it was abort or there was a cpu number mismatch. If the
rseq c.s. returns 1, this means the slot did not contain NULL,
so all we need to do is move over to the next slot.

While applying your suggested change, I noticed that I can
improve the fast-path by removing the notion of "current_slot"
number, and thus remove increment of this hint from the
fast path. The fast path will instead just scan all 8 slots
trying to find a NULL one. This also lessens the odds that
the fast-path must fallback to refcount in case of concurrent
use. It provides a 50% performance improvement for the fast
path with barrier/membarrier pairing.

I've updated the https://github.com/compudj/userspace-rcu-dev/tree/hpref
volatile feature branch with these changes. I'll give others some
time to provide feedback on the overall idea before sending out
an updated patch.

Thanks for your feedback!

Mathieu

> 
>> +               use_refcount = true;
>> +               /*
>> +                * This may busy-wait for another reader using the
>> +                * emergency slot to transition to refcount.
>> +                */
>> +               caa_cpu_relax();
>> +               goto retry;
>> +       }
Jonas Oberhauser Sept. 25, 2024, 5:57 a.m. UTC | #3
Hi Mathieu,

interesting idea. Conceptually it looks good.

There's another approach of using hazard pointer to optimize shared 
reference counting (to make it lock-free in common cases).

https://github.com/cmuparlay/concurrent_deferred_rc

It doesn't go as far as what you're doing, but they also use the hazard 
pointer to protect the refcount (like you do in the promote function).

I haven't read your code in detail but it seems to me you have an ABA 
bug: as I explained elsewhere, you could read the same pointer after ABA 
but you don't synchronize with the newer store that gave you node2, 
leaving you to speculatively read stale values through *ctx->hp.
(I am assuming here that ctx->hp is essentially an out parameter used to 
let the caller know which node got protected).

Have fun,
   jonas

> +/*
> + * hpref_hp_get: Obtain a reference to a stable object, protected either
> + *               by hazard pointer (fast-path) or using reference
> + *               counter as fall-back.
> + */
> +static inline
> +bool hpref_hp_get(struct hpref_node **node_p, struct hpref_ctx *ctx)
> +{
> +	int cpu = rseq_current_cpu_raw();
> +	struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
> +	struct hpref_slot *slot = &cpu_slots->slots[cpu_slots->current_slot];
> +	bool use_refcount = false;
> +	struct hpref_node *node, *node2;
> +	unsigned int next_slot;
> +
> +retry:
> +	node = uatomic_load(node_p, CMM_RELAXED);
> +	if (!node)
> +		return false;
> +	/* Use rseq to try setting current slot hp. Store B. */
> +	if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
> +				(intptr_t *) &slot->node, (intptr_t) NULL,
> +				(intptr_t) node, cpu)) {
> +		slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
> +		use_refcount = true;
> +		/*
> +		 * This may busy-wait for another reader using the
> +		 * emergency slot to transition to refcount.
> +		 */
> +		caa_cpu_relax();
> +		goto retry;
> +	}
> +	/* Memory ordering: Store B before Load A. */
> +	cmm_smp_mb();
> +	node2 = uatomic_load(node_p, CMM_RELAXED);	/* Load A */
> +	if (node != node2) {
> +		uatomic_store(&slot->node, NULL, CMM_RELAXED);
> +		if (!node2)
> +			return false;
> +		goto retry;
> +	}
> +	ctx->type = HPREF_TYPE_HP;
> +	ctx->hp = node;
                    ^---- here

> +	ctx->slot = slot;
> +	if (use_refcount) {
> +		hpref_promote_hp_to_ref(ctx);
> +		return true;
> +	}
> +	/*
> +	 * Increment current slot (racy increment is OK because it is
> +	 * just a position hint). Skip the emergency slot.
> +	 */
> +	next_slot = uatomic_load(&cpu_slots->current_slot, CMM_RELAXED) + 1;
> +	if (next_slot >= HPREF_EMERGENCY_SLOT)
> +		next_slot = 0;
> +	uatomic_store(&cpu_slots->current_slot, next_slot, CMM_RELAXED);
> +	return true;
> +}
> +
> +static inline
> +void hpref_put(struct hpref_ctx *ctx)
> +{
> +	if (ctx->type == HPREF_TYPE_REF) {
> +		urcu_ref_put(&ctx->hp->refcount, hpref_release);
> +	} else {
> +		/* Release HP. */
> +		uatomic_store(&ctx->slot->node, NULL, CMM_RELEASE);
> +	}
> +	ctx->hp = NULL;
> +}
> +
> +/*
> + * hpref_set_pointer: Store pointer @node to @ptr, with RCU publication
> + *                    guarantees.
> + */
> +static inline
> +void hpref_set_pointer(struct hpref_node **ptr, struct hpref_node *node)
> +{
> +	if (__builtin_constant_p(node) && node == NULL)
> +		uatomic_store(ptr, NULL, CMM_RELAXED);
> +	else
> +		uatomic_store(ptr, node, CMM_RELEASE);
> +}
> +
> +/*
> + * Return the content of the hpref context hazard pointer field.
> + */
> +static inline
> +struct hpref_node *hpref_ctx_pointer(struct hpref_ctx *ctx)
> +{
> +	return ctx->hp;
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _URCU_HPREF_H */
> diff --git a/src/Makefile.am b/src/Makefile.am
> index b555c818..7312c9f7 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -19,7 +19,8 @@ RCULFHASH = rculfhash.c rculfhash-mm-order.c rculfhash-mm-chunk.c \
>   lib_LTLIBRARIES = liburcu-common.la \
>   		liburcu.la liburcu-qsbr.la \
>   		liburcu-mb.la liburcu-bp.la \
> -		liburcu-memb.la liburcu-cds.la
> +		liburcu-memb.la liburcu-cds.la \
> +		liburcu-hpref.la
>   
>   #
>   # liburcu-common contains wait-free queues (needed by call_rcu) as well
> @@ -50,6 +51,9 @@ liburcu_cds_la_SOURCES = rculfqueue.c rculfstack.c lfstack.c \
>   	workqueue.c workqueue.h $(RCULFHASH) $(COMPAT)
>   liburcu_cds_la_LIBADD = liburcu-common.la
>   
> +liburcu_hpref_la_SOURCES = hpref.c
> +liburcu_hpref_la_LIBADD = -lrseq
> +
>   pkgconfigdir = $(libdir)/pkgconfig
>   pkgconfig_DATA = liburcu-cds.pc liburcu.pc liburcu-bp.pc liburcu-qsbr.pc \
>   	liburcu-mb.pc liburcu-memb.pc
> diff --git a/src/hpref.c b/src/hpref.c
> new file mode 100644
> index 00000000..f63530f5
> --- /dev/null
> +++ b/src/hpref.c
> @@ -0,0 +1,78 @@
> +// SPDX-FileCopyrightText: 2024 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
> +//
> +// SPDX-License-Identifier: LGPL-2.1-or-later
> +
> +/*
> + * HPREF: Hazard pointers with reference counters
> + */
> +
> +#define _LGPL_SOURCE
> +#include <urcu/hpref.h>
> +#include <rseq/mempool.h>	/* Per-CPU memory */
> +
> +static struct rseq_mempool *mempool;
> +struct hpref_percpu_slots *hpref_percpu_slots;
> +
> +void hpref_release(struct urcu_ref *ref)
> +{
> +	struct hpref_node *node = caa_container_of(ref, struct hpref_node, refcount);
> +
> +	node->release(node);
> +}
> +
> +/*
> + * hpref_synchronize: Wait for any reader possessing a hazard pointer to
> + *                    @node to clear its hazard pointer slot.
> + */
> +void hpref_synchronize(struct hpref_node *node)
> +{
> +	int nr_cpus = rseq_get_max_nr_cpus(), cpu;
> +
> +	/* Memory ordering: Store A before Load B. */
> +	cmm_smp_mb();
> +	/* Scan all CPUs slots. */
> +	for (cpu = 0; cpu < nr_cpus; cpu++) {
> +		struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
> +		struct hpref_slot *slot;
> +		unsigned int i;
> +
> +		for (i = 0; i < HPREF_NR_PERCPU_SLOTS; i++) {
> +			slot = &cpu_slots->slots[i];
> +			/* Busy-wait if node is found. */
> +			while (uatomic_load(&slot->node, CMM_ACQUIRE) == node)	/* Load B */
> +				caa_cpu_relax();
> +		}
> +	}
> +}
> +
> +/*
> + * hpref_synchronize_put: Wait for any reader possessing a hazard
> + *                        pointer to clear its slot and put reference
> + *                        count.
> + */
> +void hpref_synchronize_put(struct hpref_node *node)
> +{
> +	if (!node)
> +		return;
> +	hpref_synchronize(node);
> +	urcu_ref_put(&node->refcount, hpref_release);
> +}
> +
> +static __attribute__((constructor))
> +void hpref_init(void)
> +{
> +	mempool = rseq_mempool_create("hpref", sizeof(struct hpref_percpu_slots), NULL);
> +	if (!mempool)
> +		abort();
> +	hpref_percpu_slots = rseq_mempool_percpu_zmalloc(mempool);
> +	if (!hpref_percpu_slots)
> +		abort();
> +}
> +
> +static __attribute__((destructor))
> +void hpref_exit(void)
> +{
> +	rseq_mempool_percpu_free(hpref_percpu_slots);
> +	if (rseq_mempool_destroy(mempool))
> +		abort();
> +}
Mathieu Desnoyers Sept. 25, 2024, 6:35 a.m. UTC | #4
On 2024-09-25 07:57, Jonas Oberhauser wrote:
> Hi Mathieu,
> 
> interesting idea. Conceptually it looks good.
> 
> There's another approach of using hazard pointer to optimize shared 
> reference counting (to make it lock-free in common cases).
> 
> https://github.com/cmuparlay/concurrent_deferred_rc
> 
> It doesn't go as far as what you're doing, but they also use the hazard 
> pointer to protect the refcount (like you do in the promote function).

Thanks for the reference. Indeed, one use-case of this HP+refcount
scheme would be to implement something that resembles C++ shared
pointers. I'm glad to see I'm not alone in seeing potential in this
approach.

> I haven't read your code in detail but it seems to me you have an ABA 
> bug: as I explained elsewhere, you could read the same pointer after ABA 
> but you don't synchronize with the newer store that gave you node2, 
> leaving you to speculatively read stale values through *ctx->hp.
> (I am assuming here that ctx->hp is essentially an out parameter used to 
> let the caller know which node got protected).

You are of course absolutely right. The following change should fix it:

  	cmm_barrier();
-	node2 = uatomic_load(node_p, CMM_RELAXED);	/* Load A */
+	node2 = rcu_dereference(*node_p);	/* Load A */

Please let me know if I'm missing anything.

Thanks,

Mathieu


> 
> Have fun,
>    jonas
> 
>> +/*
>> + * hpref_hp_get: Obtain a reference to a stable object, protected either
>> + *               by hazard pointer (fast-path) or using reference
>> + *               counter as fall-back.
>> + */
>> +static inline
>> +bool hpref_hp_get(struct hpref_node **node_p, struct hpref_ctx *ctx)
>> +{
>> +    int cpu = rseq_current_cpu_raw();
>> +    struct hpref_percpu_slots *cpu_slots = 
>> rseq_percpu_ptr(hpref_percpu_slots, cpu);
>> +    struct hpref_slot *slot = 
>> &cpu_slots->slots[cpu_slots->current_slot];
>> +    bool use_refcount = false;
>> +    struct hpref_node *node, *node2;
>> +    unsigned int next_slot;
>> +
>> +retry:
>> +    node = uatomic_load(node_p, CMM_RELAXED);
>> +    if (!node)
>> +        return false;
>> +    /* Use rseq to try setting current slot hp. Store B. */
>> +    if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
>> +                (intptr_t *) &slot->node, (intptr_t) NULL,
>> +                (intptr_t) node, cpu)) {
>> +        slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
>> +        use_refcount = true;
>> +        /*
>> +         * This may busy-wait for another reader using the
>> +         * emergency slot to transition to refcount.
>> +         */
>> +        caa_cpu_relax();
>> +        goto retry;
>> +    }
>> +    /* Memory ordering: Store B before Load A. */
>> +    cmm_smp_mb();
>> +    node2 = uatomic_load(node_p, CMM_RELAXED);    /* Load A */
>> +    if (node != node2) {
>> +        uatomic_store(&slot->node, NULL, CMM_RELAXED);
>> +        if (!node2)
>> +            return false;
>> +        goto retry;
>> +    }
>> +    ctx->type = HPREF_TYPE_HP;
>> +    ctx->hp = node;
>                     ^---- here
> 
>> +    ctx->slot = slot;
>> +    if (use_refcount) {
>> +        hpref_promote_hp_to_ref(ctx);
>> +        return true;
>> +    }
>> +    /*
>> +     * Increment current slot (racy increment is OK because it is
>> +     * just a position hint). Skip the emergency slot.
>> +     */
>> +    next_slot = uatomic_load(&cpu_slots->current_slot, CMM_RELAXED) + 1;
>> +    if (next_slot >= HPREF_EMERGENCY_SLOT)
>> +        next_slot = 0;
>> +    uatomic_store(&cpu_slots->current_slot, next_slot, CMM_RELAXED);
>> +    return true;
>> +}
>> +
>> +static inline
>> +void hpref_put(struct hpref_ctx *ctx)
>> +{
>> +    if (ctx->type == HPREF_TYPE_REF) {
>> +        urcu_ref_put(&ctx->hp->refcount, hpref_release);
>> +    } else {
>> +        /* Release HP. */
>> +        uatomic_store(&ctx->slot->node, NULL, CMM_RELEASE);
>> +    }
>> +    ctx->hp = NULL;
>> +}
>> +
>> +/*
>> + * hpref_set_pointer: Store pointer @node to @ptr, with RCU publication
>> + *                    guarantees.
>> + */
>> +static inline
>> +void hpref_set_pointer(struct hpref_node **ptr, struct hpref_node *node)
>> +{
>> +    if (__builtin_constant_p(node) && node == NULL)
>> +        uatomic_store(ptr, NULL, CMM_RELAXED);
>> +    else
>> +        uatomic_store(ptr, node, CMM_RELEASE);
>> +}
>> +
>> +/*
>> + * Return the content of the hpref context hazard pointer field.
>> + */
>> +static inline
>> +struct hpref_node *hpref_ctx_pointer(struct hpref_ctx *ctx)
>> +{
>> +    return ctx->hp;
>> +}
>> +
>> +#ifdef __cplusplus
>> +}
>> +#endif
>> +
>> +#endif /* _URCU_HPREF_H */
>> diff --git a/src/Makefile.am b/src/Makefile.am
>> index b555c818..7312c9f7 100644
>> --- a/src/Makefile.am
>> +++ b/src/Makefile.am
>> @@ -19,7 +19,8 @@ RCULFHASH = rculfhash.c rculfhash-mm-order.c 
>> rculfhash-mm-chunk.c \
>>   lib_LTLIBRARIES = liburcu-common.la \
>>           liburcu.la liburcu-qsbr.la \
>>           liburcu-mb.la liburcu-bp.la \
>> -        liburcu-memb.la liburcu-cds.la
>> +        liburcu-memb.la liburcu-cds.la \
>> +        liburcu-hpref.la
>>   #
>>   # liburcu-common contains wait-free queues (needed by call_rcu) as well
>> @@ -50,6 +51,9 @@ liburcu_cds_la_SOURCES = rculfqueue.c rculfstack.c 
>> lfstack.c \
>>       workqueue.c workqueue.h $(RCULFHASH) $(COMPAT)
>>   liburcu_cds_la_LIBADD = liburcu-common.la
>> +liburcu_hpref_la_SOURCES = hpref.c
>> +liburcu_hpref_la_LIBADD = -lrseq
>> +
>>   pkgconfigdir = $(libdir)/pkgconfig
>>   pkgconfig_DATA = liburcu-cds.pc liburcu.pc liburcu-bp.pc 
>> liburcu-qsbr.pc \
>>       liburcu-mb.pc liburcu-memb.pc
>> diff --git a/src/hpref.c b/src/hpref.c
>> new file mode 100644
>> index 00000000..f63530f5
>> --- /dev/null
>> +++ b/src/hpref.c
>> @@ -0,0 +1,78 @@
>> +// SPDX-FileCopyrightText: 2024 Mathieu Desnoyers 
>> <mathieu.desnoyers@efficios.com>
>> +//
>> +// SPDX-License-Identifier: LGPL-2.1-or-later
>> +
>> +/*
>> + * HPREF: Hazard pointers with reference counters
>> + */
>> +
>> +#define _LGPL_SOURCE
>> +#include <urcu/hpref.h>
>> +#include <rseq/mempool.h>    /* Per-CPU memory */
>> +
>> +static struct rseq_mempool *mempool;
>> +struct hpref_percpu_slots *hpref_percpu_slots;
>> +
>> +void hpref_release(struct urcu_ref *ref)
>> +{
>> +    struct hpref_node *node = caa_container_of(ref, struct 
>> hpref_node, refcount);
>> +
>> +    node->release(node);
>> +}
>> +
>> +/*
>> + * hpref_synchronize: Wait for any reader possessing a hazard pointer to
>> + *                    @node to clear its hazard pointer slot.
>> + */
>> +void hpref_synchronize(struct hpref_node *node)
>> +{
>> +    int nr_cpus = rseq_get_max_nr_cpus(), cpu;
>> +
>> +    /* Memory ordering: Store A before Load B. */
>> +    cmm_smp_mb();
>> +    /* Scan all CPUs slots. */
>> +    for (cpu = 0; cpu < nr_cpus; cpu++) {
>> +        struct hpref_percpu_slots *cpu_slots = 
>> rseq_percpu_ptr(hpref_percpu_slots, cpu);
>> +        struct hpref_slot *slot;
>> +        unsigned int i;
>> +
>> +        for (i = 0; i < HPREF_NR_PERCPU_SLOTS; i++) {
>> +            slot = &cpu_slots->slots[i];
>> +            /* Busy-wait if node is found. */
>> +            while (uatomic_load(&slot->node, CMM_ACQUIRE) == node)    
>> /* Load B */
>> +                caa_cpu_relax();
>> +        }
>> +    }
>> +}
>> +
>> +/*
>> + * hpref_synchronize_put: Wait for any reader possessing a hazard
>> + *                        pointer to clear its slot and put reference
>> + *                        count.
>> + */
>> +void hpref_synchronize_put(struct hpref_node *node)
>> +{
>> +    if (!node)
>> +        return;
>> +    hpref_synchronize(node);
>> +    urcu_ref_put(&node->refcount, hpref_release);
>> +}
>> +
>> +static __attribute__((constructor))
>> +void hpref_init(void)
>> +{
>> +    mempool = rseq_mempool_create("hpref", sizeof(struct 
>> hpref_percpu_slots), NULL);
>> +    if (!mempool)
>> +        abort();
>> +    hpref_percpu_slots = rseq_mempool_percpu_zmalloc(mempool);
>> +    if (!hpref_percpu_slots)
>> +        abort();
>> +}
>> +
>> +static __attribute__((destructor))
>> +void hpref_exit(void)
>> +{
>> +    rseq_mempool_percpu_free(hpref_percpu_slots);
>> +    if (rseq_mempool_destroy(mempool))
>> +        abort();
>> +}
>
Jonas Oberhauser Sept. 25, 2024, 10:06 a.m. UTC | #5
Am 9/25/2024 um 8:35 AM schrieb Mathieu Desnoyers:
> On 2024-09-25 07:57, Jonas Oberhauser wrote:
>> Hi Mathieu,

>> I haven't read your code in detail but it seems to me you have an ABA 
>> bug: as I explained elsewhere, you could read the same pointer after 
>> ABA but you don't synchronize with the newer store that gave you 
>> node2, leaving you to speculatively read stale values through *ctx->hp.
>> (I am assuming here that ctx->hp is essentially an out parameter used 
>> to let the caller know which node got protected).
> 
> The following change should fix it:
> 
>       cmm_barrier();
> -    node2 = uatomic_load(node_p, CMM_RELAXED);    /* Load A */
> +    node2 = rcu_dereference(*node_p);    /* Load A */
> 

I don't think this fixes it, because IIRC rcu_dereference relies on the 
address dependency (which we don't have here) to provide ordering.

I would recommend either:

-    ctx->hp = node;
+    ctx->hp = node2;

which fixes the problem under the perhaps too weak assumption that the 
compiler doesn't use its knowledge that node==node2 to just undo this 
fix, or more strictly,

+    ctx->hp = READ_ONCE(node2);

which I believe makes sure that the value of node2 is used.

Alternatively you could always use an acquire load.


Best wishes,

   jonas
Mathieu Desnoyers Sept. 25, 2024, 11:36 a.m. UTC | #6
On 2024-09-25 12:06, Jonas Oberhauser wrote:
> 
> 
> Am 9/25/2024 um 8:35 AM schrieb Mathieu Desnoyers:
>> On 2024-09-25 07:57, Jonas Oberhauser wrote:
>>> Hi Mathieu,
> 
>>> I haven't read your code in detail but it seems to me you have an ABA 
>>> bug: as I explained elsewhere, you could read the same pointer after 
>>> ABA but you don't synchronize with the newer store that gave you 
>>> node2, leaving you to speculatively read stale values through *ctx->hp.
>>> (I am assuming here that ctx->hp is essentially an out parameter used 
>>> to let the caller know which node got protected).
>>
>> The following change should fix it:
>>
>>       cmm_barrier();
>> -    node2 = uatomic_load(node_p, CMM_RELAXED);    /* Load A */
>> +    node2 = rcu_dereference(*node_p);    /* Load A */
>>
> 
> I don't think this fixes it, because IIRC rcu_dereference relies on the 
> address dependency (which we don't have here) to provide ordering.
> 
> I would recommend either:
> 
> -    ctx->hp = node;
> +    ctx->hp = node2;
> 
> which fixes the problem under the perhaps too weak assumption that the 
> compiler doesn't use its knowledge that node==node2 to just undo this 
> fix, or more strictly,

As stated in Documentation/RCU/rcu_dereference.rst from the Linux
kernel, comparing the result of rcu_dereference against another
non-NULL pointer is discouraged, as you rightly point out.

> 
> +    ctx->hp = READ_ONCE(node2);
> 
> which I believe makes sure that the value of node2 is used.

I am not entirely sure this extra READ_ONCE() would be sufficient
to prevent the compiler from making assumptions about the content
of node2 and thus use the result of the first load (node) instead.
It would also not suffice to prevent the CPU from speculatively
using the result of the first load to perform dependent loads AFAIU.

> Alternatively you could always use an acquire load.

Unless someone comes up with a sound alternate approach,
I am tempted to go with an acquire load as the second load
within hpref_hp_get().

This way, the compiler would not attempt to use the
node value from the first load for dependent loads,
and the and CPU won't try to speculate dependent loads
either.

Thanks,

Mathieu

> 
> 
> Best wishes,
> 
>    jonas
>
Jonas Oberhauser Sept. 25, 2024, 12:02 p.m. UTC | #7
Am 9/25/2024 um 1:36 PM schrieb Mathieu Desnoyers:
> On 2024-09-25 12:06, Jonas Oberhauser wrote:
>>
>>
>> Am 9/25/2024 um 8:35 AM schrieb Mathieu Desnoyers:
>>> On 2024-09-25 07:57, Jonas Oberhauser wrote:
>>>> Hi Mathieu,
>>
>>>> I haven't read your code in detail but it seems to me you have an 
>>>> ABA bug: as I explained elsewhere, you could read the same pointer 
>>>> after ABA but you don't synchronize with the newer store that gave 
>>>> you node2, leaving you to speculatively read stale values through 
>>>> *ctx->hp.
>>>> (I am assuming here that ctx->hp is essentially an out parameter 
>>>> used to let the caller know which node got protected).
>>>
>>> The following change should fix it:
>>>
>>>       cmm_barrier();
>>> -    node2 = uatomic_load(node_p, CMM_RELAXED);    /* Load A */
>>> +    node2 = rcu_dereference(*node_p);    /* Load A */
>>>
>>
>> I don't think this fixes it, because IIRC rcu_dereference relies on 
>> the address dependency (which we don't have here) to provide ordering.
>>
>> I would recommend either:
>>
>> -    ctx->hp = node;
>> +    ctx->hp = node2;
>>
>> which fixes the problem under the perhaps too weak assumption that the 
>> compiler doesn't use its knowledge that node==node2 to just undo this 
>> fix, or more strictly,
> 
> As stated in Documentation/RCU/rcu_dereference.rst from the Linux
> kernel, comparing the result of rcu_dereference against another
> non-NULL pointer is discouraged, as you rightly point out.
> 
>>
>> +    ctx->hp = READ_ONCE(node2);
>>
>> which I believe makes sure that the value of node2 is used.
> 
> I am not entirely sure this extra READ_ONCE() would be sufficient
> to prevent the compiler from making assumptions about the content
> of node2 and thus use the result of the first load (node) instead.
> It would also not suffice to prevent the CPU from speculatively
> using the result of the first load to perform dependent loads AFAIU.

The reason I think it should be sufficient is that it forces the 
compiler to assume that since the comparison, the value of node2 might 
have changed.
Therefore, simply using the value of node at that point should be 
unsound from the compiler's POV.

But I'm not a compiler expert... So I definitely support uneasiness 
about this construct :))

jonas
Jonas Oberhauser Sept. 28, 2024, 11:22 a.m. UTC | #8
Two more questions below:

Am 9/21/2024 um 6:42 PM schrieb Mathieu Desnoyers:
> +#define NR_PERCPU_SLOTS_BITS	3

Have you measured any advantage of this multi-slot version vs a version 
with just one normal slot and one emergency slot?
With just one normal slot, the normal slot version would always be zero, 
and there'd be no need to increment etc., which might make the common 
case (no conflict) faster.

Either way I recommend stress testing with just one normal slot to 
increase the chance of conflict (and hence triggering corner cases) 
during stress testing.

> +retry:
> +	node = uatomic_load(node_p, CMM_RELAXED);
> +	if (!node)
> +		return false;
> +	/* Use rseq to try setting current slot hp. Store B. */
> +	if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
> +				(intptr_t *) &slot->node, (intptr_t) NULL,
> +				(intptr_t) node, cpu)) {
> +		slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
> +		use_refcount = true;
> +		/*
> +		 * This may busy-wait for another reader using the
> +		 * emergency slot to transition to refcount.
> +		 */
> +		caa_cpu_relax();
> +		goto retry;
> +	}

I'm not familiar with Linux' preemption model. Can this deadlock if a 
low-interrupt-level thread is occupying the EMERGENCY slot and a 
higher-interrupt-level thread is also trying to take it?




Best wishes,
   jonas
Mathieu Desnoyers Sept. 28, 2024, 11:33 a.m. UTC | #9
On 2024-09-28 13:22, Jonas Oberhauser wrote:
> Two more questions below:
> 
> Am 9/21/2024 um 6:42 PM schrieb Mathieu Desnoyers:
>> +#define NR_PERCPU_SLOTS_BITS    3
> 
> Have you measured any advantage of this multi-slot version vs a version 
> with just one normal slot and one emergency slot?

No, I have not. That being said, I am taking a minimalistic
approach that takes things even further in the "simple" direction
for what I will send as RFC against the Linux kernel:
there is just the one "emergency slot", irqs are disabled around
use of the HP slot, and promotion to refcount is done before
returning to the caller.

> With just one normal slot, the normal slot version would always be zero, 
> and there'd be no need to increment etc., which might make the common 
> case (no conflict) faster.

The multi-slots allows preemption while holding the slot. It also allows
HP slots users to keep it longer without doing to refcount right away.

I even have a patch that dynamically adapts the scan depth (increase
by reader, decrease by synchronize, with an hysteresis) in my userspace
prototype. This splits the number of allocated slots from the scan
depth. But I keep that for later and will focus on the simple case
first (single HP slot, only used with irqoff).

> 
> Either way I recommend stress testing with just one normal slot to 
> increase the chance of conflict (and hence triggering corner cases) 
> during stress testing.

Good point.

> 
>> +retry:
>> +    node = uatomic_load(node_p, CMM_RELAXED);
>> +    if (!node)
>> +        return false;
>> +    /* Use rseq to try setting current slot hp. Store B. */
>> +    if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
>> +                (intptr_t *) &slot->node, (intptr_t) NULL,
>> +                (intptr_t) node, cpu)) {
>> +        slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
>> +        use_refcount = true;
>> +        /*
>> +         * This may busy-wait for another reader using the
>> +         * emergency slot to transition to refcount.
>> +         */
>> +        caa_cpu_relax();
>> +        goto retry;
>> +    }
> 
> I'm not familiar with Linux' preemption model. Can this deadlock if a 
> low-interrupt-level thread is occupying the EMERGENCY slot and a 
> higher-interrupt-level thread is also trying to take it?

This is a userspace prototype. This will behave similarly to a userspace
spinlock in that case, which is not great in terms of CPU usage, but
should eventually unblock the waiter, unless it has a RT priority that
really prevents any progress from the emergency slot owner.

On my TODO list, I have a bullet about integrating with sys_futex to
block on wait, wake up on slot release. I would then use the wait/wakeup
code based on sys_futex already present in liburcu.

Thanks!

Mathieu


> 
> 
> 
> 
> Best wishes,
>    jonas
>
Jonas Oberhauser Sept. 28, 2024, 11:50 a.m. UTC | #10
Thanks for your response,

Am 9/28/2024 um 1:33 PM schrieb Mathieu Desnoyers:
> 
> This is a userspace prototype. This will behave similarly to a userspace
> spinlock in that case, which is not great in terms of CPU usage, but
> should eventually unblock the waiter, unless it has a RT priority that
> really prevents any progress from the emergency slot owner.
> 
> On my TODO list, I have a bullet about integrating with sys_futex to
> block on wait, wake up on slot release. I would then use the wait/wakeup
> code based on sys_futex already present in liburcu.

Oh, I see. I think if it is for userspace then it really should use 
wait/wakeup as you said.

Best wishes,
   jonas
diff mbox series

Patch

diff --git a/include/urcu/hpref.h b/include/urcu/hpref.h
new file mode 100644
index 00000000..300f7d4e
--- /dev/null
+++ b/include/urcu/hpref.h
@@ -0,0 +1,222 @@ 
+// SPDX-FileCopyrightText: 2024 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+//
+// SPDX-License-Identifier: LGPL-2.1-or-later
+
+#ifndef _URCU_HPREF_H
+#define _URCU_HPREF_H
+
+/*
+ * HPREF: Hazard pointers with reference counters
+ *
+ * This API combines hazard pointers and reference counters.
+ * It uses hazard pointers as fast-paths, and fall-back to reference
+ * counters either explicitly when the reader expects to hold the object
+ * for a long time, or when no hazard pointer slots are available.
+ *
+ * This leverages the fact that both synchronization mechanisms aim to
+ * guarantee existence of objects, and those existence guarantees can be
+ * chained. Each mechanism achieves its purpose in a different way with
+ * different tradeoffs. The hazard pointers are faster to read and scale
+ * better than reference counters, but they consume more memory than a
+ * per-object reference counter.
+ *
+ * The fall-back to reference counter allows bounding the number of
+ * hazard pointer slots to a fixed size for the entire system:
+ * nr_cpus * N, where N=8 as it fills a single 64 bytes cache line on
+ * 64-bit architectures.
+ *
+ * References:
+ *
+ * [1]: M. M. Michael, "Hazard pointers: safe memory reclamation for
+ *      lock-free objects," in IEEE Transactions on Parallel and
+ *      Distributed Systems, vol. 15, no. 6, pp. 491-504, June 2004
+ */
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <rseq/mempool.h>	/* Per-CPU memory */
+#include <rseq/rseq.h>
+
+#include <urcu/ref.h>
+#include <urcu/uatomic.h>
+#include <urcu/compiler.h>
+
+struct hpref_node {
+	struct urcu_ref refcount;
+	void (*release)(struct hpref_node *node);
+};
+
+struct hpref_slot {
+	/* Use rseq to set from reader only if zero. */
+	struct hpref_node *node;
+};
+
+#define NR_PERCPU_SLOTS_BITS	3
+#define HPREF_NR_PERCPU_SLOTS	(1U << NR_PERCPU_SLOTS_BITS)
+/*
+ * The emergency slot is only used for short critical sections
+ * (would be preempt off in when porting this code to the kernel): only
+ * to ensure we have a free slot for taking a reference count as
+ * fallback.
+ */
+#define HPREF_EMERGENCY_SLOT	(HPREF_NR_PERCPU_SLOTS - 1)
+
+struct hpref_percpu_slots {
+	struct hpref_slot slots[HPREF_NR_PERCPU_SLOTS];
+	unsigned int current_slot;
+};
+
+enum hpref_type {
+	HPREF_TYPE_HP,
+	HPREF_TYPE_REF,
+};
+
+struct hpref_ctx {
+	struct hpref_slot *slot;
+	struct hpref_node *hp;
+	enum hpref_type type;
+};
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern struct hpref_percpu_slots *hpref_percpu_slots;
+
+void hpref_release(struct urcu_ref *ref);
+
+/*
+ * hpref_synchronize: Wait for any reader possessing a hazard pointer to
+ *                    @node to clear its hazard pointer slot.
+ */
+void hpref_synchronize(struct hpref_node *node);
+
+/*
+ * hpref_synchronize_put: Wait for any reader possessing a hazard
+ *                        pointer to clear its slot and put reference
+ *                        count.
+ */
+void hpref_synchronize_put(struct hpref_node *node);
+
+static inline
+void hpref_node_init(struct hpref_node *node,
+		void (*release)(struct hpref_node *node))
+{
+	urcu_ref_init(&node->refcount);
+	node->release = release;
+}
+
+/*
+ * hpref_promote_hp_to_ref: Promote hazard pointer to reference count.
+ */
+static inline
+void hpref_promote_hp_to_ref(struct hpref_ctx *ctx)
+{
+	if (ctx->type == HPREF_TYPE_REF)
+		return;
+	urcu_ref_get(&ctx->hp->refcount);
+	uatomic_store(&ctx->slot->node, NULL, CMM_RELEASE);
+	ctx->slot = NULL;
+	ctx->type = HPREF_TYPE_REF;
+}
+
+/*
+ * hpref_hp_get: Obtain a reference to a stable object, protected either
+ *               by hazard pointer (fast-path) or using reference
+ *               counter as fall-back.
+ */
+static inline
+bool hpref_hp_get(struct hpref_node **node_p, struct hpref_ctx *ctx)
+{
+	int cpu = rseq_current_cpu_raw();
+	struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
+	struct hpref_slot *slot = &cpu_slots->slots[cpu_slots->current_slot];
+	bool use_refcount = false;
+	struct hpref_node *node, *node2;
+	unsigned int next_slot;
+
+retry:
+	node = uatomic_load(node_p, CMM_RELAXED);
+	if (!node)
+		return false;
+	/* Use rseq to try setting current slot hp. Store B. */
+	if (rseq_load_cbne_store__ptr(RSEQ_MO_RELAXED, RSEQ_PERCPU_CPU_ID,
+				(intptr_t *) &slot->node, (intptr_t) NULL,
+				(intptr_t) node, cpu)) {
+		slot = &cpu_slots->slots[HPREF_EMERGENCY_SLOT];
+		use_refcount = true;
+		/*
+		 * This may busy-wait for another reader using the
+		 * emergency slot to transition to refcount.
+		 */
+		caa_cpu_relax();
+		goto retry;
+	}
+	/* Memory ordering: Store B before Load A. */
+	cmm_smp_mb();
+	node2 = uatomic_load(node_p, CMM_RELAXED);	/* Load A */
+	if (node != node2) {
+		uatomic_store(&slot->node, NULL, CMM_RELAXED);
+		if (!node2)
+			return false;
+		goto retry;
+	}
+	ctx->type = HPREF_TYPE_HP;
+	ctx->hp = node;
+	ctx->slot = slot;
+	if (use_refcount) {
+		hpref_promote_hp_to_ref(ctx);
+		return true;
+	}
+	/*
+	 * Increment current slot (racy increment is OK because it is
+	 * just a position hint). Skip the emergency slot.
+	 */
+	next_slot = uatomic_load(&cpu_slots->current_slot, CMM_RELAXED) + 1;
+	if (next_slot >= HPREF_EMERGENCY_SLOT)
+		next_slot = 0;
+	uatomic_store(&cpu_slots->current_slot, next_slot, CMM_RELAXED);
+	return true;
+}
+
+static inline
+void hpref_put(struct hpref_ctx *ctx)
+{
+	if (ctx->type == HPREF_TYPE_REF) {
+		urcu_ref_put(&ctx->hp->refcount, hpref_release);
+	} else {
+		/* Release HP. */
+		uatomic_store(&ctx->slot->node, NULL, CMM_RELEASE);
+	}
+	ctx->hp = NULL;
+}
+
+/*
+ * hpref_set_pointer: Store pointer @node to @ptr, with RCU publication
+ *                    guarantees.
+ */
+static inline
+void hpref_set_pointer(struct hpref_node **ptr, struct hpref_node *node)
+{
+	if (__builtin_constant_p(node) && node == NULL)
+		uatomic_store(ptr, NULL, CMM_RELAXED);
+	else
+		uatomic_store(ptr, node, CMM_RELEASE);
+}
+
+/*
+ * Return the content of the hpref context hazard pointer field.
+ */
+static inline
+struct hpref_node *hpref_ctx_pointer(struct hpref_ctx *ctx)
+{
+	return ctx->hp;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _URCU_HPREF_H */
diff --git a/src/Makefile.am b/src/Makefile.am
index b555c818..7312c9f7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -19,7 +19,8 @@  RCULFHASH = rculfhash.c rculfhash-mm-order.c rculfhash-mm-chunk.c \
 lib_LTLIBRARIES = liburcu-common.la \
 		liburcu.la liburcu-qsbr.la \
 		liburcu-mb.la liburcu-bp.la \
-		liburcu-memb.la liburcu-cds.la
+		liburcu-memb.la liburcu-cds.la \
+		liburcu-hpref.la
 
 #
 # liburcu-common contains wait-free queues (needed by call_rcu) as well
@@ -50,6 +51,9 @@  liburcu_cds_la_SOURCES = rculfqueue.c rculfstack.c lfstack.c \
 	workqueue.c workqueue.h $(RCULFHASH) $(COMPAT)
 liburcu_cds_la_LIBADD = liburcu-common.la
 
+liburcu_hpref_la_SOURCES = hpref.c
+liburcu_hpref_la_LIBADD = -lrseq
+
 pkgconfigdir = $(libdir)/pkgconfig
 pkgconfig_DATA = liburcu-cds.pc liburcu.pc liburcu-bp.pc liburcu-qsbr.pc \
 	liburcu-mb.pc liburcu-memb.pc
diff --git a/src/hpref.c b/src/hpref.c
new file mode 100644
index 00000000..f63530f5
--- /dev/null
+++ b/src/hpref.c
@@ -0,0 +1,78 @@ 
+// SPDX-FileCopyrightText: 2024 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+//
+// SPDX-License-Identifier: LGPL-2.1-or-later
+
+/*
+ * HPREF: Hazard pointers with reference counters
+ */
+
+#define _LGPL_SOURCE
+#include <urcu/hpref.h>
+#include <rseq/mempool.h>	/* Per-CPU memory */
+
+static struct rseq_mempool *mempool;
+struct hpref_percpu_slots *hpref_percpu_slots;
+
+void hpref_release(struct urcu_ref *ref)
+{
+	struct hpref_node *node = caa_container_of(ref, struct hpref_node, refcount);
+
+	node->release(node);
+}
+
+/*
+ * hpref_synchronize: Wait for any reader possessing a hazard pointer to
+ *                    @node to clear its hazard pointer slot.
+ */
+void hpref_synchronize(struct hpref_node *node)
+{
+	int nr_cpus = rseq_get_max_nr_cpus(), cpu;
+
+	/* Memory ordering: Store A before Load B. */
+	cmm_smp_mb();
+	/* Scan all CPUs slots. */
+	for (cpu = 0; cpu < nr_cpus; cpu++) {
+		struct hpref_percpu_slots *cpu_slots = rseq_percpu_ptr(hpref_percpu_slots, cpu);
+		struct hpref_slot *slot;
+		unsigned int i;
+
+		for (i = 0; i < HPREF_NR_PERCPU_SLOTS; i++) {
+			slot = &cpu_slots->slots[i];
+			/* Busy-wait if node is found. */
+			while (uatomic_load(&slot->node, CMM_ACQUIRE) == node)	/* Load B */
+				caa_cpu_relax();
+		}
+	}
+}
+
+/*
+ * hpref_synchronize_put: Wait for any reader possessing a hazard
+ *                        pointer to clear its slot and put reference
+ *                        count.
+ */
+void hpref_synchronize_put(struct hpref_node *node)
+{
+	if (!node)
+		return;
+	hpref_synchronize(node);
+	urcu_ref_put(&node->refcount, hpref_release);
+}
+
+static __attribute__((constructor))
+void hpref_init(void)
+{
+	mempool = rseq_mempool_create("hpref", sizeof(struct hpref_percpu_slots), NULL);
+	if (!mempool)
+		abort();
+	hpref_percpu_slots = rseq_mempool_percpu_zmalloc(mempool);
+	if (!hpref_percpu_slots)
+		abort();
+}
+
+static __attribute__((destructor))
+void hpref_exit(void)
+{
+	rseq_mempool_percpu_free(hpref_percpu_slots);
+	if (rseq_mempool_destroy(mempool))
+		abort();
+}