
[19/24] rcu/tree: Support reclaim for head-less object

Message ID 20200428205903.61704-20-urezki@gmail.com (mailing list archive)
State: New, archived
Series: Introduce kvfree_rcu(1 or 2 arguments)

Commit Message

Uladzislau Rezki April 28, 2020, 8:58 p.m. UTC
Update kvfree_call_rcu() with head-less support, meaning
an object without any rcu_head structure can be reclaimed
after a grace period (GP).

To store the pointers, two chain-arrays are maintained:
one for SLAB objects and another for vmalloc ones. Both
object types (head-less and regular) are placed there
according to their type.

Under high memory pressure, maintaining the arrays can
become impossible, so there is an emergency path. In that
case, objects with an rcu_head inside are simply queued
onto a one-way list, which is drained later on.

As for the head-less variant: such objects have no
rcu_head helper inside, so one is attached dynamically.
The result is a wrapper consisting of a back-pointer and
a regular rcu_head. Such objects are tagged so that the
emergency path can detect this object type and free both
the back-pointer target and the dynamically attached
wrapper.

Even though this approach requires dynamic memory, it
needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
bytes, so SLAB is used to obtain it. Finally, if attaching
the rcu_head or queuing fails, the current context must
follow the might_sleep() annotation, so the steps below
can be applied:
   a) wait until a grace period has elapsed;
   b) free the object with a direct, inlined kvfree() call.
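
For illustration, a minimal user-space sketch of the wrapper layout
and the BIT(0) offset tagging described above (all names here are
hypothetical; the real code is in kernel/rcu/tree.c):

#include <stdint.h>
#include <stdlib.h>

struct rhead {				/* stand-in for struct rcu_head */
	struct rhead *next;
	void (*func)(struct rhead *);
};

struct wrapper {			/* dynamically attached helper */
	void *obj;			/* back-pointer to the user object */
	struct rhead head;		/* queued on the emergency list */
};

int main(void)
{
	void *obj = malloc(32);			/* the head-less object */
	struct wrapper *w = malloc(sizeof(*w));	/* the "attach" step */

	w->obj = obj;

	/* Encode: offset from &w->head back to w, with BIT(0) as tag. */
	struct rhead *head = &w->head;
	uintptr_t offset = sizeof(void *) + 1;

	/* Decode, as the drain loop does after a grace period. */
	int headless = ((uintptr_t)head - offset) & 1;
	if (headless)
		offset -= 1;			/* strip the tag bit */
	void *ptr = (char *)head - offset;	/* == w */

	if (headless) {
		free(*(void **)ptr);		/* back-pointer target */
		free(ptr);			/* then the wrapper */
	}
	return 0;
}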

Reviewed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
Co-developed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
---
 kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 98 insertions(+), 4 deletions(-)

Comments

Paul E. McKenney May 1, 2020, 10:39 p.m. UTC | #1
On Tue, Apr 28, 2020 at 10:58:58PM +0200, Uladzislau Rezki (Sony) wrote:
> Update kvfree_call_rcu() with head-less support, meaning
> an object without any rcu_head structure can be reclaimed
> after a grace period (GP).
> 
> To store the pointers, two chain-arrays are maintained:
> one for SLAB objects and another for vmalloc ones. Both
> object types (head-less and regular) are placed there
> according to their type.
> 
> Under high memory pressure, maintaining the arrays can
> become impossible, so there is an emergency path. In that
> case, objects with an rcu_head inside are simply queued
> onto a one-way list, which is drained later on.
> 
> As for the head-less variant: such objects have no
> rcu_head helper inside, so one is attached dynamically.
> The result is a wrapper consisting of a back-pointer and
> a regular rcu_head. Such objects are tagged so that the
> emergency path can detect this object type and free both
> the back-pointer target and the dynamically attached
> wrapper.
> 
> Even though this approach requires dynamic memory, it
> needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
> bytes, so SLAB is used to obtain it. Finally, if attaching
> the rcu_head or queuing fails, the current context must
> follow the might_sleep() annotation, so the steps below
> can be applied:
>    a) wait until a grace period has elapsed;
>    b) free the object with a direct, inlined kvfree() call.
> 
> Reviewed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> Co-developed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> ---
>  kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 98 insertions(+), 4 deletions(-)
> 
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 51726e4c3b4d..501cac02146d 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -3072,15 +3072,31 @@ static void kfree_rcu_work(struct work_struct *work)
>  	 */
>  	for (; head; head = next) {
>  		unsigned long offset = (unsigned long)head->func;
> -		void *ptr = (void *)head - offset;
> +		bool headless;
> +		void *ptr;
>  
>  		next = head->next;
> +
> +		/* We tag the headless object, if so adjust offset. */
> +		headless = (((unsigned long) head - offset) & BIT(0));
> +		if (headless)
> +			offset -= 1;
> +
> +		ptr = (void *) head - offset;
> +
>  		debug_rcu_head_unqueue((struct rcu_head *)ptr);
>  		rcu_lock_acquire(&rcu_callback_map);
>  		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
>  
> -		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> +		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
> +			/*
> +			 * If headless free the back-pointer first.
> +			 */
> +			if (headless)
> +				kvfree((void *) *((unsigned long *) ptr));
> +
>  			kvfree(ptr);
> +		}
>  
>  		rcu_lock_release(&rcu_callback_map);
>  		cond_resched_tasks_rcu_qs();
> @@ -3221,6 +3237,13 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
>  			if (IS_ENABLED(CONFIG_PREEMPT_RT))
>  				return false;
>  
> +			/*
> +			 * TODO: For one argument of kvfree_rcu() we can
> +			 * drop the lock and get the page in sleepable
> +			 * context. That would allow to maintain an array
> +			 * for the CONFIG_PREEMPT_RT as well. Thus we could
> +			 * get rid of dynamic rcu_head attaching code.
> +			 */
>  			bnode = (struct kvfree_rcu_bulk_data *)
>  				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
>  		}
> @@ -3244,6 +3267,23 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
>  	return true;
>  }
>  
> +static inline struct rcu_head *
> +attach_rcu_head_to_object(void *obj)
> +{
> +	unsigned long *ptr;
> +
> +	ptr = kmalloc(sizeof(unsigned long *) +
> +			sizeof(struct rcu_head), GFP_NOWAIT |
> +				__GFP_RECLAIM |	/* can do direct reclaim. */
> +				__GFP_NORETRY |	/* only lightweight one.  */
> +				__GFP_NOWARN);	/* no failure reports. */

Again, let's please not do this single-pointer-sized allocation.  If
a full page is not available and this is a single-argument kfree_rcu(),
just call synchronize_rcu() and then free the object directly.

It should not be -that- hard to adjust locking for CONFIG_PREEMPT_RT!
For example, have some kind of reservation protocol so that a task
that drops the lock can retry the page allocation and be sure of having
a place to put it.  This might entail making CONFIG_PREEMPT_RT reserve
more pages per CPU.  Or maybe that would not be necessary.

							Thanx, Paul
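
In code terms, the simplification Paul asks for amounts to roughly the
following (an editorial sketch reusing the patch's helper names; not a
posted change):

	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
	if (!success) {
		if (!head) {
			/*
			 * Single-argument caller is sleepable: skip the
			 * small wrapper allocation entirely, wait for a
			 * grace period and free the object inline.
			 */
			krc_this_cpu_unlock(krcp, flags);
			debug_rcu_head_unqueue(ptr);
			synchronize_rcu();
			kvfree(ptr);
			return;
		}
		/* Two-argument caller: emergency list, as before. */
		head->func = func;
		head->next = krcp->head;
		krcp->head = head;
	}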

> +	if (!ptr)
> +		return NULL;
> +
> +	ptr[0] = (unsigned long) obj;
> +	return ((struct rcu_head *) ++ptr);
> +}
> +
>  /*
>   * Queue a request for lazy invocation of appropriate free routine after a
>   * grace period. Please note there are three paths are maintained, two are the
> @@ -3260,16 +3300,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>  {
>  	unsigned long flags;
>  	struct kfree_rcu_cpu *krcp;
> +	bool success;
>  	void *ptr;
>  
> +	if (head) {
> +		ptr = (void *) head - (unsigned long) func;
> +	} else {
> +		/*
> +		 * Please note there is a limitation for the head-less
> +		 * variant, that is why there is a clear rule for such
> +		 * objects:
> +		 *
> +		 * it can be used from might_sleep() context only. For
> +		 * other places please embed an rcu_head to your data.
> +		 */
> +		might_sleep();
> +		ptr = (unsigned long *) func;
> +	}
> +
>  	krcp = krc_this_cpu_lock(&flags);
> -	ptr = (void *)head - (unsigned long)func;
>  
>  	/* Queue the object but don't yet schedule the batch. */
>  	if (debug_rcu_head_queue(ptr)) {
>  		/* Probable double kfree_rcu(), just leak. */
>  		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
>  			  __func__, head);
> +
> +		/* Mark as success and leave. */
> +		success = true;
>  		goto unlock_return;
>  	}
>  
> @@ -3277,10 +3335,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>  	 * Under high memory pressure GFP_NOWAIT can fail,
>  	 * in that case the emergency path is maintained.
>  	 */
> -	if (unlikely(!kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr))) {
> +	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
> +	if (!success) {
> +		if (head == NULL) {
> +			/*
> +			 * Headless(one argument kvfree_rcu()) can sleep.
> +			 * Drop the lock and tack it back. So it can do
> +			 * direct lightweight reclaim.
> +			 */
> +			krc_this_cpu_unlock(krcp, flags);
> +			head = attach_rcu_head_to_object(ptr);
> +			krcp = krc_this_cpu_lock(&flags);
> +
> +			if (head == NULL)
> +				goto unlock_return;
> +
> +			/*
> +			 * Tag the headless object. Such objects have a
> +			 * back-pointer to the original allocated memory,
> +			 * that has to be freed as well as dynamically
> +			 * attached wrapper/head.
> +			 */
> +			func = (rcu_callback_t) (sizeof(unsigned long *) + 1);
> +		}
> +
>  		head->func = func;
>  		head->next = krcp->head;
>  		krcp->head = head;
> +		success = true;
>  	}
>  
>  	WRITE_ONCE(krcp->count, krcp->count + 1);
> @@ -3294,6 +3376,18 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
>  
>  unlock_return:
>  	krc_this_cpu_unlock(krcp, flags);
> +
> +	/*
> +	 * High memory pressure, so inline kvfree() after
> +	 * synchronize_rcu(). We can do it from might_sleep()
> +	 * context only, so the current CPU can pass the QS
> +	 * state.
> +	 */
> +	if (!success) {
> +		debug_rcu_head_unqueue(ptr);
> +		synchronize_rcu();
> +		kvfree(ptr);
> +	}
>  }
>  EXPORT_SYMBOL_GPL(kvfree_call_rcu);
>  
> -- 
> 2.20.1
>
Joel Fernandes May 4, 2020, 12:12 a.m. UTC | #2
On Fri, May 01, 2020 at 03:39:09PM -0700, Paul E. McKenney wrote:
> On Tue, Apr 28, 2020 at 10:58:58PM +0200, Uladzislau Rezki (Sony) wrote:
> > Update kvfree_call_rcu() with head-less support, meaning
> > an object without any rcu_head structure can be reclaimed
> > after a grace period (GP).
> > 
> > To store the pointers, two chain-arrays are maintained:
> > one for SLAB objects and another for vmalloc ones. Both
> > object types (head-less and regular) are placed there
> > according to their type.
> > 
> > Under high memory pressure, maintaining the arrays can
> > become impossible, so there is an emergency path. In that
> > case, objects with an rcu_head inside are simply queued
> > onto a one-way list, which is drained later on.
> > 
> > As for the head-less variant: such objects have no
> > rcu_head helper inside, so one is attached dynamically.
> > The result is a wrapper consisting of a back-pointer and
> > a regular rcu_head. Such objects are tagged so that the
> > emergency path can detect this object type and free both
> > the back-pointer target and the dynamically attached
> > wrapper.
> > 
> > Even though this approach requires dynamic memory, it
> > needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
> > bytes, so SLAB is used to obtain it. Finally, if attaching
> > the rcu_head or queuing fails, the current context must
> > follow the might_sleep() annotation, so the steps below
> > can be applied:
> >    a) wait until a grace period has elapsed;
> >    b) free the object with a direct, inlined kvfree() call.
> > 
> > Reviewed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > Co-developed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > ---
> >  kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
> >  1 file changed, 98 insertions(+), 4 deletions(-)
> > 
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index 51726e4c3b4d..501cac02146d 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -3072,15 +3072,31 @@ static void kfree_rcu_work(struct work_struct *work)
> >  	 */
> >  	for (; head; head = next) {
> >  		unsigned long offset = (unsigned long)head->func;
> > -		void *ptr = (void *)head - offset;
> > +		bool headless;
> > +		void *ptr;
> >  
> >  		next = head->next;
> > +
> > +		/* We tag the headless object, if so adjust offset. */
> > +		headless = (((unsigned long) head - offset) & BIT(0));
> > +		if (headless)
> > +			offset -= 1;
> > +
> > +		ptr = (void *) head - offset;
> > +
> >  		debug_rcu_head_unqueue((struct rcu_head *)ptr);
> >  		rcu_lock_acquire(&rcu_callback_map);
> >  		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
> >  
> > -		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> > +		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
> > +			/*
> > +			 * If headless free the back-pointer first.
> > +			 */
> > +			if (headless)
> > +				kvfree((void *) *((unsigned long *) ptr));
> > +
> >  			kvfree(ptr);
> > +		}
> >  
> >  		rcu_lock_release(&rcu_callback_map);
> >  		cond_resched_tasks_rcu_qs();
> > @@ -3221,6 +3237,13 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> >  			if (IS_ENABLED(CONFIG_PREEMPT_RT))
> >  				return false;
> >  
> > +			/*
> > +			 * TODO: For one argument of kvfree_rcu() we can
> > +			 * drop the lock and get the page in sleepable
> > +			 * context. That would allow to maintain an array
> > +			 * for the CONFIG_PREEMPT_RT as well. Thus we could
> > +			 * get rid of dynamic rcu_head attaching code.
> > +			 */
> >  			bnode = (struct kvfree_rcu_bulk_data *)
> >  				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
> >  		}
> > @@ -3244,6 +3267,23 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> >  	return true;
> >  }
> >  
> > +static inline struct rcu_head *
> > +attach_rcu_head_to_object(void *obj)
> > +{
> > +	unsigned long *ptr;
> > +
> > +	ptr = kmalloc(sizeof(unsigned long *) +
> > +			sizeof(struct rcu_head), GFP_NOWAIT |
> > +				__GFP_RECLAIM |	/* can do direct reclaim. */
> > +				__GFP_NORETRY |	/* only lightweight one.  */
> > +				__GFP_NOWARN);	/* no failure reports. */
> 
> Again, let's please not do this single-pointer-sized allocation.  If
> a full page is not available and this is a single-argument kfree_rcu(),
> just call synchronize_rcu() and then free the object directly.

With the additional caching, lack of a full page should not be very
likely. I agree we can avoid doing any allocation and go straight to
synchronize_rcu().

> It should not be -that- hard to adjust locking for CONFIG_PREEMPT_RT!
> For example, have some kind of reservation protocol so that a task
> that drops the lock can retry the page allocation and be sure of having
> a place to put it.  This might entail making CONFIG_PREEMPT_RT reserve
> more pages per CPU.  Or maybe that would not be necessary.

If we are not doing the single-pointer allocation, then that would also
eliminate entering the low-level page allocator for single-pointer
allocations.

Or did you mean entry into the allocator for the full-page allocations
related to the pointer array for PREEMPT_RT? Even if we skip entering
the allocator for those, we will still have the additional caching,
which further reduces the chances of failing to get a full page. In the
event of such a failure, we can simply queue the rcu_head.

Thoughts?

thanks,

 - Joel

> 
> 							Thanx, Paul
> 
> > +	if (!ptr)
> > +		return NULL;
> > +
> > +	ptr[0] = (unsigned long) obj;
> > +	return ((struct rcu_head *) ++ptr);
> > +}
> > +
> >  /*
> >   * Queue a request for lazy invocation of appropriate free routine after a
> >   * grace period. Please note there are three paths are maintained, two are the
> > @@ -3260,16 +3300,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  {
> >  	unsigned long flags;
> >  	struct kfree_rcu_cpu *krcp;
> > +	bool success;
> >  	void *ptr;
> >  
> > +	if (head) {
> > +		ptr = (void *) head - (unsigned long) func;
> > +	} else {
> > +		/*
> > +		 * Please note there is a limitation for the head-less
> > +		 * variant, that is why there is a clear rule for such
> > +		 * objects:
> > +		 *
> > +		 * it can be used from might_sleep() context only. For
> > +		 * other places please embed an rcu_head to your data.
> > +		 */
> > +		might_sleep();
> > +		ptr = (unsigned long *) func;
> > +	}
> > +
> >  	krcp = krc_this_cpu_lock(&flags);
> > -	ptr = (void *)head - (unsigned long)func;
> >  
> >  	/* Queue the object but don't yet schedule the batch. */
> >  	if (debug_rcu_head_queue(ptr)) {
> >  		/* Probable double kfree_rcu(), just leak. */
> >  		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> >  			  __func__, head);
> > +
> > +		/* Mark as success and leave. */
> > +		success = true;
> >  		goto unlock_return;
> >  	}
> >  
> > @@ -3277,10 +3335,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  	 * Under high memory pressure GFP_NOWAIT can fail,
> >  	 * in that case the emergency path is maintained.
> >  	 */
> > -	if (unlikely(!kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr))) {
> > +	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
> > +	if (!success) {
> > +		if (head == NULL) {
> > +			/*
> > +			 * Headless(one argument kvfree_rcu()) can sleep.
> > +			 * Drop the lock and tack it back. So it can do
> > +			 * direct lightweight reclaim.
> > +			 */
> > +			krc_this_cpu_unlock(krcp, flags);
> > +			head = attach_rcu_head_to_object(ptr);
> > +			krcp = krc_this_cpu_lock(&flags);
> > +
> > +			if (head == NULL)
> > +				goto unlock_return;
> > +
> > +			/*
> > +			 * Tag the headless object. Such objects have a
> > +			 * back-pointer to the original allocated memory,
> > +			 * that has to be freed as well as dynamically
> > +			 * attached wrapper/head.
> > +			 */
> > +			func = (rcu_callback_t) (sizeof(unsigned long *) + 1);
> > +		}
> > +
> >  		head->func = func;
> >  		head->next = krcp->head;
> >  		krcp->head = head;
> > +		success = true;
> >  	}
> >  
> >  	WRITE_ONCE(krcp->count, krcp->count + 1);
> > @@ -3294,6 +3376,18 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  
> >  unlock_return:
> >  	krc_this_cpu_unlock(krcp, flags);
> > +
> > +	/*
> > +	 * High memory pressure, so inline kvfree() after
> > +	 * synchronize_rcu(). We can do it from might_sleep()
> > +	 * context only, so the current CPU can pass the QS
> > +	 * state.
> > +	 */
> > +	if (!success) {
> > +		debug_rcu_head_unqueue(ptr);
> > +		synchronize_rcu();
> > +		kvfree(ptr);
> > +	}
> >  }
> >  EXPORT_SYMBOL_GPL(kvfree_call_rcu);
> >  
> > -- 
> > 2.20.1
> >
Paul E. McKenney May 4, 2020, 12:28 a.m. UTC | #3
On Sun, May 03, 2020 at 08:12:58PM -0400, Joel Fernandes wrote:
> On Fri, May 01, 2020 at 03:39:09PM -0700, Paul E. McKenney wrote:
> > On Tue, Apr 28, 2020 at 10:58:58PM +0200, Uladzislau Rezki (Sony) wrote:
> > > Update kvfree_call_rcu() with head-less support, meaning
> > > an object without any rcu_head structure can be reclaimed
> > > after a grace period (GP).
> > > 
> > > To store the pointers, two chain-arrays are maintained:
> > > one for SLAB objects and another for vmalloc ones. Both
> > > object types (head-less and regular) are placed there
> > > according to their type.
> > > 
> > > Under high memory pressure, maintaining the arrays can
> > > become impossible, so there is an emergency path. In that
> > > case, objects with an rcu_head inside are simply queued
> > > onto a one-way list, which is drained later on.
> > > 
> > > As for the head-less variant: such objects have no
> > > rcu_head helper inside, so one is attached dynamically.
> > > The result is a wrapper consisting of a back-pointer and
> > > a regular rcu_head. Such objects are tagged so that the
> > > emergency path can detect this object type and free both
> > > the back-pointer target and the dynamically attached
> > > wrapper.
> > > 
> > > Even though this approach requires dynamic memory, it
> > > needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
> > > bytes, so SLAB is used to obtain it. Finally, if attaching
> > > the rcu_head or queuing fails, the current context must
> > > follow the might_sleep() annotation, so the steps below
> > > can be applied:
> > >    a) wait until a grace period has elapsed;
> > >    b) free the object with a direct, inlined kvfree() call.
> > > 
> > > Reviewed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > > Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > > Co-developed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > > ---
> > >  kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
> > >  1 file changed, 98 insertions(+), 4 deletions(-)
> > > 
> > > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > > index 51726e4c3b4d..501cac02146d 100644
> > > --- a/kernel/rcu/tree.c
> > > +++ b/kernel/rcu/tree.c
> > > @@ -3072,15 +3072,31 @@ static void kfree_rcu_work(struct work_struct *work)
> > >  	 */
> > >  	for (; head; head = next) {
> > >  		unsigned long offset = (unsigned long)head->func;
> > > -		void *ptr = (void *)head - offset;
> > > +		bool headless;
> > > +		void *ptr;
> > >  
> > >  		next = head->next;
> > > +
> > > +		/* We tag the headless object, if so adjust offset. */
> > > +		headless = (((unsigned long) head - offset) & BIT(0));
> > > +		if (headless)
> > > +			offset -= 1;
> > > +
> > > +		ptr = (void *) head - offset;
> > > +
> > >  		debug_rcu_head_unqueue((struct rcu_head *)ptr);
> > >  		rcu_lock_acquire(&rcu_callback_map);
> > >  		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
> > >  
> > > -		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> > > +		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
> > > +			/*
> > > +			 * If headless free the back-pointer first.
> > > +			 */
> > > +			if (headless)
> > > +				kvfree((void *) *((unsigned long *) ptr));
> > > +
> > >  			kvfree(ptr);
> > > +		}
> > >  
> > >  		rcu_lock_release(&rcu_callback_map);
> > >  		cond_resched_tasks_rcu_qs();
> > > @@ -3221,6 +3237,13 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> > >  			if (IS_ENABLED(CONFIG_PREEMPT_RT))
> > >  				return false;
> > >  
> > > +			/*
> > > +			 * TODO: For one argument of kvfree_rcu() we can
> > > +			 * drop the lock and get the page in sleepable
> > > +			 * context. That would allow to maintain an array
> > > +			 * for the CONFIG_PREEMPT_RT as well. Thus we could
> > > +			 * get rid of dynamic rcu_head attaching code.
> > > +			 */
> > >  			bnode = (struct kvfree_rcu_bulk_data *)
> > >  				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
> > >  		}
> > > @@ -3244,6 +3267,23 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> > >  	return true;
> > >  }
> > >  
> > > +static inline struct rcu_head *
> > > +attach_rcu_head_to_object(void *obj)
> > > +{
> > > +	unsigned long *ptr;
> > > +
> > > +	ptr = kmalloc(sizeof(unsigned long *) +
> > > +			sizeof(struct rcu_head), GFP_NOWAIT |
> > > +				__GFP_RECLAIM |	/* can do direct reclaim. */
> > > +				__GFP_NORETRY |	/* only lightweight one.  */
> > > +				__GFP_NOWARN);	/* no failure reports. */
> > 
> > Again, let's please not do this single-pointer-sized allocation.  If
> > a full page is not available and this is a single-argument kfree_rcu(),
> > just call synchronize_rcu() and then free the object directly.
> 
> With the additional caching, lack of a full page should not be very
> likely. I agree we can avoid doing any allocation and go straight to
> synchronize_rcu().

That sounds good to me!

> > It should not be -that- hard to adjust locking for CONFIG_PREEMPT_RT!
> > For example, have some kind of reservation protocol so that a task
> > that drops the lock can retry the page allocation and be sure of having
> > a place to put it.  This might entail making CONFIG_PREEMPT_RT reserve
> > more pages per CPU.  Or maybe that would not be necessary.
> 
> If we are not doing the single-pointer allocation, then that would
> also eliminate entering the low-level page allocator for
> single-pointer allocations.
> 
> Or did you mean entry into the allocator for the full-page allocations
> related to the pointer array for PREEMPT_RT? Even if we skip entering
> the allocator for those, we will still have the additional caching,
> which further reduces the chances of failing to get a full page. In
> the event of such a failure, we can simply queue the rcu_head.
> 
> Thoughts?

I was just trying to guess why you kept the single-pointer allocation.
It looks like I guessed wrong.  ;-)

If, as you say above, you make it go straight to synchronize_rcu()
upon full-page allocation failure, that would be good!

							Thanx, Paul

> thanks,
> 
>  - Joel
> 
> > 
> > 							Thanx, Paul
> > 
> > > +	if (!ptr)
> > > +		return NULL;
> > > +
> > > +	ptr[0] = (unsigned long) obj;
> > > +	return ((struct rcu_head *) ++ptr);
> > > +}
> > > +
> > >  /*
> > >   * Queue a request for lazy invocation of appropriate free routine after a
> > >   * grace period. Please note there are three paths are maintained, two are the
> > > @@ -3260,16 +3300,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > >  {
> > >  	unsigned long flags;
> > >  	struct kfree_rcu_cpu *krcp;
> > > +	bool success;
> > >  	void *ptr;
> > >  
> > > +	if (head) {
> > > +		ptr = (void *) head - (unsigned long) func;
> > > +	} else {
> > > +		/*
> > > +		 * Please note there is a limitation for the head-less
> > > +		 * variant, that is why there is a clear rule for such
> > > +		 * objects:
> > > +		 *
> > > +		 * it can be used from might_sleep() context only. For
> > > +		 * other places please embed an rcu_head to your data.
> > > +		 */
> > > +		might_sleep();
> > > +		ptr = (unsigned long *) func;
> > > +	}
> > > +
> > >  	krcp = krc_this_cpu_lock(&flags);
> > > -	ptr = (void *)head - (unsigned long)func;
> > >  
> > >  	/* Queue the object but don't yet schedule the batch. */
> > >  	if (debug_rcu_head_queue(ptr)) {
> > >  		/* Probable double kfree_rcu(), just leak. */
> > >  		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> > >  			  __func__, head);
> > > +
> > > +		/* Mark as success and leave. */
> > > +		success = true;
> > >  		goto unlock_return;
> > >  	}
> > >  
> > > @@ -3277,10 +3335,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > >  	 * Under high memory pressure GFP_NOWAIT can fail,
> > >  	 * in that case the emergency path is maintained.
> > >  	 */
> > > -	if (unlikely(!kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr))) {
> > > +	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
> > > +	if (!success) {
> > > +		if (head == NULL) {
> > > +			/*
> > > +			 * Headless(one argument kvfree_rcu()) can sleep.
> > > +			 * Drop the lock and tack it back. So it can do
> > > +			 * direct lightweight reclaim.
> > > +			 */
> > > +			krc_this_cpu_unlock(krcp, flags);
> > > +			head = attach_rcu_head_to_object(ptr);
> > > +			krcp = krc_this_cpu_lock(&flags);
> > > +
> > > +			if (head == NULL)
> > > +				goto unlock_return;
> > > +
> > > +			/*
> > > +			 * Tag the headless object. Such objects have a
> > > +			 * back-pointer to the original allocated memory,
> > > +			 * that has to be freed as well as dynamically
> > > +			 * attached wrapper/head.
> > > +			 */
> > > +			func = (rcu_callback_t) (sizeof(unsigned long *) + 1);
> > > +		}
> > > +
> > >  		head->func = func;
> > >  		head->next = krcp->head;
> > >  		krcp->head = head;
> > > +		success = true;
> > >  	}
> > >  
> > >  	WRITE_ONCE(krcp->count, krcp->count + 1);
> > > @@ -3294,6 +3376,18 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > >  
> > >  unlock_return:
> > >  	krc_this_cpu_unlock(krcp, flags);
> > > +
> > > +	/*
> > > +	 * High memory pressure, so inline kvfree() after
> > > +	 * synchronize_rcu(). We can do it from might_sleep()
> > > +	 * context only, so the current CPU can pass the QS
> > > +	 * state.
> > > +	 */
> > > +	if (!success) {
> > > +		debug_rcu_head_unqueue(ptr);
> > > +		synchronize_rcu();
> > > +		kvfree(ptr);
> > > +	}
> > >  }
> > >  EXPORT_SYMBOL_GPL(kvfree_call_rcu);
> > >  
> > > -- 
> > > 2.20.1
> > >
Joel Fernandes May 4, 2020, 12:32 a.m. UTC | #4
On Sun, May 03, 2020 at 05:28:55PM -0700, Paul E. McKenney wrote:
> On Sun, May 03, 2020 at 08:12:58PM -0400, Joel Fernandes wrote:
> > On Fri, May 01, 2020 at 03:39:09PM -0700, Paul E. McKenney wrote:
> > > On Tue, Apr 28, 2020 at 10:58:58PM +0200, Uladzislau Rezki (Sony) wrote:
> > > > Update kvfree_call_rcu() with head-less support, meaning
> > > > an object without any rcu_head structure can be reclaimed
> > > > after a grace period (GP).
> > > > 
> > > > To store the pointers, two chain-arrays are maintained:
> > > > one for SLAB objects and another for vmalloc ones. Both
> > > > object types (head-less and regular) are placed there
> > > > according to their type.
> > > > 
> > > > Under high memory pressure, maintaining the arrays can
> > > > become impossible, so there is an emergency path. In that
> > > > case, objects with an rcu_head inside are simply queued
> > > > onto a one-way list, which is drained later on.
> > > > 
> > > > As for the head-less variant: such objects have no
> > > > rcu_head helper inside, so one is attached dynamically.
> > > > The result is a wrapper consisting of a back-pointer and
> > > > a regular rcu_head. Such objects are tagged so that the
> > > > emergency path can detect this object type and free both
> > > > the back-pointer target and the dynamically attached
> > > > wrapper.
> > > > 
> > > > Even though this approach requires dynamic memory, it
> > > > needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
> > > > bytes, so SLAB is used to obtain it. Finally, if attaching
> > > > the rcu_head or queuing fails, the current context must
> > > > follow the might_sleep() annotation, so the steps below
> > > > can be applied:
> > > >    a) wait until a grace period has elapsed;
> > > >    b) free the object with a direct, inlined kvfree() call.
> > > > 
> > > > Reviewed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > > > Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > > > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > > > Co-developed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > > > ---
> > > >  kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
> > > >  1 file changed, 98 insertions(+), 4 deletions(-)
> > > > 
> > > > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > > > index 51726e4c3b4d..501cac02146d 100644
> > > > --- a/kernel/rcu/tree.c
> > > > +++ b/kernel/rcu/tree.c
> > > > @@ -3072,15 +3072,31 @@ static void kfree_rcu_work(struct work_struct *work)
> > > >  	 */
> > > >  	for (; head; head = next) {
> > > >  		unsigned long offset = (unsigned long)head->func;
> > > > -		void *ptr = (void *)head - offset;
> > > > +		bool headless;
> > > > +		void *ptr;
> > > >  
> > > >  		next = head->next;
> > > > +
> > > > +		/* We tag the headless object, if so adjust offset. */
> > > > +		headless = (((unsigned long) head - offset) & BIT(0));
> > > > +		if (headless)
> > > > +			offset -= 1;
> > > > +
> > > > +		ptr = (void *) head - offset;
> > > > +
> > > >  		debug_rcu_head_unqueue((struct rcu_head *)ptr);
> > > >  		rcu_lock_acquire(&rcu_callback_map);
> > > >  		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
> > > >  
> > > > -		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> > > > +		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
> > > > +			/*
> > > > +			 * If headless free the back-pointer first.
> > > > +			 */
> > > > +			if (headless)
> > > > +				kvfree((void *) *((unsigned long *) ptr));
> > > > +
> > > >  			kvfree(ptr);
> > > > +		}
> > > >  
> > > >  		rcu_lock_release(&rcu_callback_map);
> > > >  		cond_resched_tasks_rcu_qs();
> > > > @@ -3221,6 +3237,13 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> > > >  			if (IS_ENABLED(CONFIG_PREEMPT_RT))
> > > >  				return false;
> > > >  
> > > > +			/*
> > > > +			 * TODO: For one argument of kvfree_rcu() we can
> > > > +			 * drop the lock and get the page in sleepable
> > > > +			 * context. That would allow to maintain an array
> > > > +			 * for the CONFIG_PREEMPT_RT as well. Thus we could
> > > > +			 * get rid of dynamic rcu_head attaching code.
> > > > +			 */
> > > >  			bnode = (struct kvfree_rcu_bulk_data *)
> > > >  				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
> > > >  		}
> > > > @@ -3244,6 +3267,23 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> > > >  	return true;
> > > >  }
> > > >  
> > > > +static inline struct rcu_head *
> > > > +attach_rcu_head_to_object(void *obj)
> > > > +{
> > > > +	unsigned long *ptr;
> > > > +
> > > > +	ptr = kmalloc(sizeof(unsigned long *) +
> > > > +			sizeof(struct rcu_head), GFP_NOWAIT |
> > > > +				__GFP_RECLAIM |	/* can do direct reclaim. */
> > > > +				__GFP_NORETRY |	/* only lightweight one.  */
> > > > +				__GFP_NOWARN);	/* no failure reports. */
> > > 
> > > Again, let's please not do this single-pointer-sized allocation.  If
> > > a full page is not available and this is a single-argument kfree_rcu(),
> > > just call synchronize_rcu() and then free the object directly.
> > 
> > With the additional caching, lack of a full page should not be very
> > likely. I agree we can avoid doing any allocation and go straight to
> > synchronize_rcu().
> 
> That sounds good to me!
> 
> > > It should not be -that- hard to adjust locking for CONFIG_PREEMPT_RT!
> > > For example, have some kind of reservation protocol so that a task
> > > that drops the lock can retry the page allocation and be sure of having
> > > a place to put it.  This might entail making CONFIG_PREEMPT_RT reserve
> > > more pages per CPU.  Or maybe that would not be necessary.
> > 
> > If we are not doing the single-pointer allocation, then that would
> > also eliminate entering the low-level page allocator for
> > single-pointer allocations.
> > 
> > Or did you mean entry into the allocator for the full-page
> > allocations related to the pointer array for PREEMPT_RT? Even if we
> > skip entering the allocator for those, we will still have the
> > additional caching, which further reduces the chances of failing to
> > get a full page. In the event of such a failure, we can simply queue
> > the rcu_head.
> > 
> > Thoughts?
> 
> I was just trying to guess why you kept the single-pointer allocation.
> It looks like I guessed wrong.  ;-)
> 
> If, as you say above, you make it go straight to synchronize_rcu()
> upon full-page allocation failure, that would be good!

Paul, sounds good. Vlad, are you also Ok with that?

thanks,

 - Joel


> 							Thanx, Paul
> 
> > thanks,
> > 
> >  - Joel
> > 
> > > 
> > > 							Thanx, Paul
> > > 
> > > > +	if (!ptr)
> > > > +		return NULL;
> > > > +
> > > > +	ptr[0] = (unsigned long) obj;
> > > > +	return ((struct rcu_head *) ++ptr);
> > > > +}
> > > > +
> > > >  /*
> > > >   * Queue a request for lazy invocation of appropriate free routine after a
> > > >   * grace period. Please note there are three paths are maintained, two are the
> > > > @@ -3260,16 +3300,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > >  {
> > > >  	unsigned long flags;
> > > >  	struct kfree_rcu_cpu *krcp;
> > > > +	bool success;
> > > >  	void *ptr;
> > > >  
> > > > +	if (head) {
> > > > +		ptr = (void *) head - (unsigned long) func;
> > > > +	} else {
> > > > +		/*
> > > > +		 * Please note there is a limitation for the head-less
> > > > +		 * variant, that is why there is a clear rule for such
> > > > +		 * objects:
> > > > +		 *
> > > > +		 * it can be used from might_sleep() context only. For
> > > > +		 * other places please embed an rcu_head to your data.
> > > > +		 */
> > > > +		might_sleep();
> > > > +		ptr = (unsigned long *) func;
> > > > +	}
> > > > +
> > > >  	krcp = krc_this_cpu_lock(&flags);
> > > > -	ptr = (void *)head - (unsigned long)func;
> > > >  
> > > >  	/* Queue the object but don't yet schedule the batch. */
> > > >  	if (debug_rcu_head_queue(ptr)) {
> > > >  		/* Probable double kfree_rcu(), just leak. */
> > > >  		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> > > >  			  __func__, head);
> > > > +
> > > > +		/* Mark as success and leave. */
> > > > +		success = true;
> > > >  		goto unlock_return;
> > > >  	}
> > > >  
> > > > @@ -3277,10 +3335,34 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > >  	 * Under high memory pressure GFP_NOWAIT can fail,
> > > >  	 * in that case the emergency path is maintained.
> > > >  	 */
> > > > -	if (unlikely(!kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr))) {
> > > > +	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
> > > > +	if (!success) {
> > > > +		if (head == NULL) {
> > > > +			/*
> > > > +			 * Headless(one argument kvfree_rcu()) can sleep.
> > > > +			 * Drop the lock and tack it back. So it can do
> > > > +			 * direct lightweight reclaim.
> > > > +			 */
> > > > +			krc_this_cpu_unlock(krcp, flags);
> > > > +			head = attach_rcu_head_to_object(ptr);
> > > > +			krcp = krc_this_cpu_lock(&flags);
> > > > +
> > > > +			if (head == NULL)
> > > > +				goto unlock_return;
> > > > +
> > > > +			/*
> > > > +			 * Tag the headless object. Such objects have a
> > > > +			 * back-pointer to the original allocated memory,
> > > > +			 * that has to be freed as well as dynamically
> > > > +			 * attached wrapper/head.
> > > > +			 */
> > > > +			func = (rcu_callback_t) (sizeof(unsigned long *) + 1);
> > > > +		}
> > > > +
> > > >  		head->func = func;
> > > >  		head->next = krcp->head;
> > > >  		krcp->head = head;
> > > > +		success = true;
> > > >  	}
> > > >  
> > > >  	WRITE_ONCE(krcp->count, krcp->count + 1);
> > > > @@ -3294,6 +3376,18 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > >  
> > > >  unlock_return:
> > > >  	krc_this_cpu_unlock(krcp, flags);
> > > > +
> > > > +	/*
> > > > +	 * High memory pressure, so inline kvfree() after
> > > > +	 * synchronize_rcu(). We can do it from might_sleep()
> > > > +	 * context only, so the current CPU can pass the QS
> > > > +	 * state.
> > > > +	 */
> > > > +	if (!success) {
> > > > +		debug_rcu_head_unqueue(ptr);
> > > > +		synchronize_rcu();
> > > > +		kvfree(ptr);
> > > > +	}
> > > >  }
> > > >  EXPORT_SYMBOL_GPL(kvfree_call_rcu);
> > > >  
> > > > -- 
> > > > 2.20.1
> > > >
Uladzislau Rezki May 4, 2020, 12:57 p.m. UTC | #5
On Fri, May 01, 2020 at 03:39:09PM -0700, Paul E. McKenney wrote:
> On Tue, Apr 28, 2020 at 10:58:58PM +0200, Uladzislau Rezki (Sony) wrote:
> > Update kvfree_call_rcu() with head-less support, meaning
> > an object without any rcu_head structure can be reclaimed
> > after a grace period (GP).
> > 
> > To store the pointers, two chain-arrays are maintained:
> > one for SLAB objects and another for vmalloc ones. Both
> > object types (head-less and regular) are placed there
> > according to their type.
> > 
> > Under high memory pressure, maintaining the arrays can
> > become impossible, so there is an emergency path. In that
> > case, objects with an rcu_head inside are simply queued
> > onto a one-way list, which is drained later on.
> > 
> > As for the head-less variant: such objects have no
> > rcu_head helper inside, so one is attached dynamically.
> > The result is a wrapper consisting of a back-pointer and
> > a regular rcu_head. Such objects are tagged so that the
> > emergency path can detect this object type and free both
> > the back-pointer target and the dynamically attached
> > wrapper.
> > 
> > Even though this approach requires dynamic memory, it
> > needs only sizeof(unsigned long *) + sizeof(struct rcu_head)
> > bytes, so SLAB is used to obtain it. Finally, if attaching
> > the rcu_head or queuing fails, the current context must
> > follow the might_sleep() annotation, so the steps below
> > can be applied:
> >    a) wait until a grace period has elapsed;
> >    b) free the object with a direct, inlined kvfree() call.
> > 
> > Reviewed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > Co-developed-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > ---
> >  kernel/rcu/tree.c | 102 ++++++++++++++++++++++++++++++++++++++++++++--
> >  1 file changed, 98 insertions(+), 4 deletions(-)
> > 
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index 51726e4c3b4d..501cac02146d 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -3072,15 +3072,31 @@ static void kfree_rcu_work(struct work_struct *work)
> >  	 */
> >  	for (; head; head = next) {
> >  		unsigned long offset = (unsigned long)head->func;
> > -		void *ptr = (void *)head - offset;
> > +		bool headless;
> > +		void *ptr;
> >  
> >  		next = head->next;
> > +
> > +		/* We tag the headless object, if so adjust offset. */
> > +		headless = (((unsigned long) head - offset) & BIT(0));
> > +		if (headless)
> > +			offset -= 1;
> > +
> > +		ptr = (void *) head - offset;
> > +
> >  		debug_rcu_head_unqueue((struct rcu_head *)ptr);
> >  		rcu_lock_acquire(&rcu_callback_map);
> >  		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
> >  
> > -		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
> > +		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
> > +			/*
> > +			 * If headless free the back-pointer first.
> > +			 */
> > +			if (headless)
> > +				kvfree((void *) *((unsigned long *) ptr));
> > +
> >  			kvfree(ptr);
> > +		}
> >  
> >  		rcu_lock_release(&rcu_callback_map);
> >  		cond_resched_tasks_rcu_qs();
> > @@ -3221,6 +3237,13 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> >  			if (IS_ENABLED(CONFIG_PREEMPT_RT))
> >  				return false;
> >  
> > +			/*
> > +			 * TODO: For one argument of kvfree_rcu() we can
> > +			 * drop the lock and get the page in sleepable
> > +			 * context. That would allow to maintain an array
> > +			 * for the CONFIG_PREEMPT_RT as well. Thus we could
> > +			 * get rid of dynamic rcu_head attaching code.
> > +			 */
> >  			bnode = (struct kvfree_rcu_bulk_data *)
> >  				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
> >  		}
> > @@ -3244,6 +3267,23 @@ kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
> >  	return true;
> >  }
> >  
> > +static inline struct rcu_head *
> > +attach_rcu_head_to_object(void *obj)
> > +{
> > +	unsigned long *ptr;
> > +
> > +	ptr = kmalloc(sizeof(unsigned long *) +
> > +			sizeof(struct rcu_head), GFP_NOWAIT |
> > +				__GFP_RECLAIM |	/* can do direct reclaim. */
> > +				__GFP_NORETRY |	/* only lightweight one.  */
> > +				__GFP_NOWARN);	/* no failure reports. */
> 
> Again, let's please not do this single-pointer-sized allocation.  If
> a full page is not available and this is a single-argument kfree_rcu(),
> just call synchronize_rcu() and then free the object directly.
> 
> It should not be -that- hard to adjust locking for CONFIG_PREEMPT_RT!
> For example, have some kind of reservation protocol so that a task
> that drops the lock can retry the page allocation and be sure of having
> a place to put it.  This might entail making CONFIG_PREEMPT_RT reserve
> more pages per CPU.  Or maybe that would not be necessary.
> 
Agreed. Will drop it!

--
Vlad Rezki
Uladzislau Rezki May 4, 2020, 2:21 p.m. UTC | #6
> > > 
> > > If we are not doing single-pointer allocation, then that would also eliminate
> > > entering the low-level page allocator for single-pointer allocations.
> > > 
> > > Or did you mean entry into the allocator for the full-page allocations
> > > related to the pointer array for PREEMPT_RT? Even if we skip entry into the
> > > allocator for those, we will still have additional caching which further
> > > reduces chances of getting a full page. In the event of such failure, we can
> > > simply queue the rcu_head.
> > > 
> > > Thoughts?
> > 
> > I was just trying to guess why you kept the single-pointer allocation.
> > It looks like I guessed wrong.  ;-)
> > 
> > If, as you say above, you make it go straight to synchronize_rcu()
> > upon full-page allocation failure, that would be good!
> 
> Paul, sounds good. Vlad, are you also Ok with that?
> 
OK, let's drop it and keep it simple :)

BTW, for PREEMPT_RT we can still do a page allocation for the single
argument of kvfree_rcu(). In the double-argument case we just fall back
to the rcu_head if there is no cache.

For the single argument we can drop the lock before entering the page
allocator. Because that path follows the might_sleep() annotation, we
avoid a situation where the spinlock (an rt-mutex on RT) is taken from
atomic context.

Since the lock is dropped, the current context can be interrupted by
an IRQ, which in its turn can also call kvfree_rcu() on the current CPU.
In that case it must be a double-argument kvfree_rcu() call (single is
not allowed). For PREEMPT_RT, if there is no cache, everything falls
back to rcu_head usage, i.e. the entry to the page allocator is bypassed.
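
A rough sketch of that single-argument path inside kvfree_call_rcu()
(illustrative only; put_cached_bnode() stands in for a cache-refill
helper and is an assumption, not posted code):

	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
	if (!success && !head) {
		/*
		 * might_sleep() context: safe to drop krcp->lock (an
		 * rt-mutex on PREEMPT_RT) and sleep in the allocator.
		 */
		krc_this_cpu_unlock(krcp, flags);
		bnode = (struct kvfree_rcu_bulk_data *)
			__get_free_page(GFP_KERNEL | __GFP_NOWARN);
		krcp = krc_this_cpu_lock(&flags);

		if (bnode)
			put_cached_bnode(krcp, bnode);	/* assumed helper */

		/* Retry; an IRQ may have queued work here meanwhile. */
		success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
	}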

It can be addressed as a separate patch and sent out later on if we
are on the same page.

Paul, Joel what are your opinions?

--
Vlad Rezki
Paul E. McKenney May 4, 2020, 3:31 p.m. UTC | #7
On Mon, May 04, 2020 at 04:21:53PM +0200, Uladzislau Rezki wrote:
> > > > 
> > > > If we are not doing single-pointer allocation, then that would also eliminate
> > > > entering the low-level page allocator for single-pointer allocations.
> > > > 
> > > > Or did you mean entry into the allocator for the full-page allocations
> > > > related to the pointer array for PREEMPT_RT? Even if we skip entry into the
> > > > allocator for those, we will still have additional caching which further
> > > > reduces chances of getting a full page. In the event of such failure, we can
> > > > simply queue the rcu_head.
> > > > 
> > > > Thoughts?
> > > 
> > > I was just trying to guess why you kept the single-pointer allocation.
> > > It looks like I guessed wrong.  ;-)
> > > 
> > > If, as you say above, you make it go straight to synchronize_rcu()
> > > upon full-page allocation failure, that would be good!
> > 
> > Paul, sounds good. Vlad, are you also Ok with that?
> > 
> OK, let's drop it and keep it simple :)
> 
> BTW, for PREEMPT_RT we can still do a page allocation for the single
> argument of kvfree_rcu(). In the double-argument case we just fall
> back to the rcu_head if there is no cache.
> 
> For the single argument we can drop the lock before entering the page
> allocator. Because that path follows the might_sleep() annotation, we
> avoid a situation where the spinlock (an rt-mutex on RT) is taken
> from atomic context.
> 
> Since the lock is dropped, the current context can be interrupted by
> an IRQ, which in its turn can also call kvfree_rcu() on the current
> CPU. In that case it must be a double-argument kvfree_rcu() call
> (single is not allowed). For PREEMPT_RT, if there is no cache,
> everything falls back to rcu_head usage, i.e. the entry to the page
> allocator is bypassed.
> 
> It can be addressed as a separate patch and sent out later on if we
> are on the same page.
> 
> Paul, Joel what are your opinions?

I strongly prefer that it be removed from the series.  I do understand
that this is a bit more hassle right now, but this does help avoid
confusion in the future, plus perhaps also avoiding issues with future
bisections.

							Thanx, Paul
Uladzislau Rezki May 4, 2020, 4:56 p.m. UTC | #8
> > 
> > For the single argument we can drop the lock before entering the
> > page allocator. Because that path follows the might_sleep()
> > annotation, we avoid a situation where the spinlock (an rt-mutex on
> > RT) is taken from atomic context.
> > 
> > Since the lock is dropped, the current context can be interrupted by
> > an IRQ, which in its turn can also call kvfree_rcu() on the current
> > CPU. In that case it must be a double-argument kvfree_rcu() call
> > (single is not allowed). For PREEMPT_RT, if there is no cache,
> > everything falls back to rcu_head usage, i.e. the entry to the page
> > allocator is bypassed.
> > 
> > It can be addressed as a separate patch and sent out later on if we
> > are on the same page.
> > 
> > Paul, Joel what are your opinions?
> 
> I strongly prefer that it be removed from the series.  I do understand
> that this is a bit more hassle right now, but this does help avoid
> confusion in the future, plus perhaps also avoiding issues with future
> bisections.
> 
We have already decided to get rid of it, I mean the small allocations
(dynamic rcu_head attaching). I will exclude it from the next patch-set
version.

--
Vlad Rezki
Paul E. McKenney May 4, 2020, 5:08 p.m. UTC | #9
On Mon, May 04, 2020 at 06:56:29PM +0200, Uladzislau Rezki wrote:
> > > 
> > > For the single argument we can drop the lock before entering the
> > > page allocator. Because that path follows the might_sleep()
> > > annotation, we avoid a situation where the spinlock (an rt-mutex
> > > on RT) is taken from atomic context.
> > > 
> > > Since the lock is dropped, the current context can be interrupted
> > > by an IRQ, which in its turn can also call kvfree_rcu() on the
> > > current CPU. In that case it must be a double-argument
> > > kvfree_rcu() call (single is not allowed). For PREEMPT_RT, if
> > > there is no cache, everything falls back to rcu_head usage, i.e.
> > > the entry to the page allocator is bypassed.
> > > 
> > > It can be addressed as a separate patch and sent out later on if
> > > we are on the same page.
> > > 
> > > Paul, Joel what are your opinions?
> > 
> > I strongly prefer that it be removed from the series.  I do understand
> > that this is a bit more hassle right now, but this does help avoid
> > confusion in the future, plus perhaps also avoiding issues with future
> > bisections.
> > 
> We have already decided to get rid of it, I mean the small allocations
> (dynamic rcu_head attaching). I will exclude it from the next
> patch-set version.

Very good, and thank you!!!

							Thanx, Paul

Patch

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 51726e4c3b4d..501cac02146d 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3072,15 +3072,31 @@  static void kfree_rcu_work(struct work_struct *work)
 	 */
 	for (; head; head = next) {
 		unsigned long offset = (unsigned long)head->func;
-		void *ptr = (void *)head - offset;
+		bool headless;
+		void *ptr;
 
 		next = head->next;
+
+		/* We tag the headless object, if so adjust offset. */
+		headless = (((unsigned long) head - offset) & BIT(0));
+		if (headless)
+			offset -= 1;
+
+		ptr = (void *) head - offset;
+
 		debug_rcu_head_unqueue((struct rcu_head *)ptr);
 		rcu_lock_acquire(&rcu_callback_map);
 		trace_rcu_invoke_kvfree_callback(rcu_state.name, head, offset);
 
-		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset)))
+		if (!WARN_ON_ONCE(!__is_kvfree_rcu_offset(offset))) {
+			/*
+			 * If headless free the back-pointer first.
+			 */
+			if (headless)
+				kvfree((void *) *((unsigned long *) ptr));
+
 			kvfree(ptr);
+		}
 
 		rcu_lock_release(&rcu_callback_map);
 		cond_resched_tasks_rcu_qs();
@@ -3221,6 +3237,13 @@  kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
 			if (IS_ENABLED(CONFIG_PREEMPT_RT))
 				return false;
 
+			/*
+			 * TODO: For one argument of kvfree_rcu() we can
+			 * drop the lock and get the page in sleepable
+			 * context. That would allow to maintain an array
+			 * for the CONFIG_PREEMPT_RT as well. Thus we could
+			 * get rid of dynamic rcu_head attaching code.
+			 */
 			bnode = (struct kvfree_rcu_bulk_data *)
 				__get_free_page(GFP_NOWAIT | __GFP_NOWARN);
 		}
@@ -3244,6 +3267,23 @@  kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
 	return true;
 }
 
+static inline struct rcu_head *
+attach_rcu_head_to_object(void *obj)
+{
+	unsigned long *ptr;
+
+	ptr = kmalloc(sizeof(unsigned long *) +
+			sizeof(struct rcu_head), GFP_NOWAIT |
+				__GFP_RECLAIM |	/* can do direct reclaim. */
+				__GFP_NORETRY |	/* only lightweight one.  */
+				__GFP_NOWARN);	/* no failure reports. */
+	if (!ptr)
+		return NULL;
+
+	ptr[0] = (unsigned long) obj;
+	return ((struct rcu_head *) ++ptr);
+}
+
 /*
  * Queue a request for lazy invocation of appropriate free routine after a
  * grace period. Please note there are three paths are maintained, two are the
@@ -3260,16 +3300,34 @@  void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
 	unsigned long flags;
 	struct kfree_rcu_cpu *krcp;
+	bool success;
 	void *ptr;
 
+	if (head) {
+		ptr = (void *) head - (unsigned long) func;
+	} else {
+		/*
+		 * Please note there is a limitation for the head-less
+		 * variant, that is why there is a clear rule for such
+		 * objects:
+		 *
+		 * it can be used from might_sleep() context only. For
+		 * other places please embed an rcu_head to your data.
+		 */
+		might_sleep();
+		ptr = (unsigned long *) func;
+	}
+
 	krcp = krc_this_cpu_lock(&flags);
-	ptr = (void *)head - (unsigned long)func;
 
 	/* Queue the object but don't yet schedule the batch. */
 	if (debug_rcu_head_queue(ptr)) {
 		/* Probable double kfree_rcu(), just leak. */
 		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
 			  __func__, head);
+
+		/* Mark as success and leave. */
+		success = true;
 		goto unlock_return;
 	}
 
@@ -3277,10 +3335,34 @@  void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 	 * Under high memory pressure GFP_NOWAIT can fail,
 	 * in that case the emergency path is maintained.
 	 */
-	if (unlikely(!kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr))) {
+	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
+	if (!success) {
+		if (head == NULL) {
+			/*
+			 * Headless(one argument kvfree_rcu()) can sleep.
+			 * Drop the lock and tack it back. So it can do
+			 * direct lightweight reclaim.
+			 */
+			krc_this_cpu_unlock(krcp, flags);
+			head = attach_rcu_head_to_object(ptr);
+			krcp = krc_this_cpu_lock(&flags);
+
+			if (head == NULL)
+				goto unlock_return;
+
+			/*
+			 * Tag the headless object. Such objects have a
+			 * back-pointer to the original allocated memory,
+			 * that has to be freed as well as dynamically
+			 * attached wrapper/head.
+			 */
+			func = (rcu_callback_t) (sizeof(unsigned long *) + 1);
+		}
+
 		head->func = func;
 		head->next = krcp->head;
 		krcp->head = head;
+		success = true;
 	}
 
 	WRITE_ONCE(krcp->count, krcp->count + 1);
@@ -3294,6 +3376,18 @@  void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 
 unlock_return:
 	krc_this_cpu_unlock(krcp, flags);
+
+	/*
+	 * High memory pressure, so inline kvfree() after
+	 * synchronize_rcu(). We can do it from might_sleep()
+	 * context only, so the current CPU can pass the QS
+	 * state.
+	 */
+	if (!success) {
+		debug_rcu_head_unqueue(ptr);
+		synchronize_rcu();
+		kvfree(ptr);
+	}
 }
 EXPORT_SYMBOL_GPL(kvfree_call_rcu);
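
For reference, the caller-visible difference between the two forms this
series introduces (struct foo and both callers are hypothetical):

struct foo {
	int data;
	struct rcu_head rcu;	/* only the two-argument form needs this */
};

void example(struct foo *fp, char *vbuf)
{
	/*
	 * Two-argument form: usable from any context; the embedded
	 * rcu_head means no allocation is ever required.
	 */
	kvfree_rcu(fp, rcu);

	/*
	 * Single-argument (head-less) form: sleepable context only,
	 * because the fallback path may call synchronize_rcu(). Works
	 * for both SLAB and vmalloc pointers.
	 */
	kvfree_rcu(vbuf);
}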