diff mbox series

[v2,04/12] uprobes: revamp uprobe refcounting and lifetime management

Message ID 20240701223935.3783951-5-andrii@kernel.org (mailing list archive)
State New
Series uprobes: add batched register/unregister APIs and per-CPU RW semaphore

Checks

Context Check Description
netdev/tree_selection success Guessing tree name failed - patch did not apply

Commit Message

Andrii Nakryiko July 1, 2024, 10:39 p.m. UTC
Revamp how struct uprobe is refcounted, and thus how its lifetime is
managed.

Right now, there are a few possible "owners" of uprobe refcount:
  - uprobes_tree RB tree assumes one refcount when uprobe is registered
    and added to the lookup tree;
  - while uprobe is triggered and kernel is handling it in the breakpoint
    handler code, temporary refcount bump is done to keep uprobe from
    being freed;
  - if we have uretprobe requested on a given struct uprobe instance, we
    take another refcount to keep uprobe alive until user space code
    returns from the function and triggers return handler.

The uprobes_tree's extra refcount of 1 is problematic and inconvenient.
Because of it, we have extra retry logic in uprobe_register(), and extra
logic in __uprobe_unregister(), which checks that the uprobe has no more
consumers and, if so, removes struct uprobe from uprobes_tree (through
delete_uprobe(), which takes the writer lock on uprobes_tree),
decrementing the refcount after that. The latter is the source of an
unfortunate race with uprobe_register(), necessitating retries.

All of the above is a complication that makes adding batched uprobe
registration/unregistration APIs hard, and generally makes following the
logic harder.

This patch changes the refcounting scheme so that uprobes_tree no longer
keeps an extra refcount for struct uprobe. Instead, each uprobe_consumer
assumes this extra refcount, which is dropped when the consumer is
unregistered. Other than that, all the active users of uprobe (entry and
return uprobe handling code) keep exactly the same refcounting approach.

With the above setup, once uprobe's refcount drops to zero, we need to
make sure that uprobe's "destructor" removes the uprobe from
uprobes_tree, of course. This, though, races with the uprobe entry
handling code in handle_swbp(), whose find_active_uprobe()->find_uprobe()
lookup can race with the uprobe being destroyed after its refcount drops
to zero (e.g., due to a uprobe_consumer unregistering). This is because
find_active_uprobe() bumps the refcount without knowing for sure that
the refcount is already positive (and it has to be this way; there is no
way around that setup).

One way to solve this, attempted initially, is the
atomic_inc_not_zero() approach, turning get_uprobe() into
try_get_uprobe(), which can fail to bump the refcount if the uprobe is
already destined to be destroyed. This, unfortunately, turns out to be
rather expensive due to the underlying cmpxchg() operation in
atomic_inc_not_zero(), and it scales rather poorly with an increased
number of parallel threads triggering uprobes.

So, we devise a refcounting scheme that doesn't require cmpxchg(),
instead relying only on atomic additions, which scale better and are
faster. While the solution has a bit of a trick to it, all the logic is
nicely compartmentalized in __get_uprobe() and put_uprobe() helpers and
doesn't leak outside of those low-level helpers.

We, effectively, structure uprobe's destruction (i.e., put_uprobe()
logic) in such a way that we support "resurrecting" the uprobe by
bumping its refcount from zero back to one, pretending it never dropped
to zero in the first place. This is done in a race-free way under the
exclusive writer uprobes_treelock. Crucially, we take the lock only once
the refcount drops to zero. If we had to take the lock before
decrementing the refcount, the approach would be prohibitively expensive.

Anyway, under the exclusive writer lock, we double-check that the refcount
didn't change and is still zero. If it is, we proceed with destruction,
because at that point we have a guarantee that find_active_uprobe()
can't successfully look up this uprobe instance, as it's going to be
removed in destructor under writer lock. If, on the other hand,
find_active_uprobe() managed to bump refcount from zero to one in
between put_uprobe()'s atomic_dec_and_test(&uprobe->ref) and
write_lock(&uprobes_treelock), we'll deterministically detect this with
extra atomic_read(&uprobe->ref) check, and if it doesn't hold, we
pretend like atomic_dec_and_test() never returned true. There is no
resource freeing or any other irreversible action taken up till this
point, so we just exit early.

One tricky part in the above is actually two CPUs racing and dropping
refcnt to zero, and then attempting to free resources. This can happen
as follows:
  - CPU #0 drops refcnt from 1 to 0, and proceeds to grab uprobes_treelock;
  - before CPU #0 grabs a lock, CPU #1 updates refcnt as 0 -> 1 -> 0, at
    which point it decides that it needs to free uprobe as well.

At this point both CPU #0 and CPU #1 will believe they need to destroy
the uprobe, which is obviously wrong. To prevent this situation, we
augment the refcount with an epoch counter, which is always incremented
by 1 on either a get or a put operation. This allows those two CPUs
above to disambiguate who should actually free the uprobe (it's CPU #1,
because it has the up-to-date epoch). See comments in the code and note
the specific values of the UPROBE_REFCNT_GET and UPROBE_REFCNT_PUT
constants. Keep in mind that a single atomic64_t actually holds two
sort-of-independent 32-bit counters that are incremented/decremented
with a single atomic_add_and_return() operation. Note also a small and
extremely rare (and thus having no effect on performance) need to clear
the highest bit every 2 billion get/put operations to prevent the high
32-bit counter from "bleeding over" into the lower 32-bit counter.

Another aspect of this race is that the winning CPU might, at least
theoretically, be so quick that it frees the uprobe's memory before the
losing CPU gets a chance to discover that it lost. To prevent this, we
protect and delay uprobe's lifetime with RCU. We can't use
rcu_read_lock() + rcu_read_unlock(), because we need to take locks
inside the RCU critical section. Luckily, we have the RCU Tasks Trace
flavor, which supports locking and sleeping. It is already used by the
BPF subsystem for sleepable BPF programs (including sleepable BPF uprobe
programs) and is optimized for reader-dominated workflows. It fits
perfectly and doesn't seem to introduce any significant slowdowns on the
uprobe hot path.

All of the above contained trickery aside, we end up with nice semantics
for the get and put operations, where get always succeeds and put handles
all the races properly and transparently to the caller.

And just to justify this somewhat unorthodox refcounting approach: under
a uprobe-triggering micro-benchmark (using BPF selftests' bench tool)
with 8 triggering threads, the atomic_inc_not_zero() approach produced
about 3.3 million uprobe triggerings per second across all threads,
while the final atomic_add_and_return()-based approach managed
3.6 million per second under the same 8 competing threads.

Furthermore, CPU profiling showed the following overall CPU usage:
  - try_get_uprobe (19.3%) + put_uprobe (8.2%) = 27.5% CPU usage for
    atomic_inc_not_zero approach;
  - __get_uprobe (12.3%) + put_uprobe (9.9%) = 22.2% CPU usage for
    atomic_add_and_return approach implemented by this patch.

So, the CPU spends relatively more time in get/put operations while
delivering less total throughput when using atomic_inc_not_zero(). And
this will become even more prominent once we optimize away
uprobe->register_rwsem in subsequent patch sets. So, while slightly less
straightforward, the current approach seems to be clearly winning and
justified.

We also rename get_uprobe() to __get_uprobe() to indicate it's
a delicate internal helper that is only safe to call under valid
circumstances:
  - while holding uprobes_treelock (to synchronize with exclusive write
    lock in put_uprobe(), as described above);
  - or if we have a guarantee that uprobe's refcount is already positive
    through caller holding at least one refcount (in this case there is
    no risk of refcount dropping to zero by any other CPU).

We also document why it's safe to do unconditional __get_uprobe() at all
call sites, to make it clear that we maintain the above invariants.

Note also that we no longer have a race between registration and
unregistration, so we remove the retry logic completely.

Signed-off-by: Andrii Nakryiko <andrii@kernel.org>
---
 kernel/events/uprobes.c | 260 ++++++++++++++++++++++++++++++----------
 1 file changed, 195 insertions(+), 65 deletions(-)

Comments

Peter Zijlstra July 2, 2024, 10:22 a.m. UTC | #1
On Mon, Jul 01, 2024 at 03:39:27PM -0700, Andrii Nakryiko wrote:

> diff --git a/kernel/events/uprobes.c b/kernel/events/uprobes.c
> index 23449a8c5e7e..560cf1ca512a 100644
> --- a/kernel/events/uprobes.c
> +++ b/kernel/events/uprobes.c
> @@ -53,9 +53,10 @@ DEFINE_STATIC_PERCPU_RWSEM(dup_mmap_sem);
>  
>  struct uprobe {
>  	struct rb_node		rb_node;	/* node in the rb tree */
> -	refcount_t		ref;
> +	atomic64_t		ref;		/* see UPROBE_REFCNT_GET below */
>  	struct rw_semaphore	register_rwsem;
>  	struct rw_semaphore	consumer_rwsem;
> +	struct rcu_head		rcu;
>  	struct list_head	pending_list;
>  	struct uprobe_consumer	*consumers;
>  	struct inode		*inode;		/* Also hold a ref to inode */
> @@ -587,15 +588,138 @@ set_orig_insn(struct arch_uprobe *auprobe, struct mm_struct *mm, unsigned long v
>  			*(uprobe_opcode_t *)&auprobe->insn);
>  }
>  
> -static struct uprobe *get_uprobe(struct uprobe *uprobe)
> +/*
> + * Uprobe's 64-bit refcount is actually two independent counters co-located in
> + * a single u64 value:
> + *   - lower 32 bits are just a normal refcount with is increment and
> + *   decremented on get and put, respectively, just like normal refcount
> + *   would;
> + *   - upper 32 bits are a tag (or epoch, if you will), which is always
> + *   incremented by one, no matter whether get or put operation is done.
> + *
> + * This upper counter is meant to distinguish between:
> + *   - one CPU dropping refcnt from 1 -> 0 and proceeding with "destruction",
> + *   - while another CPU continuing further meanwhile with 0 -> 1 -> 0 refcnt
> + *   sequence, also proceeding to "destruction".
> + *
> + * In both cases refcount drops to zero, but in one case it will have epoch N,
> + * while the second drop to zero will have a different epoch N + 2, allowing
> + * first destructor to bail out because epoch changed between refcount going
> + * to zero and put_uprobe() taking uprobes_treelock (under which overall
> + * 64-bit refcount is double-checked, see put_uprobe() for details).
> + *
> + * Lower 32-bit counter is not meant to over overflow, while it's expected

So refcount_t very explicitly handles both overflow and underflow and
screams bloody murder if they happen. Your thing does not.. 

> + * that upper 32-bit counter will overflow occasionally. Note, though, that we
> + * can't allow upper 32-bit counter to "bleed over" into lower 32-bit counter,
> + * so whenever epoch counter gets highest bit set to 1, __get_uprobe() and
> + * put_uprobe() will attempt to clear upper bit with cmpxchg(). This makes
> + * epoch effectively a 31-bit counter with highest bit used as a flag to
> + * perform a fix-up. This ensures epoch and refcnt parts do not "interfere".
> + *
> + * UPROBE_REFCNT_GET constant is chosen such that it will *increment both*
> + * epoch and refcnt parts atomically with one atomic_add().
> + * UPROBE_REFCNT_PUT is chosen such that it will *decrement* refcnt part and
> + * *increment* epoch part.
> + */
> +#define UPROBE_REFCNT_GET ((1LL << 32) + 1LL) /* 0x0000000100000001LL */
> +#define UPROBE_REFCNT_PUT ((1LL << 32) - 1LL) /* 0x00000000ffffffffLL */
> +
> +/*
> + * Caller has to make sure that:
> + *   a) either uprobe's refcnt is positive before this call;
> + *   b) or uprobes_treelock is held (doesn't matter if for read or write),
> + *      preventing uprobe's destructor from removing it from uprobes_tree.
> + *
> + * In the latter case, uprobe's destructor will "resurrect" uprobe instance if
> + * it detects that its refcount went back to being positive again inbetween it
> + * dropping to zero at some point and (potentially delayed) destructor
> + * callback actually running.
> + */
> +static struct uprobe *__get_uprobe(struct uprobe *uprobe)
>  {
> -	refcount_inc(&uprobe->ref);
> +	s64 v;
> +
> +	v = atomic64_add_return(UPROBE_REFCNT_GET, &uprobe->ref);

Distinct lack of u32 overflow testing here..

> +
> +	/*
> +	 * If the highest bit is set, we need to clear it. If cmpxchg() fails,
> +	 * we don't retry because there is another CPU that just managed to
> +	 * update refcnt and will attempt the same "fix up". Eventually one of
> +	 * them will succeed to clear highset bit.
> +	 */
> +	if (unlikely(v < 0))
> +		(void)atomic64_cmpxchg(&uprobe->ref, v, v & ~(1ULL << 63));
> +
>  	return uprobe;
>  }

>  static void put_uprobe(struct uprobe *uprobe)
>  {
> -	if (refcount_dec_and_test(&uprobe->ref)) {
> +	s64 v;
> +
> +	/*
> +	 * here uprobe instance is guaranteed to be alive, so we use Tasks
> +	 * Trace RCU to guarantee that uprobe won't be freed from under us, if

What's wrong with normal RCU?

> +	 * we end up being a losing "destructor" inside uprobe_treelock'ed
> +	 * section double-checking uprobe->ref value below.
> +	 * Note call_rcu_tasks_trace() + uprobe_free_rcu below.
> +	 */
> +	rcu_read_lock_trace();
> +
> +	v = atomic64_add_return(UPROBE_REFCNT_PUT, &uprobe->ref);

No underflow handling... because nobody ever had a double put bug.

> +	if (unlikely((u32)v == 0)) {
> +		bool destroy;
> +
> +		write_lock(&uprobes_treelock);
> +		/*
> +		 * We might race with find_uprobe()->__get_uprobe() executed
> +		 * from inside read-locked uprobes_treelock, which can bump
> +		 * refcount from zero back to one, after we got here. Even
> +		 * worse, it's possible for another CPU to do 0 -> 1 -> 0
> +		 * transition between this CPU doing atomic_add() and taking
> +		 * uprobes_treelock. In either case this CPU should bail out
> +		 * and not proceed with destruction.
> +		 *
> +		 * So now that we have exclusive write lock, we double check
> +		 * the total 64-bit refcount value, which includes the epoch.
> +		 * If nothing changed (i.e., epoch is the same and refcnt is
> +		 * still zero), we are good and we proceed with the clean up.
> +		 *
> +		 * But if it managed to be updated back at least once, we just
> +		 * pretend it never went to zero. If lower 32-bit refcnt part
> +		 * drops to zero again, another CPU will proceed with
> +		 * destruction, due to more up to date epoch.
> +		 */
> +		destroy = atomic64_read(&uprobe->ref) == v;
> +		if (destroy && uprobe_is_active(uprobe))
> +			rb_erase(&uprobe->rb_node, &uprobes_tree);
> +		write_unlock(&uprobes_treelock);
> +
> +		/*
> +		 * Beyond here we don't need RCU protection, we are either the
> +		 * winning destructor and we control the rest of uprobe's
> +		 * lifetime; or we lost and we are bailing without accessing
> +		 * uprobe fields anymore.
> +		 */
> +		rcu_read_unlock_trace();
> +
> +		/* uprobe got resurrected, pretend we never tried to free it */
> +		if (!destroy)
> +			return;
> +
>  		/*
>  		 * If application munmap(exec_vma) before uprobe_unregister()
>  		 * gets called, we don't get a chance to remove uprobe from
> @@ -604,8 +728,21 @@ static void put_uprobe(struct uprobe *uprobe)
>  		mutex_lock(&delayed_uprobe_lock);
>  		delayed_uprobe_remove(uprobe, NULL);
>  		mutex_unlock(&delayed_uprobe_lock);
> -		kfree(uprobe);
> +
> +		call_rcu_tasks_trace(&uprobe->rcu, uprobe_free_rcu);
> +		return;
>  	}
> +
> +	/*
> +	 * If the highest bit is set, we need to clear it. If cmpxchg() fails,
> +	 * we don't retry because there is another CPU that just managed to
> +	 * update refcnt and will attempt the same "fix up". Eventually one of
> +	 * them will succeed to clear highset bit.
> +	 */
> +	if (unlikely(v < 0))
> +		(void)atomic64_cmpxchg(&uprobe->ref, v, v & ~(1ULL << 63));
> +
> +	rcu_read_unlock_trace();
>  }
Andrii Nakryiko July 2, 2024, 5:54 p.m. UTC | #2
On Tue, Jul 2, 2024 at 3:23 AM Peter Zijlstra <peterz@infradead.org> wrote:
>
> On Mon, Jul 01, 2024 at 03:39:27PM -0700, Andrii Nakryiko wrote:
>
> > diff --git a/kernel/events/uprobes.c b/kernel/events/uprobes.c
> > index 23449a8c5e7e..560cf1ca512a 100644
> > --- a/kernel/events/uprobes.c
> > +++ b/kernel/events/uprobes.c
> > @@ -53,9 +53,10 @@ DEFINE_STATIC_PERCPU_RWSEM(dup_mmap_sem);
> >
> >  struct uprobe {
> >       struct rb_node          rb_node;        /* node in the rb tree */
> > -     refcount_t              ref;
> > +     atomic64_t              ref;            /* see UPROBE_REFCNT_GET below */
> >       struct rw_semaphore     register_rwsem;
> >       struct rw_semaphore     consumer_rwsem;
> > +     struct rcu_head         rcu;
> >       struct list_head        pending_list;
> >       struct uprobe_consumer  *consumers;
> >       struct inode            *inode;         /* Also hold a ref to inode */
> > @@ -587,15 +588,138 @@ set_orig_insn(struct arch_uprobe *auprobe, struct mm_struct *mm, unsigned long v
> >                       *(uprobe_opcode_t *)&auprobe->insn);
> >  }
> >
> > -static struct uprobe *get_uprobe(struct uprobe *uprobe)
> > +/*
> > + * Uprobe's 64-bit refcount is actually two independent counters co-located in
> > + * a single u64 value:
> > + *   - lower 32 bits are just a normal refcount with is increment and
> > + *   decremented on get and put, respectively, just like normal refcount
> > + *   would;
> > + *   - upper 32 bits are a tag (or epoch, if you will), which is always
> > + *   incremented by one, no matter whether get or put operation is done.
> > + *
> > + * This upper counter is meant to distinguish between:
> > + *   - one CPU dropping refcnt from 1 -> 0 and proceeding with "destruction",
> > + *   - while another CPU continuing further meanwhile with 0 -> 1 -> 0 refcnt
> > + *   sequence, also proceeding to "destruction".
> > + *
> > + * In both cases refcount drops to zero, but in one case it will have epoch N,
> > + * while the second drop to zero will have a different epoch N + 2, allowing
> > + * first destructor to bail out because epoch changed between refcount going
> > + * to zero and put_uprobe() taking uprobes_treelock (under which overall
> > + * 64-bit refcount is double-checked, see put_uprobe() for details).
> > + *
> > + * Lower 32-bit counter is not meant to over overflow, while it's expected
>
> So refcount_t very explicitly handles both overflow and underflow and
> screams bloody murder if they happen. Your thing does not..
>

Correct, because I considered it practically impossible to overflow this
refcount. The main source of refcounts is uretprobes holding uprobe
references. We limit the depth of supported recursion to 64, so you'd
need 30+ million threads all hitting the same uprobe/uretprobe to
overflow this. I guess in theory it could happen (not sure if we have
some limit on the total number of threads in the system and if it can be
bumped to over 30mln), but it just seemed out of the realm of practical
possibility.

Having said that, I can add similar checks that refcount_t does in
refcount_add and do what refcount_warn_saturate does as well.

> > + * that upper 32-bit counter will overflow occasionally. Note, though, that we
> > + * can't allow upper 32-bit counter to "bleed over" into lower 32-bit counter,
> > + * so whenever epoch counter gets highest bit set to 1, __get_uprobe() and
> > + * put_uprobe() will attempt to clear upper bit with cmpxchg(). This makes
> > + * epoch effectively a 31-bit counter with highest bit used as a flag to
> > + * perform a fix-up. This ensures epoch and refcnt parts do not "interfere".
> > + *
> > + * UPROBE_REFCNT_GET constant is chosen such that it will *increment both*
> > + * epoch and refcnt parts atomically with one atomic_add().
> > + * UPROBE_REFCNT_PUT is chosen such that it will *decrement* refcnt part and
> > + * *increment* epoch part.
> > + */
> > +#define UPROBE_REFCNT_GET ((1LL << 32) + 1LL) /* 0x0000000100000001LL */
> > +#define UPROBE_REFCNT_PUT ((1LL << 32) - 1LL) /* 0x00000000ffffffffLL */
> > +
> > +/*
> > + * Caller has to make sure that:
> > + *   a) either uprobe's refcnt is positive before this call;
> > + *   b) or uprobes_treelock is held (doesn't matter if for read or write),
> > + *      preventing uprobe's destructor from removing it from uprobes_tree.
> > + *
> > + * In the latter case, uprobe's destructor will "resurrect" uprobe instance if
> > + * it detects that its refcount went back to being positive again inbetween it
> > + * dropping to zero at some point and (potentially delayed) destructor
> > + * callback actually running.
> > + */
> > +static struct uprobe *__get_uprobe(struct uprobe *uprobe)
> >  {
> > -     refcount_inc(&uprobe->ref);
> > +     s64 v;
> > +
> > +     v = atomic64_add_return(UPROBE_REFCNT_GET, &uprobe->ref);
>
> Distinct lack of u32 overflow testing here..
>
> > +
> > +     /*
> > +      * If the highest bit is set, we need to clear it. If cmpxchg() fails,
> > +      * we don't retry because there is another CPU that just managed to
> > +      * update refcnt and will attempt the same "fix up". Eventually one of
> > +      * them will succeed to clear highset bit.
> > +      */
> > +     if (unlikely(v < 0))
> > +             (void)atomic64_cmpxchg(&uprobe->ref, v, v & ~(1ULL << 63));
> > +
> >       return uprobe;
> >  }
>
> >  static void put_uprobe(struct uprobe *uprobe)
> >  {
> > -     if (refcount_dec_and_test(&uprobe->ref)) {
> > +     s64 v;
> > +
> > +     /*
> > +      * here uprobe instance is guaranteed to be alive, so we use Tasks
> > +      * Trace RCU to guarantee that uprobe won't be freed from under us, if
>
> What's wrong with normal RCU?
>

will reply in another thread to keep things focused

> > +      * we end up being a losing "destructor" inside uprobe_treelock'ed
> > +      * section double-checking uprobe->ref value below.
> > +      * Note call_rcu_tasks_trace() + uprobe_free_rcu below.
> > +      */
> > +     rcu_read_lock_trace();
> > +
> > +     v = atomic64_add_return(UPROBE_REFCNT_PUT, &uprobe->ref);
>
> No underflow handling... because nobody ever had a double put bug.
>

ack, see above, will add checks and special saturation value.

[...]
Peter Zijlstra July 3, 2024, 1:36 p.m. UTC | #3
On Mon, Jul 01, 2024 at 03:39:27PM -0700, Andrii Nakryiko wrote:

> One, attempted initially, way to solve this is through using
> atomic_inc_not_zero() approach, turning get_uprobe() into
> try_get_uprobe(),

This is the canonical thing to do. Everybody does this.

> which can fail to bump refcount if uprobe is already
> destined to be destroyed. This, unfortunately, turns out to be a rather
> expensive due to underlying cmpxchg() operation in
> atomic_inc_not_zero() and scales rather poorly with increased amount of
> parallel threads triggering uprobes.

Different archs different trade-offs. You'll not see this on LL/SC archs
for example.

> Furthermore, CPU profiling showed the following overall CPU usage:
>   - try_get_uprobe (19.3%) + put_uprobe (8.2%) = 27.5% CPU usage for
>     atomic_inc_not_zero approach;
>   - __get_uprobe (12.3%) + put_uprobe (9.9%) = 22.2% CPU usage for
>     atomic_add_and_return approach implemented by this patch.

I think those numbers suggest trying to not have a refcount in the first
place. Both are pretty terrible, yes one is less terrible than the
other, but still terrible.

Specifically, I'm thinking it is the refcounting in handle_swbp() that
is actually the problem, all the other stuff is noise.

So if you have SRCU protected consumers, what is the reason for still
having a refcount in handle_swbp() ? Simply have the whole of it inside
a single SRCU critical section, then all consumers you find get a hit.

Hmm, return probes are a pain, they require the uprobe to stay extant
between handle_swbp() and handle_trampoline(). I'm thinking we can do
that with SRCU as well.

When I cobble all that together (it really shouldn't be one patch, but
you get the idea I hope) it looks a little something like the below.

I *think* it should work, but perhaps I've missed something?

TL;DR replace treelock with seqcount+SRCU
      replace register_rwsem with SRCU
      replace handle_swbp() refcount with SRCU
      replace return_instance refcount with a second SRCU

Paul, I had to do something vile with SRCU. The basic problem is that we
want to keep a SRCU critical section across fork(), which leads to both
parent and child doing srcu_read_unlock(&srcu, idx). As such, I need an
extra increment on the @idx ssp counter to even things out, see
__srcu_read_clone_lock().

---
 include/linux/rbtree.h  |  45 +++++++++++++
 include/linux/srcu.h    |   2 +
 include/linux/uprobes.h |   2 +
 kernel/events/uprobes.c | 166 +++++++++++++++++++++++++++++++-----------------
 kernel/rcu/srcutree.c   |   5 ++
 5 files changed, 161 insertions(+), 59 deletions(-)

diff --git a/include/linux/rbtree.h b/include/linux/rbtree.h
index f7edca369eda..9847fa58a287 100644
--- a/include/linux/rbtree.h
+++ b/include/linux/rbtree.h
@@ -244,6 +244,31 @@ rb_find_add(struct rb_node *node, struct rb_root *tree,
 	return NULL;
 }
 
+static __always_inline struct rb_node *
+rb_find_add_rcu(struct rb_node *node, struct rb_root *tree,
+		int (*cmp)(struct rb_node *, const struct rb_node *))
+{
+	struct rb_node **link = &tree->rb_node;
+	struct rb_node *parent = NULL;
+	int c;
+
+	while (*link) {
+		parent = *link;
+		c = cmp(node, parent);
+
+		if (c < 0)
+			link = &parent->rb_left;
+		else if (c > 0)
+			link = &parent->rb_right;
+		else
+			return parent;
+	}
+
+	rb_link_node_rcu(node, parent, link);
+	rb_insert_color(node, tree);
+	return NULL;
+}
+
 /**
  * rb_find() - find @key in tree @tree
  * @key: key to match
@@ -272,6 +297,26 @@ rb_find(const void *key, const struct rb_root *tree,
 	return NULL;
 }
 
+static __always_inline struct rb_node *
+rb_find_rcu(const void *key, const struct rb_root *tree,
+	    int (*cmp)(const void *key, const struct rb_node *))
+{
+	struct rb_node *node = tree->rb_node;
+
+	while (node) {
+		int c = cmp(key, node);
+
+		if (c < 0)
+			node = rcu_dereference_raw(node->rb_left);
+		else if (c > 0)
+			node = rcu_dereference_raw(node->rb_right);
+		else
+			return node;
+	}
+
+	return NULL;
+}
+
 /**
  * rb_find_first() - find the first @key in @tree
  * @key: key to match
diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index 236610e4a8fa..9b14acecbb9d 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -55,7 +55,9 @@ void call_srcu(struct srcu_struct *ssp, struct rcu_head *head,
 		void (*func)(struct rcu_head *head));
 void cleanup_srcu_struct(struct srcu_struct *ssp);
 int __srcu_read_lock(struct srcu_struct *ssp) __acquires(ssp);
+void __srcu_read_clone_lock(struct srcu_struct *ssp, int idx);
 void __srcu_read_unlock(struct srcu_struct *ssp, int idx) __releases(ssp);
+
 void synchronize_srcu(struct srcu_struct *ssp);
 unsigned long get_state_synchronize_srcu(struct srcu_struct *ssp);
 unsigned long start_poll_synchronize_srcu(struct srcu_struct *ssp);
diff --git a/include/linux/uprobes.h b/include/linux/uprobes.h
index f46e0ca0169c..354cab634341 100644
--- a/include/linux/uprobes.h
+++ b/include/linux/uprobes.h
@@ -78,6 +78,7 @@ struct uprobe_task {
 
 	struct return_instance		*return_instances;
 	unsigned int			depth;
+	unsigned int			active_srcu_idx;
 };
 
 struct return_instance {
@@ -86,6 +87,7 @@ struct return_instance {
 	unsigned long		stack;		/* stack pointer */
 	unsigned long		orig_ret_vaddr; /* original return address */
 	bool			chained;	/* true, if instance is nested */
+	int			srcu_idx;
 
 	struct return_instance	*next;		/* keep as stack */
 };
diff --git a/kernel/events/uprobes.c b/kernel/events/uprobes.c
index 2c83ba776fc7..0b7574a54093 100644
--- a/kernel/events/uprobes.c
+++ b/kernel/events/uprobes.c
@@ -26,6 +26,7 @@
 #include <linux/task_work.h>
 #include <linux/shmem_fs.h>
 #include <linux/khugepaged.h>
+#include <linux/srcu.h>
 
 #include <linux/uprobes.h>
 
@@ -40,6 +41,17 @@ static struct rb_root uprobes_tree = RB_ROOT;
 #define no_uprobe_events()	RB_EMPTY_ROOT(&uprobes_tree)
 
 static DEFINE_RWLOCK(uprobes_treelock);	/* serialize rbtree access */
+static seqcount_rwlock_t uprobes_seqcount = SEQCNT_RWLOCK_ZERO(uprobes_seqcount, &uprobes_treelock);
+
+/*
+ * Used for both the uprobes_tree and the uprobe->consumer list.
+ */
+DEFINE_STATIC_SRCU(uprobe_srcu);
+/*
+ * Used for return_instance and single-step uprobe lifetime. Separate from
+ * uprobe_srcu in order to minimize the synchronize_srcu() cost at unregister.
+ */
+DEFINE_STATIC_SRCU(uretprobe_srcu);
 
 #define UPROBES_HASH_SZ	13
 /* serialize uprobe->pending_list */
@@ -54,7 +66,8 @@ DEFINE_STATIC_PERCPU_RWSEM(dup_mmap_sem);
 struct uprobe {
 	struct rb_node		rb_node;	/* node in the rb tree */
 	refcount_t		ref;
-	struct rw_semaphore	register_rwsem;
+	struct rcu_head		rcu;
+	struct mutex		register_mutex;
 	struct rw_semaphore	consumer_rwsem;
 	struct list_head	pending_list;
 	struct uprobe_consumer	*consumers;
@@ -67,7 +80,7 @@ struct uprobe {
 	 * The generic code assumes that it has two members of unknown type
 	 * owned by the arch-specific code:
 	 *
-	 * 	insn -	copy_insn() saves the original instruction here for
+	 *	insn -	copy_insn() saves the original instruction here for
 	 *		arch_uprobe_analyze_insn().
 	 *
 	 *	ixol -	potentially modified instruction to execute out of
@@ -205,7 +218,7 @@ static int __replace_page(struct vm_area_struct *vma, unsigned long addr,
 	folio_put(old_folio);
 
 	err = 0;
- unlock:
+unlock:
 	mmu_notifier_invalidate_range_end(&range);
 	folio_unlock(old_folio);
 	return err;
@@ -593,6 +606,22 @@ static struct uprobe *get_uprobe(struct uprobe *uprobe)
 	return uprobe;
 }
 
+static void uprobe_free_stage2(struct rcu_head *rcu)
+{
+	struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
+	kfree(uprobe);
+}
+
+static void uprobe_free_stage1(struct rcu_head *rcu)
+{
+	struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
+	/*
+	 * At this point all the consumers are complete and gone, but retprobe
+	 * and single-step might still reference the uprobe itself.
+	 */
+	call_srcu(&uretprobe_srcu, &uprobe->rcu, uprobe_free_stage2);
+}
+
 static void put_uprobe(struct uprobe *uprobe)
 {
 	if (refcount_dec_and_test(&uprobe->ref)) {
@@ -604,7 +633,8 @@ static void put_uprobe(struct uprobe *uprobe)
 		mutex_lock(&delayed_uprobe_lock);
 		delayed_uprobe_remove(uprobe, NULL);
 		mutex_unlock(&delayed_uprobe_lock);
-		kfree(uprobe);
+
+		call_srcu(&uprobe_srcu, &uprobe->rcu, uprobe_free_stage1);
 	}
 }
 
@@ -653,10 +683,10 @@ static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
 		.inode = inode,
 		.offset = offset,
 	};
-	struct rb_node *node = rb_find(&key, &uprobes_tree, __uprobe_cmp_key);
+	struct rb_node *node = rb_find_rcu(&key, &uprobes_tree, __uprobe_cmp_key);
 
 	if (node)
-		return get_uprobe(__node_2_uprobe(node));
+		return __node_2_uprobe(node);
 
 	return NULL;
 }
@@ -667,20 +697,32 @@ static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
  */
 static struct uprobe *find_uprobe(struct inode *inode, loff_t offset)
 {
-	struct uprobe *uprobe;
+	unsigned seq;
 
-	read_lock(&uprobes_treelock);
-	uprobe = __find_uprobe(inode, offset);
-	read_unlock(&uprobes_treelock);
+	lockdep_assert(srcu_read_lock_held(&uprobe_srcu));
 
-	return uprobe;
+	do {
+		seq = read_seqcount_begin(&uprobes_seqcount);
+		struct uprobe *uprobe = __find_uprobe(inode, offset);
+		if (uprobe) {
+			/*
+			 * Lockless RB-tree lookups are prone to false-negatives.
+			 * If they find something, it's good. If they do not find
+			 * anything, the result needs to be validated.
+			 */
+			return uprobe;
+		}
+	} while (read_seqcount_retry(&uprobes_seqcount, seq));
+
+	/* Really didn't find anything. */
+	return NULL;
 }
 
 static struct uprobe *__insert_uprobe(struct uprobe *uprobe)
 {
 	struct rb_node *node;
 
-	node = rb_find_add(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
+	node = rb_find_add_rcu(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
 	if (node)
 		return get_uprobe(__node_2_uprobe(node));
 
@@ -702,7 +744,9 @@ static struct uprobe *insert_uprobe(struct uprobe *uprobe)
 	struct uprobe *u;
 
 	write_lock(&uprobes_treelock);
+	write_seqcount_begin(&uprobes_seqcount);
 	u = __insert_uprobe(uprobe);
+	write_seqcount_end(&uprobes_seqcount);
 	write_unlock(&uprobes_treelock);
 
 	return u;
@@ -730,7 +774,7 @@ static struct uprobe *alloc_uprobe(struct inode *inode, loff_t offset,
 	uprobe->inode = inode;
 	uprobe->offset = offset;
 	uprobe->ref_ctr_offset = ref_ctr_offset;
-	init_rwsem(&uprobe->register_rwsem);
+	mutex_init(&uprobe->register_mutex);
 	init_rwsem(&uprobe->consumer_rwsem);
 
 	/* add to uprobes_tree, sorted on inode:offset */
@@ -754,7 +798,7 @@ static void consumer_add(struct uprobe *uprobe, struct uprobe_consumer *uc)
 {
 	down_write(&uprobe->consumer_rwsem);
 	uc->next = uprobe->consumers;
-	uprobe->consumers = uc;
+	rcu_assign_pointer(uprobe->consumers, uc);
 	up_write(&uprobe->consumer_rwsem);
 }
 
@@ -771,7 +815,7 @@ static bool consumer_del(struct uprobe *uprobe, struct uprobe_consumer *uc)
 	down_write(&uprobe->consumer_rwsem);
 	for (con = &uprobe->consumers; *con; con = &(*con)->next) {
 		if (*con == uc) {
-			*con = uc->next;
+			rcu_assign_pointer(*con, uc->next);
 			ret = true;
 			break;
 		}
@@ -857,7 +901,7 @@ static int prepare_uprobe(struct uprobe *uprobe, struct file *file,
 	smp_wmb(); /* pairs with the smp_rmb() in handle_swbp() */
 	set_bit(UPROBE_COPY_INSN, &uprobe->flags);
 
- out:
+out:
 	up_write(&uprobe->consumer_rwsem);
 
 	return ret;
@@ -936,7 +980,9 @@ static void delete_uprobe(struct uprobe *uprobe)
 		return;
 
 	write_lock(&uprobes_treelock);
+	write_seqcount_begin(&uprobes_seqcount);
 	rb_erase(&uprobe->rb_node, &uprobes_tree);
+	write_seqcount_end(&uprobes_seqcount);
 	write_unlock(&uprobes_treelock);
 	RB_CLEAR_NODE(&uprobe->rb_node); /* for uprobe_is_active() */
 	put_uprobe(uprobe);
@@ -965,7 +1011,7 @@ build_map_info(struct address_space *mapping, loff_t offset, bool is_register)
 	struct map_info *info;
 	int more = 0;
 
- again:
+again:
 	i_mmap_lock_read(mapping);
 	vma_interval_tree_foreach(vma, &mapping->i_mmap, pgoff, pgoff) {
 		if (!valid_vma(vma, is_register))
@@ -1019,7 +1065,7 @@ build_map_info(struct address_space *mapping, loff_t offset, bool is_register)
 	} while (--more);
 
 	goto again;
- out:
+out:
 	while (prev)
 		prev = free_map_info(prev);
 	return curr;
@@ -1068,13 +1114,13 @@ register_for_each_vma(struct uprobe *uprobe, struct uprobe_consumer *new)
 				err |= remove_breakpoint(uprobe, mm, info->vaddr);
 		}
 
- unlock:
+unlock:
 		mmap_write_unlock(mm);
- free:
+free:
 		mmput(mm);
 		info = free_map_info(info);
 	}
- out:
+out:
 	percpu_up_write(&dup_mmap_sem);
 	return err;
 }
@@ -1101,16 +1147,17 @@ __uprobe_unregister(struct uprobe *uprobe, struct uprobe_consumer *uc)
  */
 void uprobe_unregister(struct inode *inode, loff_t offset, struct uprobe_consumer *uc)
 {
-	struct uprobe *uprobe;
+	scoped_guard (srcu, &uprobe_srcu) {
+		struct uprobe *uprobe = find_uprobe(inode, offset);
+		if (WARN_ON(!uprobe))
+			return;
 
-	uprobe = find_uprobe(inode, offset);
-	if (WARN_ON(!uprobe))
-		return;
+		mutex_lock(&uprobe->register_mutex);
+		__uprobe_unregister(uprobe, uc);
+		mutex_unlock(&uprobe->register_mutex);
+	}
 
-	down_write(&uprobe->register_rwsem);
-	__uprobe_unregister(uprobe, uc);
-	up_write(&uprobe->register_rwsem);
-	put_uprobe(uprobe);
+	synchronize_srcu(&uprobe_srcu); // XXX amortize / batch
 }
 EXPORT_SYMBOL_GPL(uprobe_unregister);
 
@@ -1159,7 +1206,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
 	if (!IS_ALIGNED(ref_ctr_offset, sizeof(short)))
 		return -EINVAL;
 
- retry:
+retry:
 	uprobe = alloc_uprobe(inode, offset, ref_ctr_offset);
 	if (!uprobe)
 		return -ENOMEM;
@@ -1170,7 +1217,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
 	 * We can race with uprobe_unregister()->delete_uprobe().
 	 * Check uprobe_is_active() and retry if it is false.
 	 */
-	down_write(&uprobe->register_rwsem);
+	mutex_lock(&uprobe->register_mutex);
 	ret = -EAGAIN;
 	if (likely(uprobe_is_active(uprobe))) {
 		consumer_add(uprobe, uc);
@@ -1178,7 +1225,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
 		if (ret)
 			__uprobe_unregister(uprobe, uc);
 	}
-	up_write(&uprobe->register_rwsem);
+	mutex_unlock(&uprobe->register_mutex);
 	put_uprobe(uprobe);
 
 	if (unlikely(ret == -EAGAIN))
@@ -1214,17 +1261,18 @@ int uprobe_apply(struct inode *inode, loff_t offset,
 	struct uprobe_consumer *con;
 	int ret = -ENOENT;
 
+	guard(srcu)(&uprobe_srcu);
+
 	uprobe = find_uprobe(inode, offset);
 	if (WARN_ON(!uprobe))
 		return ret;
 
-	down_write(&uprobe->register_rwsem);
+	mutex_lock(&uprobe->register_mutex);
 	for (con = uprobe->consumers; con && con != uc ; con = con->next)
 		;
 	if (con)
 		ret = register_for_each_vma(uprobe, add ? uc : NULL);
-	up_write(&uprobe->register_rwsem);
-	put_uprobe(uprobe);
+	mutex_unlock(&uprobe->register_mutex);
 
 	return ret;
 }
@@ -1468,7 +1516,7 @@ static int xol_add_vma(struct mm_struct *mm, struct xol_area *area)
 	ret = 0;
 	/* pairs with get_xol_area() */
 	smp_store_release(&mm->uprobes_state.xol_area, area); /* ^^^ */
- fail:
+fail:
 	mmap_write_unlock(mm);
 
 	return ret;
@@ -1512,7 +1560,7 @@ static struct xol_area *__create_xol_area(unsigned long vaddr)
 	kfree(area->bitmap);
  free_area:
 	kfree(area);
- out:
+out:
 	return NULL;
 }
 
@@ -1700,7 +1748,7 @@ unsigned long uprobe_get_trap_addr(struct pt_regs *regs)
 static struct return_instance *free_ret_instance(struct return_instance *ri)
 {
 	struct return_instance *next = ri->next;
-	put_uprobe(ri->uprobe);
+	srcu_read_unlock(&uretprobe_srcu, ri->srcu_idx);
 	kfree(ri);
 	return next;
 }
@@ -1718,7 +1766,7 @@ void uprobe_free_utask(struct task_struct *t)
 		return;
 
 	if (utask->active_uprobe)
-		put_uprobe(utask->active_uprobe);
+		srcu_read_unlock(&uretprobe_srcu, utask->active_srcu_idx);
 
 	ri = utask->return_instances;
 	while (ri)
@@ -1761,7 +1809,7 @@ static int dup_utask(struct task_struct *t, struct uprobe_task *o_utask)
 			return -ENOMEM;
 
 		*n = *o;
-		get_uprobe(n->uprobe);
+		__srcu_read_clone_lock(&uretprobe_srcu, n->srcu_idx);
 		n->next = NULL;
 
 		*p = n;
@@ -1904,7 +1952,8 @@ static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
 		orig_ret_vaddr = utask->return_instances->orig_ret_vaddr;
 	}
 
-	ri->uprobe = get_uprobe(uprobe);
+	ri->srcu_idx = srcu_read_lock(&uretprobe_srcu);
+	ri->uprobe = uprobe;
 	ri->func = instruction_pointer(regs);
 	ri->stack = user_stack_pointer(regs);
 	ri->orig_ret_vaddr = orig_ret_vaddr;
@@ -1915,7 +1964,7 @@ static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
 	utask->return_instances = ri;
 
 	return;
- fail:
+fail:
 	kfree(ri);
 }
 
@@ -1944,6 +1993,7 @@ pre_ssout(struct uprobe *uprobe, struct pt_regs *regs, unsigned long bp_vaddr)
 		return err;
 	}
 
+	utask->active_srcu_idx = srcu_read_lock(&uretprobe_srcu);
 	utask->active_uprobe = uprobe;
 	utask->state = UTASK_SSTEP;
 	return 0;
@@ -2031,7 +2081,7 @@ static int is_trap_at_addr(struct mm_struct *mm, unsigned long vaddr)
 
 	copy_from_page(page, vaddr, &opcode, UPROBE_SWBP_INSN_SIZE);
 	put_page(page);
- out:
+out:
 	/* This needs to return true for any variant of the trap insn */
 	return is_trap_insn(&opcode);
 }
@@ -2071,8 +2121,9 @@ static void handler_chain(struct uprobe *uprobe, struct pt_regs *regs)
 	int remove = UPROBE_HANDLER_REMOVE;
 	bool need_prep = false; /* prepare return uprobe, when needed */
 
-	down_read(&uprobe->register_rwsem);
-	for (uc = uprobe->consumers; uc; uc = uc->next) {
+	lockdep_assert(srcu_read_lock_held(&uprobe_srcu));
+
+	for (uc = rcu_dereference_raw(uprobe->consumers); uc; uc = rcu_dereference_raw(uc->next)) {
 		int rc = 0;
 
 		if (uc->handler) {
@@ -2094,7 +2145,6 @@ static void handler_chain(struct uprobe *uprobe, struct pt_regs *regs)
 		WARN_ON(!uprobe_is_active(uprobe));
 		unapply_uprobe(uprobe, current->mm);
 	}
-	up_read(&uprobe->register_rwsem);
 }
 
 static void
@@ -2103,12 +2153,12 @@ handle_uretprobe_chain(struct return_instance *ri, struct pt_regs *regs)
 	struct uprobe *uprobe = ri->uprobe;
 	struct uprobe_consumer *uc;
 
-	down_read(&uprobe->register_rwsem);
-	for (uc = uprobe->consumers; uc; uc = uc->next) {
+	guard(srcu)(&uprobe_srcu);
+
+	for (uc = rcu_dereference_raw(uprobe->consumers); uc; uc = rcu_dereference_raw(uc->next)) {
 		if (uc->ret_handler)
 			uc->ret_handler(uc, ri->func, regs);
 	}
-	up_read(&uprobe->register_rwsem);
 }
 
 static struct return_instance *find_next_ret_chain(struct return_instance *ri)
@@ -2159,7 +2209,7 @@ static void handle_trampoline(struct pt_regs *regs)
 	utask->return_instances = ri;
 	return;
 
- sigill:
+sigill:
 	uprobe_warn(current, "handle uretprobe, sending SIGILL.");
 	force_sig(SIGILL);
 
@@ -2190,6 +2240,8 @@ static void handle_swbp(struct pt_regs *regs)
 	if (bp_vaddr == get_trampoline_vaddr())
 		return handle_trampoline(regs);
 
+	guard(srcu)(&uprobe_srcu);
+
 	uprobe = find_active_uprobe(bp_vaddr, &is_swbp);
 	if (!uprobe) {
 		if (is_swbp > 0) {
@@ -2218,7 +2270,7 @@ static void handle_swbp(struct pt_regs *regs)
 	 * new and not-yet-analyzed uprobe at the same address, restart.
 	 */
 	if (unlikely(!test_bit(UPROBE_COPY_INSN, &uprobe->flags)))
-		goto out;
+		return;
 
 	/*
 	 * Pairs with the smp_wmb() in prepare_uprobe().
@@ -2231,22 +2283,18 @@ static void handle_swbp(struct pt_regs *regs)
 
 	/* Tracing handlers use ->utask to communicate with fetch methods */
 	if (!get_utask())
-		goto out;
+		return;
 
 	if (arch_uprobe_ignore(&uprobe->arch, regs))
-		goto out;
+		return;
 
 	handler_chain(uprobe, regs);
 
 	if (arch_uprobe_skip_sstep(&uprobe->arch, regs))
-		goto out;
+		return;
 
 	if (!pre_ssout(uprobe, regs, bp_vaddr))
 		return;
-
-	/* arch_uprobe_skip_sstep() succeeded, or restart if can't singlestep */
-out:
-	put_uprobe(uprobe);
 }
 
 /*
@@ -2266,7 +2314,7 @@ static void handle_singlestep(struct uprobe_task *utask, struct pt_regs *regs)
 	else
 		WARN_ON_ONCE(1);
 
-	put_uprobe(uprobe);
+	srcu_read_unlock(&uretprobe_srcu, utask->active_srcu_idx);
 	utask->active_uprobe = NULL;
 	utask->state = UTASK_RUNNING;
 	xol_free_insn_slot(current);
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index bc4b58b0204e..d8cda9003da4 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -720,6 +720,11 @@ int __srcu_read_lock(struct srcu_struct *ssp)
 }
 EXPORT_SYMBOL_GPL(__srcu_read_lock);
 
+void __srcu_read_clone_lock(struct srcu_struct *ssp, int idx)
+{
+	this_cpu_inc(ssp->sda->srcu_lock_count[idx].counter);
+}
+
 /*
  * Removes the count for the old reader from the appropriate per-CPU
  * element of the srcu_struct.  Note that this may well be a different
Andrii Nakryiko July 3, 2024, 8:47 p.m. UTC | #4
On Wed, Jul 3, 2024 at 6:36 AM Peter Zijlstra <peterz@infradead.org> wrote:
>
> On Mon, Jul 01, 2024 at 03:39:27PM -0700, Andrii Nakryiko wrote:
>
> > One way to solve this, attempted initially, is to use the
> > atomic_inc_not_zero() approach, turning get_uprobe() into
> > try_get_uprobe(),
>
> This is the canonical thing to do. Everybody does this.

Sure, and I provided arguments why I don't do it. Can you provide your
counter argument, please? "Everybody does this." is hardly one.

>
> > which can fail to bump refcount if uprobe is already
> > destined to be destroyed. This, unfortunately, turns out to be rather
> > expensive due to the underlying cmpxchg() operation in
> > atomic_inc_not_zero() and scales rather poorly with an increased number
> > of parallel threads triggering uprobes.
>
> Different archs different trade-offs. You'll not see this on LL/SC archs
> for example.

Clearly x86-64 is the highest priority target, and I've shown that it
benefits from atomic addition vs cmpxchg. Sure, other architectures
might benefit less. But will atomic addition be slower than cmpxchg on
any other architecture?

It's clearly beneficial for x86-64 and not regressing other
architectures, right?

>
> > Furthermore, CPU profiling showed the following overall CPU usage:
> >   - try_get_uprobe (19.3%) + put_uprobe (8.2%) = 27.5% CPU usage for
> >     atomic_inc_not_zero approach;
> >   - __get_uprobe (12.3%) + put_uprobe (9.9%) = 22.2% CPU usage for
> >     atomic_add_and_return approach implemented by this patch.
>
> I think those numbers suggest trying to not have a refcount in the first
> place. Both are pretty terrible, yes one is less terrible than the
> other, but still terrible.

Good, we are on the same page here, yes.

>
> Specifically, I'm thinking it is the refcounting in handle_swbp() that
> is actually the problem, all the other stuff is noise.
>
> So if you have SRCU-protected consumers, what is the reason for still
> having a refcount in handle_swbp()? Simply have the whole of it inside
> a single SRCU critical section, then all consumers you find get a hit.

That's the goal (except SRCU vs RCU Tasks Trace) and that's the next
step. I didn't want to add all that complexity to an already pretty
big and complex patch set. I do believe that batch APIs are the first
necessary step.

Your innocuous "// XXX amortize / batch" comment below is *the major
point of this patch set*. Try to appreciate that. It's not a small
todo, it took this entire patch set to allow for that.

Now, if you are so against the percpu RW semaphore, I can just drop the
last patch for now, but the rest is necessary regardless.

Note how I didn't really touch locking *at all*. uprobes_treelock used
to be a spinlock, which we 1-to-1 replaced with rw_spinlock. And now
I'm replacing it, again 1-to-1, with percpu RW semaphore. Specifically
not to entangle batching with the locking schema changes.

>
> Hmm, return probes are a pain, they require the uprobe to stay extant
> between handle_swbp() and handle_trampoline(). I'm thinking we can do
> that with SRCU as well.

I don't think we can, and I'm surprised you don't think that way.

A uretprobe might never be triggered for various reasons. Either user
space never returns from the function, or the uretprobe was never
installed in the right place (so the entry uprobe part will trigger,
but the return probe never will). I don't think it's acceptable to
delay the whole global uprobes SRCU unlocking indefinitely, at the
will of user space code.

So, with that, I think refcounting *for return probe* will have to
stay. And will have to be fast.

>
> When I cobble all that together (it really shouldn't be one patch, but
> you get the idea I hope) it looks a little something like the below.
>
> I *think* it should work, but perhaps I've missed something?

Well, at the very least you missed that we can't delay SRCU (or any
other sleepable RCU flavor) potentially indefinitely for uretprobes,
which are completely under user space control.

>
> TL;DR replace treelock with seqcount+SRCU
>       replace register_rwsem with SRCU
>       replace handle_swbp() refcount with SRCU
>       replace return_instance refcount with a second SRCU

So, as I mentioned. I haven't considered seqcount just yet, and I will
think that through. This patch set was meant to add batched API to
unblock all of the above you describe. Percpu RW semaphore switch was
a no-brainer with batched APIs, so I went for that to get more
performance with zero added effort and complexity. If you hate that
part, I can drop it. But batching APIs are unavoidable, no matter what
specific RCU-protected locking schema we end up doing.

Can we agree on that and move this forward, please?

>
> Paul, I had to do something vile with SRCU. The basic problem is that we
> want to keep a SRCU critical section across fork(), which leads to both
> parent and child doing srcu_read_unlock(&srcu, idx). As such, I need an
> extra increment on the @idx ssp counter to even things out, see
> __srcu_read_clone_lock().
>
> ---
>  include/linux/rbtree.h  |  45 +++++++++++++
>  include/linux/srcu.h    |   2 +
>  include/linux/uprobes.h |   2 +
>  kernel/events/uprobes.c | 166 +++++++++++++++++++++++++++++++-----------------
>  kernel/rcu/srcutree.c   |   5 ++
>  5 files changed, 161 insertions(+), 59 deletions(-)
>
> diff --git a/include/linux/rbtree.h b/include/linux/rbtree.h
> index f7edca369eda..9847fa58a287 100644
> --- a/include/linux/rbtree.h
> +++ b/include/linux/rbtree.h
> @@ -244,6 +244,31 @@ rb_find_add(struct rb_node *node, struct rb_root *tree,
>         return NULL;
>  }
>
> +static __always_inline struct rb_node *
> +rb_find_add_rcu(struct rb_node *node, struct rb_root *tree,
> +               int (*cmp)(struct rb_node *, const struct rb_node *))
> +{
> +       struct rb_node **link = &tree->rb_node;
> +       struct rb_node *parent = NULL;
> +       int c;
> +
> +       while (*link) {
> +               parent = *link;
> +               c = cmp(node, parent);
> +
> +               if (c < 0)
> +                       link = &parent->rb_left;
> +               else if (c > 0)
> +                       link = &parent->rb_right;
> +               else
> +                       return parent;
> +       }
> +
> +       rb_link_node_rcu(node, parent, link);
> +       rb_insert_color(node, tree);
> +       return NULL;
> +}
> +
>  /**
>   * rb_find() - find @key in tree @tree
>   * @key: key to match
> @@ -272,6 +297,26 @@ rb_find(const void *key, const struct rb_root *tree,
>         return NULL;
>  }
>
> +static __always_inline struct rb_node *
> +rb_find_rcu(const void *key, const struct rb_root *tree,
> +           int (*cmp)(const void *key, const struct rb_node *))
> +{
> +       struct rb_node *node = tree->rb_node;
> +
> +       while (node) {
> +               int c = cmp(key, node);
> +
> +               if (c < 0)
> +                       node = rcu_dereference_raw(node->rb_left);
> +               else if (c > 0)
> +                       node = rcu_dereference_raw(node->rb_right);
> +               else
> +                       return node;
> +       }
> +
> +       return NULL;
> +}
> +
>  /**
>   * rb_find_first() - find the first @key in @tree
>   * @key: key to match
> diff --git a/include/linux/srcu.h b/include/linux/srcu.h
> index 236610e4a8fa..9b14acecbb9d 100644
> --- a/include/linux/srcu.h
> +++ b/include/linux/srcu.h
> @@ -55,7 +55,9 @@ void call_srcu(struct srcu_struct *ssp, struct rcu_head *head,
>                 void (*func)(struct rcu_head *head));
>  void cleanup_srcu_struct(struct srcu_struct *ssp);
>  int __srcu_read_lock(struct srcu_struct *ssp) __acquires(ssp);
> +void __srcu_read_clone_lock(struct srcu_struct *ssp, int idx);
>  void __srcu_read_unlock(struct srcu_struct *ssp, int idx) __releases(ssp);
> +
>  void synchronize_srcu(struct srcu_struct *ssp);
>  unsigned long get_state_synchronize_srcu(struct srcu_struct *ssp);
>  unsigned long start_poll_synchronize_srcu(struct srcu_struct *ssp);
> diff --git a/include/linux/uprobes.h b/include/linux/uprobes.h
> index f46e0ca0169c..354cab634341 100644
> --- a/include/linux/uprobes.h
> +++ b/include/linux/uprobes.h
> @@ -78,6 +78,7 @@ struct uprobe_task {
>
>         struct return_instance          *return_instances;
>         unsigned int                    depth;
> +       unsigned int                    active_srcu_idx;
>  };
>
>  struct return_instance {
> @@ -86,6 +87,7 @@ struct return_instance {
>         unsigned long           stack;          /* stack pointer */
>         unsigned long           orig_ret_vaddr; /* original return address */
>         bool                    chained;        /* true, if instance is nested */
> +       int                     srcu_idx;
>
>         struct return_instance  *next;          /* keep as stack */
>  };
> diff --git a/kernel/events/uprobes.c b/kernel/events/uprobes.c
> index 2c83ba776fc7..0b7574a54093 100644
> --- a/kernel/events/uprobes.c
> +++ b/kernel/events/uprobes.c
> @@ -26,6 +26,7 @@
>  #include <linux/task_work.h>
>  #include <linux/shmem_fs.h>
>  #include <linux/khugepaged.h>
> +#include <linux/srcu.h>
>
>  #include <linux/uprobes.h>
>
> @@ -40,6 +41,17 @@ static struct rb_root uprobes_tree = RB_ROOT;
>  #define no_uprobe_events()     RB_EMPTY_ROOT(&uprobes_tree)
>
>  static DEFINE_RWLOCK(uprobes_treelock);        /* serialize rbtree access */
> +static seqcount_rwlock_t uprobes_seqcount = SEQCNT_RWLOCK_ZERO(uprobes_seqcount, &uprobes_treelock);
> +
> +/*
> + * Used for both the uprobes_tree and the uprobe->consumer list.
> + */
> +DEFINE_STATIC_SRCU(uprobe_srcu);
> +/*
> + * Used for return_instance and single-step uprobe lifetime. Separate from
> + * uprobe_srcu in order to minimize the synchronize_srcu() cost at unregister.
> + */
> +DEFINE_STATIC_SRCU(uretprobe_srcu);
>
>  #define UPROBES_HASH_SZ        13
>  /* serialize uprobe->pending_list */
> @@ -54,7 +66,8 @@ DEFINE_STATIC_PERCPU_RWSEM(dup_mmap_sem);
>  struct uprobe {
>         struct rb_node          rb_node;        /* node in the rb tree */
>         refcount_t              ref;
> -       struct rw_semaphore     register_rwsem;
> +       struct rcu_head         rcu;
> +       struct mutex            register_mutex;
>         struct rw_semaphore     consumer_rwsem;
>         struct list_head        pending_list;
>         struct uprobe_consumer  *consumers;
> @@ -67,7 +80,7 @@ struct uprobe {
>          * The generic code assumes that it has two members of unknown type
>          * owned by the arch-specific code:
>          *
> -        *      insn -  copy_insn() saves the original instruction here for
> +        *      insn -  copy_insn() saves the original instruction here for
>          *              arch_uprobe_analyze_insn().
>          *
>          *      ixol -  potentially modified instruction to execute out of
> @@ -205,7 +218,7 @@ static int __replace_page(struct vm_area_struct *vma, unsigned long addr,
>         folio_put(old_folio);
>
>         err = 0;
> - unlock:
> +unlock:
>         mmu_notifier_invalidate_range_end(&range);
>         folio_unlock(old_folio);
>         return err;
> @@ -593,6 +606,22 @@ static struct uprobe *get_uprobe(struct uprobe *uprobe)
>         return uprobe;
>  }
>
> +static void uprobe_free_stage2(struct rcu_head *rcu)
> +{
> +       struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
> +       kfree(uprobe);
> +}
> +
> +static void uprobe_free_stage1(struct rcu_head *rcu)
> +{
> +       struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
> +       /*
> +        * At this point all the consumers are complete and gone, but retprobe
> +        * and single-step might still reference the uprobe itself.
> +        */
> +       call_srcu(&uretprobe_srcu, &uprobe->rcu, uprobe_free_stage2);
> +}
> +
>  static void put_uprobe(struct uprobe *uprobe)
>  {
>         if (refcount_dec_and_test(&uprobe->ref)) {
> @@ -604,7 +633,8 @@ static void put_uprobe(struct uprobe *uprobe)
>                 mutex_lock(&delayed_uprobe_lock);
>                 delayed_uprobe_remove(uprobe, NULL);
>                 mutex_unlock(&delayed_uprobe_lock);
> -               kfree(uprobe);
> +
> +               call_srcu(&uprobe_srcu, &uprobe->rcu, uprobe_free_stage1);
>         }
>  }
>
> @@ -653,10 +683,10 @@ static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
>                 .inode = inode,
>                 .offset = offset,
>         };
> -       struct rb_node *node = rb_find(&key, &uprobes_tree, __uprobe_cmp_key);
> +       struct rb_node *node = rb_find_rcu(&key, &uprobes_tree, __uprobe_cmp_key);
>
>         if (node)
> -               return get_uprobe(__node_2_uprobe(node));
> +               return __node_2_uprobe(node);
>
>         return NULL;
>  }
> @@ -667,20 +697,32 @@ static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
>   */
>  static struct uprobe *find_uprobe(struct inode *inode, loff_t offset)
>  {
> -       struct uprobe *uprobe;
> +       unsigned seq;
>
> -       read_lock(&uprobes_treelock);
> -       uprobe = __find_uprobe(inode, offset);
> -       read_unlock(&uprobes_treelock);
> +       lockdep_assert(srcu_read_lock_held(&uprobe_srcu));
>
> -       return uprobe;
> +       do {
> +               seq = read_seqcount_begin(&uprobes_seqcount);
> +               struct uprobe *uprobe = __find_uprobe(inode, offset);
> +               if (uprobe) {
> +                       /*
> +                        * Lockless RB-tree lookups are prone to false-negatives.
> +                        * If they find something, it's good. If they do not find
> +                        * anything, the result needs to be validated.
> +                        */
> +                       return uprobe;
> +               }
> +       } while (read_seqcount_retry(&uprobes_seqcount, seq));
> +
> +       /* Really didn't find anything. */
> +       return NULL;
>  }
>
>  static struct uprobe *__insert_uprobe(struct uprobe *uprobe)
>  {
>         struct rb_node *node;
>
> -       node = rb_find_add(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
> +       node = rb_find_add_rcu(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
>         if (node)
>                 return get_uprobe(__node_2_uprobe(node));
>
> @@ -702,7 +744,9 @@ static struct uprobe *insert_uprobe(struct uprobe *uprobe)
>         struct uprobe *u;
>
>         write_lock(&uprobes_treelock);
> +       write_seqcount_begin(&uprobes_seqcount);
>         u = __insert_uprobe(uprobe);
> +       write_seqcount_end(&uprobes_seqcount);
>         write_unlock(&uprobes_treelock);
>
>         return u;
> @@ -730,7 +774,7 @@ static struct uprobe *alloc_uprobe(struct inode *inode, loff_t offset,
>         uprobe->inode = inode;
>         uprobe->offset = offset;
>         uprobe->ref_ctr_offset = ref_ctr_offset;
> -       init_rwsem(&uprobe->register_rwsem);
> +       mutex_init(&uprobe->register_mutex);
>         init_rwsem(&uprobe->consumer_rwsem);
>
>         /* add to uprobes_tree, sorted on inode:offset */
> @@ -754,7 +798,7 @@ static void consumer_add(struct uprobe *uprobe, struct uprobe_consumer *uc)
>  {
>         down_write(&uprobe->consumer_rwsem);
>         uc->next = uprobe->consumers;
> -       uprobe->consumers = uc;
> +       rcu_assign_pointer(uprobe->consumers, uc);
>         up_write(&uprobe->consumer_rwsem);
>  }
>
> @@ -771,7 +815,7 @@ static bool consumer_del(struct uprobe *uprobe, struct uprobe_consumer *uc)
>         down_write(&uprobe->consumer_rwsem);
>         for (con = &uprobe->consumers; *con; con = &(*con)->next) {
>                 if (*con == uc) {
> -                       *con = uc->next;
> +                       rcu_assign_pointer(*con, uc->next);
>                         ret = true;
>                         break;
>                 }
> @@ -857,7 +901,7 @@ static int prepare_uprobe(struct uprobe *uprobe, struct file *file,
>         smp_wmb(); /* pairs with the smp_rmb() in handle_swbp() */
>         set_bit(UPROBE_COPY_INSN, &uprobe->flags);
>
> - out:
> +out:
>         up_write(&uprobe->consumer_rwsem);
>
>         return ret;
> @@ -936,7 +980,9 @@ static void delete_uprobe(struct uprobe *uprobe)
>                 return;
>
>         write_lock(&uprobes_treelock);
> +       write_seqcount_begin(&uprobes_seqcount);
>         rb_erase(&uprobe->rb_node, &uprobes_tree);
> +       write_seqcount_end(&uprobes_seqcount);
>         write_unlock(&uprobes_treelock);
>         RB_CLEAR_NODE(&uprobe->rb_node); /* for uprobe_is_active() */
>         put_uprobe(uprobe);
> @@ -965,7 +1011,7 @@ build_map_info(struct address_space *mapping, loff_t offset, bool is_register)
>         struct map_info *info;
>         int more = 0;
>
> - again:
> +again:
>         i_mmap_lock_read(mapping);
>         vma_interval_tree_foreach(vma, &mapping->i_mmap, pgoff, pgoff) {
>                 if (!valid_vma(vma, is_register))
> @@ -1019,7 +1065,7 @@ build_map_info(struct address_space *mapping, loff_t offset, bool is_register)
>         } while (--more);
>
>         goto again;
> - out:
> +out:
>         while (prev)
>                 prev = free_map_info(prev);
>         return curr;
> @@ -1068,13 +1114,13 @@ register_for_each_vma(struct uprobe *uprobe, struct uprobe_consumer *new)
>                                 err |= remove_breakpoint(uprobe, mm, info->vaddr);
>                 }
>
> - unlock:
> +unlock:
>                 mmap_write_unlock(mm);
> - free:
> +free:
>                 mmput(mm);
>                 info = free_map_info(info);
>         }
> - out:
> +out:
>         percpu_up_write(&dup_mmap_sem);
>         return err;
>  }
> @@ -1101,16 +1147,17 @@ __uprobe_unregister(struct uprobe *uprobe, struct uprobe_consumer *uc)
>   */
>  void uprobe_unregister(struct inode *inode, loff_t offset, struct uprobe_consumer *uc)
>  {
> -       struct uprobe *uprobe;
> +       scoped_guard (srcu, &uprobe_srcu) {
> +               struct uprobe *uprobe = find_uprobe(inode, offset);
> +               if (WARN_ON(!uprobe))
> +                       return;
>
> -       uprobe = find_uprobe(inode, offset);
> -       if (WARN_ON(!uprobe))
> -               return;
> +               mutex_lock(&uprobe->register_mutex);
> +               __uprobe_unregister(uprobe, uc);
> +               mutex_unlock(&uprobe->register_mutex);
> +       }
>
> -       down_write(&uprobe->register_rwsem);
> -       __uprobe_unregister(uprobe, uc);
> -       up_write(&uprobe->register_rwsem);
> -       put_uprobe(uprobe);
> +       synchronize_srcu(&uprobe_srcu); // XXX amortize / batch
>  }
>  EXPORT_SYMBOL_GPL(uprobe_unregister);
>
> @@ -1159,7 +1206,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
>         if (!IS_ALIGNED(ref_ctr_offset, sizeof(short)))
>                 return -EINVAL;
>
> - retry:
> +retry:
>         uprobe = alloc_uprobe(inode, offset, ref_ctr_offset);
>         if (!uprobe)
>                 return -ENOMEM;
> @@ -1170,7 +1217,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
>          * We can race with uprobe_unregister()->delete_uprobe().
>          * Check uprobe_is_active() and retry if it is false.
>          */
> -       down_write(&uprobe->register_rwsem);
> +       mutex_lock(&uprobe->register_mutex);
>         ret = -EAGAIN;
>         if (likely(uprobe_is_active(uprobe))) {
>                 consumer_add(uprobe, uc);
> @@ -1178,7 +1225,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
>                 if (ret)
>                         __uprobe_unregister(uprobe, uc);
>         }
> -       up_write(&uprobe->register_rwsem);
> +       mutex_unlock(&uprobe->register_mutex);
>         put_uprobe(uprobe);
>
>         if (unlikely(ret == -EAGAIN))
> @@ -1214,17 +1261,18 @@ int uprobe_apply(struct inode *inode, loff_t offset,
>         struct uprobe_consumer *con;
>         int ret = -ENOENT;
>
> +       guard(srcu)(&uprobe_srcu);
> +
>         uprobe = find_uprobe(inode, offset);
>         if (WARN_ON(!uprobe))
>                 return ret;
>
> -       down_write(&uprobe->register_rwsem);
> +       mutex_lock(&uprobe->register_mutex);
>         for (con = uprobe->consumers; con && con != uc ; con = con->next)
>                 ;
>         if (con)
>                 ret = register_for_each_vma(uprobe, add ? uc : NULL);
> -       up_write(&uprobe->register_rwsem);
> -       put_uprobe(uprobe);
> +       mutex_unlock(&uprobe->register_mutex);
>
>         return ret;
>  }
> @@ -1468,7 +1516,7 @@ static int xol_add_vma(struct mm_struct *mm, struct xol_area *area)
>         ret = 0;
>         /* pairs with get_xol_area() */
>         smp_store_release(&mm->uprobes_state.xol_area, area); /* ^^^ */
> - fail:
> +fail:
>         mmap_write_unlock(mm);
>
>         return ret;
> @@ -1512,7 +1560,7 @@ static struct xol_area *__create_xol_area(unsigned long vaddr)
>         kfree(area->bitmap);
>   free_area:
>         kfree(area);
> - out:
> +out:
>         return NULL;
>  }
>
> @@ -1700,7 +1748,7 @@ unsigned long uprobe_get_trap_addr(struct pt_regs *regs)
>  static struct return_instance *free_ret_instance(struct return_instance *ri)
>  {
>         struct return_instance *next = ri->next;
> -       put_uprobe(ri->uprobe);
> +       srcu_read_unlock(&uretprobe_srcu, ri->srcu_idx);
>         kfree(ri);
>         return next;
>  }
> @@ -1718,7 +1766,7 @@ void uprobe_free_utask(struct task_struct *t)
>                 return;
>
>         if (utask->active_uprobe)
> -               put_uprobe(utask->active_uprobe);
> +               srcu_read_unlock(&uretprobe_srcu, utask->active_srcu_idx);
>
>         ri = utask->return_instances;
>         while (ri)
> @@ -1761,7 +1809,7 @@ static int dup_utask(struct task_struct *t, struct uprobe_task *o_utask)
>                         return -ENOMEM;
>
>                 *n = *o;
> -               get_uprobe(n->uprobe);
> +               __srcu_read_clone_lock(&uretprobe_srcu, n->srcu_idx);
>                 n->next = NULL;
>
>                 *p = n;
> @@ -1904,7 +1952,8 @@ static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
>                 orig_ret_vaddr = utask->return_instances->orig_ret_vaddr;
>         }
>
> -       ri->uprobe = get_uprobe(uprobe);
> +       ri->srcu_idx = srcu_read_lock(&uretprobe_srcu);
> +       ri->uprobe = uprobe;
>         ri->func = instruction_pointer(regs);
>         ri->stack = user_stack_pointer(regs);
>         ri->orig_ret_vaddr = orig_ret_vaddr;
> @@ -1915,7 +1964,7 @@ static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
>         utask->return_instances = ri;
>
>         return;
> - fail:
> +fail:
>         kfree(ri);
>  }
>
> @@ -1944,6 +1993,7 @@ pre_ssout(struct uprobe *uprobe, struct pt_regs *regs, unsigned long bp_vaddr)
>                 return err;
>         }
>
> +       utask->active_srcu_idx = srcu_read_lock(&uretprobe_srcu);
>         utask->active_uprobe = uprobe;
>         utask->state = UTASK_SSTEP;
>         return 0;
> @@ -2031,7 +2081,7 @@ static int is_trap_at_addr(struct mm_struct *mm, unsigned long vaddr)
>
>         copy_from_page(page, vaddr, &opcode, UPROBE_SWBP_INSN_SIZE);
>         put_page(page);
> - out:
> +out:
>         /* This needs to return true for any variant of the trap insn */
>         return is_trap_insn(&opcode);
>  }
> @@ -2071,8 +2121,9 @@ static void handler_chain(struct uprobe *uprobe, struct pt_regs *regs)
>         int remove = UPROBE_HANDLER_REMOVE;
>         bool need_prep = false; /* prepare return uprobe, when needed */
>
> -       down_read(&uprobe->register_rwsem);
> -       for (uc = uprobe->consumers; uc; uc = uc->next) {
> +       lockdep_assert(srcu_read_lock_held(&uprobe_srcu));
> +
> +       for (uc = rcu_dereference_raw(uprobe->consumers); uc; uc = rcu_dereference(uc->next)) {
>                 int rc = 0;
>
>                 if (uc->handler) {
> @@ -2094,7 +2145,6 @@ static void handler_chain(struct uprobe *uprobe, struct pt_regs *regs)
>                 WARN_ON(!uprobe_is_active(uprobe));
>                 unapply_uprobe(uprobe, current->mm);
>         }
> -       up_read(&uprobe->register_rwsem);
>  }
>
>  static void
> @@ -2103,12 +2153,12 @@ handle_uretprobe_chain(struct return_instance *ri, struct pt_regs *regs)
>         struct uprobe *uprobe = ri->uprobe;
>         struct uprobe_consumer *uc;
>
> -       down_read(&uprobe->register_rwsem);
> -       for (uc = uprobe->consumers; uc; uc = uc->next) {
> +       guard(srcu)(&uprobe_srcu);
> +
> +       for (uc = rcu_dereference_raw(uprobe->consumers); uc; uc = rcu_dereference_raw(uc->next)) {
>                 if (uc->ret_handler)
>                         uc->ret_handler(uc, ri->func, regs);
>         }
> -       up_read(&uprobe->register_rwsem);
>  }
>
>  static struct return_instance *find_next_ret_chain(struct return_instance *ri)
> @@ -2159,7 +2209,7 @@ static void handle_trampoline(struct pt_regs *regs)
>         utask->return_instances = ri;
>         return;
>
> - sigill:
> +sigill:
>         uprobe_warn(current, "handle uretprobe, sending SIGILL.");
>         force_sig(SIGILL);
>
> @@ -2190,6 +2240,8 @@ static void handle_swbp(struct pt_regs *regs)
>         if (bp_vaddr == get_trampoline_vaddr())
>                 return handle_trampoline(regs);
>
> +       guard(srcu)(&uprobe_srcu);
> +
>         uprobe = find_active_uprobe(bp_vaddr, &is_swbp);
>         if (!uprobe) {
>                 if (is_swbp > 0) {
> @@ -2218,7 +2270,7 @@ static void handle_swbp(struct pt_regs *regs)
>          * new and not-yet-analyzed uprobe at the same address, restart.
>          */
>         if (unlikely(!test_bit(UPROBE_COPY_INSN, &uprobe->flags)))
> -               goto out;
> +               return;
>
>         /*
>          * Pairs with the smp_wmb() in prepare_uprobe().
> @@ -2231,22 +2283,18 @@ static void handle_swbp(struct pt_regs *regs)
>
>         /* Tracing handlers use ->utask to communicate with fetch methods */
>         if (!get_utask())
> -               goto out;
> +               return;
>
>         if (arch_uprobe_ignore(&uprobe->arch, regs))
> -               goto out;
> +               return;
>
>         handler_chain(uprobe, regs);
>
>         if (arch_uprobe_skip_sstep(&uprobe->arch, regs))
> -               goto out;
> +               return;
>
>         if (!pre_ssout(uprobe, regs, bp_vaddr))
>                 return;
> -
> -       /* arch_uprobe_skip_sstep() succeeded, or restart if can't singlestep */
> -out:
> -       put_uprobe(uprobe);
>  }
>
>  /*
> @@ -2266,7 +2314,7 @@ static void handle_singlestep(struct uprobe_task *utask, struct pt_regs *regs)
>         else
>                 WARN_ON_ONCE(1);
>
> -       put_uprobe(uprobe);
> +       srcu_read_unlock(&uretprobe_srcu, utask->active_srcu_idx);
>         utask->active_uprobe = NULL;
>         utask->state = UTASK_RUNNING;
>         xol_free_insn_slot(current);
> diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
> index bc4b58b0204e..d8cda9003da4 100644
> --- a/kernel/rcu/srcutree.c
> +++ b/kernel/rcu/srcutree.c
> @@ -720,6 +720,11 @@ int __srcu_read_lock(struct srcu_struct *ssp)
>  }
>  EXPORT_SYMBOL_GPL(__srcu_read_lock);
>
> +void __srcu_read_clone_lock(struct srcu_struct *ssp, int idx)
> +{
> +       this_cpu_inc(ssp->sda->srcu_lock_count[idx].counter);
> +}
> +
>  /*
>   * Removes the count for the old reader from the appropriate per-CPU
>   * element of the srcu_struct.  Note that this may well be a different

Patch

diff --git a/kernel/events/uprobes.c b/kernel/events/uprobes.c
index 23449a8c5e7e..560cf1ca512a 100644
--- a/kernel/events/uprobes.c
+++ b/kernel/events/uprobes.c
@@ -53,9 +53,10 @@  DEFINE_STATIC_PERCPU_RWSEM(dup_mmap_sem);
 
 struct uprobe {
 	struct rb_node		rb_node;	/* node in the rb tree */
-	refcount_t		ref;
+	atomic64_t		ref;		/* see UPROBE_REFCNT_GET below */
 	struct rw_semaphore	register_rwsem;
 	struct rw_semaphore	consumer_rwsem;
+	struct rcu_head		rcu;
 	struct list_head	pending_list;
 	struct uprobe_consumer	*consumers;
 	struct inode		*inode;		/* Also hold a ref to inode */
@@ -587,15 +588,138 @@  set_orig_insn(struct arch_uprobe *auprobe, struct mm_struct *mm, unsigned long v
 			*(uprobe_opcode_t *)&auprobe->insn);
 }
 
-static struct uprobe *get_uprobe(struct uprobe *uprobe)
+/*
+ * Uprobe's 64-bit refcount is actually two independent counters co-located in
+ * a single u64 value:
+ *   - lower 32 bits are just a normal refcount, which is incremented and
+ *   decremented on get and put, respectively, just like a normal refcount
+ *   would be;
+ *   - upper 32 bits are a tag (or epoch, if you will), which is always
+ *   incremented by one, no matter whether get or put operation is done.
+ *
+ * This upper counter is meant to distinguish between:
+ *   - one CPU dropping refcnt from 1 -> 0 and proceeding with "destruction",
+ *   - while another CPU meanwhile continues with a 0 -> 1 -> 0 refcnt
+ *   sequence, also proceeding to "destruction".
+ *
+ * In both cases refcount drops to zero, but in one case it will have epoch N,
+ * while the second drop to zero will have a different epoch N + 2, allowing
+ * first destructor to bail out because epoch changed between refcount going
+ * to zero and put_uprobe() taking uprobes_treelock (under which overall
+ * 64-bit refcount is double-checked, see put_uprobe() for details).
+ *
+ * Lower 32-bit counter is not meant to overflow, while it's expected
+ * that upper 32-bit counter will overflow occasionally. Note, though, that we
+ * can't allow upper 32-bit counter to "bleed over" into lower 32-bit counter,
+ * so whenever the epoch counter gets its highest bit set to 1, __get_uprobe() and
+ * put_uprobe() will attempt to clear upper bit with cmpxchg(). This makes
+ * epoch effectively a 31-bit counter with highest bit used as a flag to
+ * perform a fix-up. This ensures epoch and refcnt parts do not "interfere".
+ *
+ * UPROBE_REFCNT_GET constant is chosen such that it will *increment both*
+ * epoch and refcnt parts atomically with one atomic_add().
+ * UPROBE_REFCNT_PUT is chosen such that it will *decrement* refcnt part and
+ * *increment* epoch part.
+ */
+#define UPROBE_REFCNT_GET ((1LL << 32) + 1LL) /* 0x0000000100000001LL */
+#define UPROBE_REFCNT_PUT ((1LL << 32) - 1LL) /* 0x00000000ffffffffLL */
+
+/*
+ * Caller has to make sure that:
+ *   a) either uprobe's refcnt is positive before this call;
+ *   b) or uprobes_treelock is held (doesn't matter if for read or write),
+ *      preventing uprobe's destructor from removing it from uprobes_tree.
+ *
+ * In the latter case, uprobe's destructor will "resurrect" uprobe instance if
+ * it detects that its refcount went back to being positive again inbetween it
+ * dropping to zero at some point and (potentially delayed) destructor
+ * callback actually running.
+ */
+static struct uprobe *__get_uprobe(struct uprobe *uprobe)
 {
-	refcount_inc(&uprobe->ref);
+	s64 v;
+
+	v = atomic64_add_return(UPROBE_REFCNT_GET, &uprobe->ref);
+
+	/*
+	 * If the highest bit is set, we need to clear it. If cmpxchg() fails,
+	 * we don't retry because there is another CPU that just managed to
+	 * update refcnt and will attempt the same "fix up". Eventually one of
+	 * them will succeed in clearing the highest bit.
+	 */
+	if (unlikely(v < 0))
+		(void)atomic64_cmpxchg(&uprobe->ref, v, v & ~(1ULL << 63));
+
 	return uprobe;
 }
 
+static inline bool uprobe_is_active(struct uprobe *uprobe)
+{
+	return !RB_EMPTY_NODE(&uprobe->rb_node);
+}
+
+static void uprobe_free_rcu(struct rcu_head *rcu)
+{
+	struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
+
+	kfree(uprobe);
+}
+
 static void put_uprobe(struct uprobe *uprobe)
 {
-	if (refcount_dec_and_test(&uprobe->ref)) {
+	s64 v;
+
+	/*
+	 * here uprobe instance is guaranteed to be alive, so we use Tasks
+	 * Trace RCU to guarantee that uprobe won't be freed from under us, if
+	 * we end up being a losing "destructor" inside uprobes_treelock'ed
+	 * section double-checking uprobe->ref value below.
+	 * Note call_rcu_tasks_trace() + uprobe_free_rcu below.
+	 */
+	rcu_read_lock_trace();
+
+	v = atomic64_add_return(UPROBE_REFCNT_PUT, &uprobe->ref);
+
+	if (unlikely((u32)v == 0)) {
+		bool destroy;
+
+		write_lock(&uprobes_treelock);
+		/*
+		 * We might race with find_uprobe()->__get_uprobe() executed
+		 * from inside read-locked uprobes_treelock, which can bump
+		 * refcount from zero back to one, after we got here. Even
+		 * worse, it's possible for another CPU to do 0 -> 1 -> 0
+		 * transition between this CPU doing atomic_add() and taking
+		 * uprobes_treelock. In either case this CPU should bail out
+		 * and not proceed with destruction.
+		 *
+		 * So now that we have exclusive write lock, we double check
+		 * the total 64-bit refcount value, which includes the epoch.
+		 * If nothing changed (i.e., epoch is the same and refcnt is
+		 * still zero), we are good and we proceed with the clean up.
+		 *
+		 * But if it managed to be updated back at least once, we just
+		 * pretend it never went to zero. If lower 32-bit refcnt part
+		 * drops to zero again, another CPU will proceed with
+		 * destruction, due to the more up-to-date epoch.
+		 */
+		destroy = atomic64_read(&uprobe->ref) == v;
+		if (destroy && uprobe_is_active(uprobe))
+			rb_erase(&uprobe->rb_node, &uprobes_tree);
+		write_unlock(&uprobes_treelock);
+
+		/*
+		 * Beyond here we don't need RCU protection, we are either the
+		 * winning destructor and we control the rest of uprobe's
+		 * lifetime; or we lost and we are bailing without accessing
+		 * uprobe fields anymore.
+		 */
+		rcu_read_unlock_trace();
+
+		/* uprobe got resurrected, pretend we never tried to free it */
+		if (!destroy)
+			return;
+
 		/*
 		 * If application munmap(exec_vma) before uprobe_unregister()
 		 * gets called, we don't get a chance to remove uprobe from
@@ -604,8 +728,21 @@  static void put_uprobe(struct uprobe *uprobe)
 		mutex_lock(&delayed_uprobe_lock);
 		delayed_uprobe_remove(uprobe, NULL);
 		mutex_unlock(&delayed_uprobe_lock);
-		kfree(uprobe);
+
+		call_rcu_tasks_trace(&uprobe->rcu, uprobe_free_rcu);
+		return;
 	}
+
+	/*
+	 * If the highest bit is set, we need to clear it. If cmpxchg() fails,
+	 * we don't retry because there is another CPU that just managed to
+	 * update refcnt and will attempt the same "fix up". Eventually one of
+	 * them will succeed in clearing the highest bit.
+	 */
+	if (unlikely(v < 0))
+		(void)atomic64_cmpxchg(&uprobe->ref, v, v & ~(1ULL << 63));
+
+	rcu_read_unlock_trace();
 }
 
 static __always_inline
@@ -653,12 +790,15 @@  static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
 		.inode = inode,
 		.offset = offset,
 	};
-	struct rb_node *node = rb_find(&key, &uprobes_tree, __uprobe_cmp_key);
+	struct rb_node *node;
+	struct uprobe *u = NULL;
 
+	node = rb_find(&key, &uprobes_tree, __uprobe_cmp_key);
 	if (node)
-		return get_uprobe(__node_2_uprobe(node));
+		/* we hold uprobes_treelock, so it's safe to __get_uprobe() */
+		u = __get_uprobe(__node_2_uprobe(node));
 
-	return NULL;
+	return u;
 }
 
 /*
@@ -676,26 +816,37 @@  static struct uprobe *find_uprobe(struct inode *inode, loff_t offset)
 	return uprobe;
 }
 
+/*
+ * Attempt to insert a new uprobe into uprobes_tree.
+ *
+ * If a uprobe already exists (for the given inode+offset), we just increment
+ * the refcount of the previously existing uprobe.
+ *
+ * If not, a provided new instance of uprobe is inserted into the tree (with
+ * assumed initial refcount == 1).
+ *
+ * In any case, we return a uprobe instance that ends up being in uprobes_tree.
+ * Caller has to clean up new uprobe instance, if it ended up not being
+ * inserted into the tree.
+ *
+ * We assume that uprobes_treelock is held for writing.
+ */
 static struct uprobe *__insert_uprobe(struct uprobe *uprobe)
 {
 	struct rb_node *node;
+	struct uprobe *u = uprobe;
 
 	node = rb_find_add(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
 	if (node)
-		return get_uprobe(__node_2_uprobe(node));
+		/* we hold uprobes_treelock, so it's safe to __get_uprobe() */
+		u = __get_uprobe(__node_2_uprobe(node));
 
-	/* get access + creation ref */
-	refcount_set(&uprobe->ref, 2);
-	return NULL;
+	return u;
 }
 
 /*
- * Acquire uprobes_treelock.
- * Matching uprobe already exists in rbtree;
- *	increment (access refcount) and return the matching uprobe.
- *
- * No matching uprobe; insert the uprobe in rb_tree;
- *	get a double refcount (access + creation) and return NULL.
+ * Acquire uprobes_treelock and insert uprobe into uprobes_tree
+ * (or reuse existing one, see __insert_uprobe() comments above).
  */
 static struct uprobe *insert_uprobe(struct uprobe *uprobe)
 {
@@ -732,11 +883,13 @@  static struct uprobe *alloc_uprobe(struct inode *inode, loff_t offset,
 	uprobe->ref_ctr_offset = ref_ctr_offset;
 	init_rwsem(&uprobe->register_rwsem);
 	init_rwsem(&uprobe->consumer_rwsem);
+	RB_CLEAR_NODE(&uprobe->rb_node);
+	atomic64_set(&uprobe->ref, 1);
 
 	/* add to uprobes_tree, sorted on inode:offset */
 	cur_uprobe = insert_uprobe(uprobe);
 	/* a uprobe exists for this inode:offset combination */
-	if (cur_uprobe) {
+	if (cur_uprobe != uprobe) {
 		if (cur_uprobe->ref_ctr_offset != uprobe->ref_ctr_offset) {
 			ref_ctr_mismatch_warn(cur_uprobe, uprobe);
 			put_uprobe(cur_uprobe);
@@ -921,27 +1074,6 @@  remove_breakpoint(struct uprobe *uprobe, struct mm_struct *mm, unsigned long vad
 	return set_orig_insn(&uprobe->arch, mm, vaddr);
 }
 
-static inline bool uprobe_is_active(struct uprobe *uprobe)
-{
-	return !RB_EMPTY_NODE(&uprobe->rb_node);
-}
-/*
- * There could be threads that have already hit the breakpoint. They
- * will recheck the current insn and restart if find_uprobe() fails.
- * See find_active_uprobe().
- */
-static void delete_uprobe(struct uprobe *uprobe)
-{
-	if (WARN_ON(!uprobe_is_active(uprobe)))
-		return;
-
-	write_lock(&uprobes_treelock);
-	rb_erase(&uprobe->rb_node, &uprobes_tree);
-	write_unlock(&uprobes_treelock);
-	RB_CLEAR_NODE(&uprobe->rb_node); /* for uprobe_is_active() */
-	put_uprobe(uprobe);
-}
-
 struct map_info {
 	struct map_info *next;
 	struct mm_struct *mm;
@@ -1082,15 +1214,11 @@  register_for_each_vma(struct uprobe *uprobe, struct uprobe_consumer *new)
 static void
 __uprobe_unregister(struct uprobe *uprobe, struct uprobe_consumer *uc)
 {
-	int err;
-
 	if (WARN_ON(!consumer_del(uprobe, uc)))
 		return;
 
-	err = register_for_each_vma(uprobe, NULL);
 	/* TODO : cant unregister? schedule a worker thread */
-	if (!uprobe->consumers && !err)
-		delete_uprobe(uprobe);
+	(void)register_for_each_vma(uprobe, NULL);
 }
 
 /*
@@ -1159,28 +1287,20 @@  static int __uprobe_register(struct inode *inode, loff_t offset,
 	if (!IS_ALIGNED(ref_ctr_offset, sizeof(short)))
 		return -EINVAL;
 
- retry:
 	uprobe = alloc_uprobe(inode, offset, ref_ctr_offset);
 	if (IS_ERR(uprobe))
 		return PTR_ERR(uprobe);
 
-	/*
-	 * We can race with uprobe_unregister()->delete_uprobe().
-	 * Check uprobe_is_active() and retry if it is false.
-	 */
 	down_write(&uprobe->register_rwsem);
-	ret = -EAGAIN;
-	if (likely(uprobe_is_active(uprobe))) {
-		consumer_add(uprobe, uc);
-		ret = register_for_each_vma(uprobe, uc);
-		if (ret)
-			__uprobe_unregister(uprobe, uc);
-	}
+	consumer_add(uprobe, uc);
+	ret = register_for_each_vma(uprobe, uc);
+	if (ret)
+		__uprobe_unregister(uprobe, uc);
 	up_write(&uprobe->register_rwsem);
-	put_uprobe(uprobe);
 
-	if (unlikely(ret == -EAGAIN))
-		goto retry;
+	if (ret)
+		put_uprobe(uprobe);
+
 	return ret;
 }
 
@@ -1303,15 +1423,15 @@  static void build_probe_list(struct inode *inode,
 			u = rb_entry(t, struct uprobe, rb_node);
 			if (u->inode != inode || u->offset < min)
 				break;
+			__get_uprobe(u); /* uprobes_treelock is held */
 			list_add(&u->pending_list, head);
-			get_uprobe(u);
 		}
 		for (t = n; (t = rb_next(t)); ) {
 			u = rb_entry(t, struct uprobe, rb_node);
 			if (u->inode != inode || u->offset > max)
 				break;
+			__get_uprobe(u); /* uprobes_treelock is held */
 			list_add(&u->pending_list, head);
-			get_uprobe(u);
 		}
 	}
 	read_unlock(&uprobes_treelock);
@@ -1769,7 +1889,14 @@  static int dup_utask(struct task_struct *t, struct uprobe_task *o_utask)
 			return -ENOMEM;
 
 		*n = *o;
-		get_uprobe(n->uprobe);
+		/*
+		 * uprobe's refcnt has to be positive at this point, kept by
+		 * utask->return_instances items; return_instances can't be
+		 * removed right now, as task is blocked due to duping; so
+		 * __get_uprobe() is safe to use here without holding
+		 * uprobes_treelock.
+		 */
+		__get_uprobe(n->uprobe);
 		n->next = NULL;
 
 		*p = n;
@@ -1911,8 +2038,11 @@  static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
 		}
 		orig_ret_vaddr = utask->return_instances->orig_ret_vaddr;
 	}
-
-	ri->uprobe = get_uprobe(uprobe);
+	/*
+	 * uprobe's refcnt is positive, held by caller, so it's safe to
+	 * unconditionally bump it one more time here.
+	 */
+	ri->uprobe = __get_uprobe(uprobe);
 	ri->func = instruction_pointer(regs);
 	ri->stack = user_stack_pointer(regs);
 	ri->orig_ret_vaddr = orig_ret_vaddr;