diff mbox series

[v2,1/8] rcu: Introduce call_rcu_lazy() API implementation

Message ID 20220622225102.2112026-3-joel@joelfernandes.org (mailing list archive)
State New, archived
Headers show
Series [v2,1/8] rcu: Introduce call_rcu_lazy() API implementation | expand

Commit Message

Joel Fernandes June 22, 2022, 10:50 p.m. UTC
Implement timer-based RCU lazy callback batching. The batch is flushed
whenever a certain amount of time has passed, or the batch on a
particular CPU grows too big. Also memory pressure will flush it in a
future patch.

To handle several corner cases automagically (such as rcu_barrier() and
hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
length has the lazy CB length included in it. A separate lazy CB length
counter is also introduced to keep track of the number of lazy CBs.

Suggested-by: Paul McKenney <paulmck@kernel.org>
Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
---
 include/linux/rcu_segcblist.h |   1 +
 include/linux/rcupdate.h      |   6 ++
 kernel/rcu/Kconfig            |   8 +++
 kernel/rcu/rcu_segcblist.c    |  19 ++++++
 kernel/rcu/rcu_segcblist.h    |  14 ++++
 kernel/rcu/tree.c             |  24 +++++--
 kernel/rcu/tree.h             |  10 +--
 kernel/rcu/tree_nocb.h        | 125 +++++++++++++++++++++++++---------
 8 files changed, 164 insertions(+), 43 deletions(-)

Comments

Joel Fernandes June 22, 2022, 11:18 p.m. UTC | #1
On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
[..]
> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> index 2ccf5845957d..fec4fad6654b 100644
> --- a/kernel/rcu/tree.h
> +++ b/kernel/rcu/tree.h
> @@ -267,8 +267,9 @@ struct rcu_data {
>  /* Values for nocb_defer_wakeup field in struct rcu_data. */
>  #define RCU_NOCB_WAKE_NOT	0
>  #define RCU_NOCB_WAKE_BYPASS	1
> -#define RCU_NOCB_WAKE		2
> -#define RCU_NOCB_WAKE_FORCE	3
> +#define RCU_NOCB_WAKE_LAZY	2
> +#define RCU_NOCB_WAKE		3
> +#define RCU_NOCB_WAKE_FORCE	4
>  
>  #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
>  					/* For jiffies_till_first_fqs and */
> @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
>  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
>  static void rcu_init_one_nocb(struct rcu_node *rnp);
>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				  unsigned long j);
> +				  unsigned long j, bool lazy);
>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				bool *was_alldone, unsigned long flags);
> +				bool *was_alldone, unsigned long flags,
> +				bool lazy);
>  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
>  				 unsigned long flags);
>  static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> index e369efe94fda..b9244f22e102 100644
> --- a/kernel/rcu/tree_nocb.h
> +++ b/kernel/rcu/tree_nocb.h
> @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
>  	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
>  }
>  
> +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> +
>  /*
>   * Arrange to wake the GP kthread for this NOCB group at some future
>   * time when it is safe to do so.
> @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>  	 * Bypass wakeup overrides previous deferments. In case
>  	 * of callback storm, no need to wake up too early.
>  	 */
> -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
> +	if (waketype == RCU_NOCB_WAKE_LAZY) {
> +		mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES);
> +		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> +	} else if (waketype == RCU_NOCB_WAKE_BYPASS) {
>  		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
>  		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>  	} else {
> @@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>   * Note that this function always returns true if rhp is NULL.
>   */
>  static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				     unsigned long j)
> +				     unsigned long j, bool lazy)
>  {
>  	struct rcu_cblist rcl;
>  
> @@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>  	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
>  	if (rhp)
>  		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> -	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
> +
> +	trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len));

Before anyone yells at me, that trace_printk() has been politely asked to take
a walk :-). It got mad at me, but on the next iteration, it won't be there.

thanks,

 - Joel
kernel test robot June 23, 2022, 1:38 a.m. UTC | #2
Hi Joel,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on linus/master]
[also build test WARNING on v5.19-rc3 next-20220622]
[cannot apply to paulmck-rcu/dev]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting a patch, we suggest using '--base' as documented in
https://git-scm.com/docs/git-format-patch]

url:    https://github.com/intel-lab-lkp/linux/commits/Joel-Fernandes-Google/Implement-call_rcu_lazy-and-miscellaneous-fixes/20220623-065447
base:   https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git de5c208d533a46a074eb46ea17f672cc005a7269
config: riscv-nommu_k210_defconfig (https://download.01.org/0day-ci/archive/20220623/202206230916.2YtpR3sO-lkp@intel.com/config)
compiler: riscv64-linux-gcc (GCC) 11.3.0
reproduce (this is a W=1 build):
        wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
        chmod +x ~/bin/make.cross
        # https://github.com/intel-lab-lkp/linux/commit/543ee31928d1cff057320ff64603283a34fe0052
        git remote add linux-review https://github.com/intel-lab-lkp/linux
        git fetch --no-tags linux-review Joel-Fernandes-Google/Implement-call_rcu_lazy-and-miscellaneous-fixes/20220623-065447
        git checkout 543ee31928d1cff057320ff64603283a34fe0052
        # save the config file
        mkdir build_dir && cp config build_dir/.config
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=gcc-11.3.0 make.cross W=1 O=build_dir ARCH=riscv SHELL=/bin/bash kernel/rcu/

If you fix the issue, kindly add the following tag where applicable
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>):

   kernel/rcu/tree.c:3103: warning: Function parameter or member 'lazy' not described in '__call_rcu_common'
>> kernel/rcu/tree.c:3103: warning: expecting prototype for call_rcu(). Prototype was for __call_rcu_common() instead


vim +3103 kernel/rcu/tree.c

b2b00ddf193bf83 kernel/rcu/tree.c Paul E. McKenney        2019-10-30  3060  
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3061  /**
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3062   * call_rcu() - Queue an RCU callback for invocation after a grace period.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3063   * @head: structure to be used for queueing the RCU updates.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3064   * @func: actual callback function to be invoked after the grace period
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3065   *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3066   * The callback function will be invoked some time after a full grace
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3067   * period elapses, in other words after all pre-existing RCU read-side
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3068   * critical sections have completed.  However, the callback function
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3069   * might well execute concurrently with RCU read-side critical sections
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3070   * that started after call_rcu() was invoked.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3071   *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3072   * RCU read-side critical sections are delimited by rcu_read_lock()
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3073   * and rcu_read_unlock(), and may be nested.  In addition, but only in
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3074   * v5.0 and later, regions of code across which interrupts, preemption,
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3075   * or softirqs have been disabled also serve as RCU read-side critical
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3076   * sections.  This includes hardware interrupt handlers, softirq handlers,
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3077   * and NMI handlers.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3078   *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3079   * Note that all CPUs must agree that the grace period extended beyond
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3080   * all pre-existing RCU read-side critical section.  On systems with more
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3081   * than one CPU, this means that when "func()" is invoked, each CPU is
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3082   * guaranteed to have executed a full memory barrier since the end of its
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3083   * last RCU read-side critical section whose beginning preceded the call
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3084   * to call_rcu().  It also means that each CPU executing an RCU read-side
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3085   * critical section that continues beyond the start of "func()" must have
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3086   * executed a memory barrier after the call_rcu() but before the beginning
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3087   * of that RCU read-side critical section.  Note that these guarantees
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3088   * include CPUs that are offline, idle, or executing in user mode, as
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3089   * well as CPUs that are executing in the kernel.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3090   *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3091   * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3092   * resulting RCU callback function "func()", then both CPU A and CPU B are
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3093   * guaranteed to execute a full memory barrier during the time interval
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3094   * between the call to call_rcu() and the invocation of "func()" -- even
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3095   * if CPU A and CPU B are the same CPU (but again only if the system has
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3096   * more than one CPU).
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3097   *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3098   * Implementation of these memory-ordering guarantees is described here:
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3099   * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3100   */
543ee31928d1cff kernel/rcu/tree.c Joel Fernandes (Google  2022-06-22  3101) static void
543ee31928d1cff kernel/rcu/tree.c Joel Fernandes (Google  2022-06-22  3102) __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18 @3103  {
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney        2020-12-08  3104  	static atomic_t doublefrees;
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3105  	unsigned long flags;
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3106  	struct rcu_data *rdp;
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3107  	bool was_alldone;
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3108  
b8f2ed538477d9a kernel/rcu/tree.c Paul E. McKenney        2016-08-23  3109  	/* Misaligned rcu_head! */
b8f2ed538477d9a kernel/rcu/tree.c Paul E. McKenney        2016-08-23  3110  	WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
b8f2ed538477d9a kernel/rcu/tree.c Paul E. McKenney        2016-08-23  3111  
ae15018456c44b7 kernel/rcutree.c  Paul E. McKenney        2013-04-23  3112  	if (debug_rcu_head_queue(head)) {
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney        2017-05-03  3113  		/*
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney        2017-05-03  3114  		 * Probable double call_rcu(), so leak the callback.
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney        2017-05-03  3115  		 * Use rcu:rcu_callback trace event to find the previous
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney        2021-12-18  3116  		 * time callback was passed to call_rcu().
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney        2017-05-03  3117  		 */
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney        2020-12-08  3118  		if (atomic_inc_return(&doublefrees) < 4) {
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney        2020-12-08  3119  			pr_err("%s(): Double-freed CB %p->%pS()!!!  ", __func__, head, head->func);
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney        2020-12-08  3120  			mem_dump_obj(head);
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney        2020-12-08  3121  		}
7d0ae8086b82831 kernel/rcu/tree.c Paul E. McKenney        2015-03-03  3122  		WRITE_ONCE(head->func, rcu_leak_callback);
ae15018456c44b7 kernel/rcutree.c  Paul E. McKenney        2013-04-23  3123  		return;
ae15018456c44b7 kernel/rcutree.c  Paul E. McKenney        2013-04-23  3124  	}
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3125  	head->func = func;
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3126  	head->next = NULL;
300c0c5e721834f kernel/rcu/tree.c Jun Miao                2021-11-16  3127  	kasan_record_aux_stack_noalloc(head);
d818cc76e2b4d5f kernel/rcu/tree.c Zqiang                  2021-12-26  3128  	local_irq_save(flags);
da1df50d16171f4 kernel/rcu/tree.c Paul E. McKenney        2018-07-03  3129  	rdp = this_cpu_ptr(&rcu_data);
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3130  
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3131  	/* Add the callback to our list. */
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3132  	if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3133  		// This can trigger due to call_rcu() from offline CPU:
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3134  		WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
34404ca8fb252cc kernel/rcu/tree.c Paul E. McKenney        2015-01-19  3135  		WARN_ON_ONCE(!rcu_is_watching());
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3136  		// Very early boot, before rcu_init().  Initialize if needed
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3137  		// and then drop through to queue the callback.
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney        2017-02-08  3138  		if (rcu_segcblist_empty(&rdp->cblist))
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney        2017-02-08  3139  			rcu_segcblist_init(&rdp->cblist);
143da9c2fc030a5 kernel/rcu/tree.c Paul E. McKenney        2015-01-19  3140  	}
77a40f97030b27b kernel/rcu/tree.c Joel Fernandes (Google  2019-08-30  3141) 
b2b00ddf193bf83 kernel/rcu/tree.c Paul E. McKenney        2019-10-30  3142  	check_cb_ovld(rdp);
543ee31928d1cff kernel/rcu/tree.c Joel Fernandes (Google  2022-06-22  3143) 	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
d1b222c6be1f8bf kernel/rcu/tree.c Paul E. McKenney        2019-07-02  3144  		return; // Enqueued onto ->nocb_bypass, so just leave.
b692dc4adfcff54 kernel/rcu/tree.c Paul E. McKenney        2020-02-11  3145  	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
77a40f97030b27b kernel/rcu/tree.c Joel Fernandes (Google  2019-08-30  3146) 	rcu_segcblist_enqueue(&rdp->cblist, head);
c408b215f58f715 kernel/rcu/tree.c Uladzislau Rezki (Sony  2020-05-25  3147) 	if (__is_kvfree_rcu_offset((unsigned long)func))
c408b215f58f715 kernel/rcu/tree.c Uladzislau Rezki (Sony  2020-05-25  3148) 		trace_rcu_kvfree_callback(rcu_state.name, head,
3c779dfef2c4524 kernel/rcu/tree.c Paul E. McKenney        2018-07-05  3149  					 (unsigned long)func,
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney        2017-02-08  3150  					 rcu_segcblist_n_cbs(&rdp->cblist));
d4c08f2ac311a36 kernel/rcutree.c  Paul E. McKenney        2011-06-25  3151  	else
3c779dfef2c4524 kernel/rcu/tree.c Paul E. McKenney        2018-07-05  3152  		trace_rcu_callback(rcu_state.name, head,
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney        2017-02-08  3153  				   rcu_segcblist_n_cbs(&rdp->cblist));
d4c08f2ac311a36 kernel/rcutree.c  Paul E. McKenney        2011-06-25  3154  
3afe7fa535491ec kernel/rcu/tree.c Joel Fernandes (Google  2020-11-14  3155) 	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
3afe7fa535491ec kernel/rcu/tree.c Joel Fernandes (Google  2020-11-14  3156) 
29154c57e35a191 kernel/rcutree.c  Paul E. McKenney        2012-05-30  3157  	/* Go handle any RCU core processing required. */
3820b513a2e33d6 kernel/rcu/tree.c Frederic Weisbecker     2020-11-12  3158  	if (unlikely(rcu_rdp_is_offloaded(rdp))) {
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3159  		__call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3160  	} else {
5c7d89676bc5196 kernel/rcu/tree.c Paul E. McKenney        2018-07-03  3161  		__call_rcu_core(rdp, head, flags);
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3162  		local_irq_restore(flags);
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3163  	}
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney        2019-05-15  3164  }
64db4cfff99c04c kernel/rcutree.c  Paul E. McKenney        2008-12-18  3165
Paul E. McKenney June 26, 2022, 4 a.m. UTC | #3
On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> Implement timer-based RCU lazy callback batching. The batch is flushed
> whenever a certain amount of time has passed, or the batch on a
> particular CPU grows too big. Also memory pressure will flush it in a
> future patch.
> 
> To handle several corner cases automagically (such as rcu_barrier() and
> hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
> length has the lazy CB length included in it. A separate lazy CB length
> counter is also introduced to keep track of the number of lazy CBs.
> 
> Suggested-by: Paul McKenney <paulmck@kernel.org>
> Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>

Not bad, but some questions and comments below.

							Thanx, Paul

> ---
>  include/linux/rcu_segcblist.h |   1 +
>  include/linux/rcupdate.h      |   6 ++
>  kernel/rcu/Kconfig            |   8 +++
>  kernel/rcu/rcu_segcblist.c    |  19 ++++++
>  kernel/rcu/rcu_segcblist.h    |  14 ++++
>  kernel/rcu/tree.c             |  24 +++++--
>  kernel/rcu/tree.h             |  10 +--
>  kernel/rcu/tree_nocb.h        | 125 +++++++++++++++++++++++++---------
>  8 files changed, 164 insertions(+), 43 deletions(-)
> 
> diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
> index 659d13a7ddaa..9a992707917b 100644
> --- a/include/linux/rcu_segcblist.h
> +++ b/include/linux/rcu_segcblist.h
> @@ -22,6 +22,7 @@ struct rcu_cblist {
>  	struct rcu_head *head;
>  	struct rcu_head **tail;
>  	long len;
> +	long lazy_len;
>  };
>  
>  #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> index 1a32036c918c..9191a3d88087 100644
> --- a/include/linux/rcupdate.h
> +++ b/include/linux/rcupdate.h
> @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void)
>  
>  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
>  
> +#ifdef CONFIG_RCU_LAZY
> +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
> +#else
> +#define call_rcu_lazy(head, func) call_rcu(head, func)
> +#endif
> +
>  /* Internal to kernel */
>  void rcu_init(void);
>  extern int rcu_scheduler_active;
> diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
> index 27aab870ae4c..0bffa992fdc4 100644
> --- a/kernel/rcu/Kconfig
> +++ b/kernel/rcu/Kconfig
> @@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB
>  	  Say N here if you hate read-side memory barriers.
>  	  Take the default if you are unsure.
>  
> +config RCU_LAZY
> +	bool "RCU callback lazy invocation functionality"
> +	depends on RCU_NOCB_CPU
> +	default n
> +	help
> +	  To save power, batch RCU callbacks and flush after delay, memory
> +          pressure or callback list growing too big.

Spaces vs. tabs.

The checkpatch warning is unhelpful ("please write a help paragraph that
fully describes the config symbol"), but please fix the whitespace if
you have not already done so.

>  endmenu # "RCU Subsystem"
> diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> index c54ea2b6a36b..627a3218a372 100644
> --- a/kernel/rcu/rcu_segcblist.c
> +++ b/kernel/rcu/rcu_segcblist.c
> @@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp)
>  	rclp->head = NULL;
>  	rclp->tail = &rclp->head;
>  	rclp->len = 0;
> +	rclp->lazy_len = 0;
>  }
>  
>  /*
> @@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
>  	WRITE_ONCE(rclp->len, rclp->len + 1);
>  }
>  
> +/*
> + * Enqueue an rcu_head structure onto the specified callback list.

Please also note the fact that it is enqueuing lazily.

> + */
> +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp)
> +{
> +	rcu_cblist_enqueue(rclp, rhp);
> +	WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);

Except...  Why not just add a "lazy" parameter to rcu_cblist_enqueue()?
IS_ENABLED() can make it fast.

> +}
> +
>  /*
>   * Flush the second rcu_cblist structure onto the first one, obliterating
>   * any contents of the first.  If rhp is non-NULL, enqueue it as the sole
> @@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
>  	}
>  }

Header comment, please.  It can be short, referring to that of the
function rcu_cblist_flush_enqueue().

> +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> +			      struct rcu_cblist *srclp,
> +			      struct rcu_head *rhp)

Please line up the "struct" keywords.  (Picky, I know...)

> +{
> +	rcu_cblist_flush_enqueue(drclp, srclp, rhp);
> +	if (rhp)
> +		WRITE_ONCE(srclp->lazy_len, 1);

Shouldn't this instead be a lazy argument to rcu_cblist_flush_enqueue()?
Concerns about speed in the !RCU_LAZY case can be addressed using
IS_ENABLED(), for example:

	if (IS_ENABLED(CONFIG_RCU_LAZY) && rhp)
		WRITE_ONCE(srclp->lazy_len, 1);

> +}
> +
>  /*
>   * Dequeue the oldest rcu_head structure from the specified callback
>   * list.
> diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> index 431cee212467..c3d7de65b689 100644
> --- a/kernel/rcu/rcu_segcblist.h
> +++ b/kernel/rcu/rcu_segcblist.h
> @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
>  	return READ_ONCE(rclp->len);
>  }
>  
> +/* Return number of callbacks in the specified callback list. */
> +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
> +{
> +#ifdef CONFIG_RCU_LAZY
> +	return READ_ONCE(rclp->lazy_len);
> +#else
> +	return 0;
> +#endif

Please use IS_ENABLED().  This saves a line (and lots of characters)
but compiles just as efficiently.

> +}
> +
>  /* Return number of callbacks in segmented callback list by summing seglen. */
>  long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
>  
>  void rcu_cblist_init(struct rcu_cblist *rclp);
>  void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
> +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp);
>  void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
>  			      struct rcu_cblist *srclp,
>  			      struct rcu_head *rhp);
> +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> +			      struct rcu_cblist *srclp,
> +			      struct rcu_head *rhp);

Please line up the "struct" keywords.  (Still picky, I know...)

> +{
> +	rcu_cblist_flush_enqueue(drclp, srclp, rhp);
> +	if (rhp)
> +		WRITE_ONCE(srclp->lazy_len, 1);
> +}
> +
>  /*
>   * Dequeue the oldest rcu_head structure from the specified callback
>   * list.
> diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> index 431cee212467..c3d7de65b689 100644
> --- a/kernel/rcu/rcu_segcblist.h
> +++ b/kernel/rcu/rcu_segcblist.h
> @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
>  	return READ_ONCE(rclp->len);
>  }
>  
> +/* Return number of callbacks in the specified callback list. */
> +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
> +{
> +#ifdef CONFIG_RCU_LAZY
> +	return READ_ONCE(rclp->lazy_len);
> +#else
> +	return 0;
> +#endif

Please use IS_ENABLED().  This saves a line (and lots of characters)
but compiles just as efficiently.

> +}
> +
>  /* Return number of callbacks in segmented callback list by summing seglen. */
>  long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
>  
>  void rcu_cblist_init(struct rcu_cblist *rclp);
>  void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
> +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp);
>  void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
>  			      struct rcu_cblist *srclp,
>  			      struct rcu_head *rhp);
> +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> +			      struct rcu_cblist *srclp,
> +			      struct rcu_head *rhp);

Please line up the "struct" keywords.  (Even more pickiness, I know...)

>  struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
>  
>  /*
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index c25ba442044a..d2e3d6e176d2 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
>   * Implementation of these memory-ordering guarantees is described here:
>   * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
>   */

The above docbook comment needs to move to call_rcu().

> -void call_rcu(struct rcu_head *head, rcu_callback_t func)
> +static void
> +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
>  {
>  	static atomic_t doublefrees;
>  	unsigned long flags;
> @@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>  	}
>  
>  	check_cb_ovld(rdp);
> -	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
> +	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
>  		return; // Enqueued onto ->nocb_bypass, so just leave.
>  	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
>  	rcu_segcblist_enqueue(&rdp->cblist, head);
> @@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>  		local_irq_restore(flags);
>  	}
>  }
> -EXPORT_SYMBOL_GPL(call_rcu);

Please add a docbook comment for call_rcu_lazy().  It can be brief, for
example, by referring to call_rcu()'s docbook comment for memory-ordering
details.

> +#ifdef CONFIG_RCU_LAZY
> +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
> +{
> +	return __call_rcu_common(head, func, true);
> +}
> +EXPORT_SYMBOL_GPL(call_rcu_lazy);
> +#endif
> +
> +void call_rcu(struct rcu_head *head, rcu_callback_t func)
> +{
> +	return __call_rcu_common(head, func, false);
> +
> +}
> +EXPORT_SYMBOL_GPL(call_rcu);
>  
>  /* Maximum number of jiffies to wait before draining a batch. */
>  #define KFREE_DRAIN_JIFFIES (HZ / 50)
> @@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
>  	rdp->barrier_head.func = rcu_barrier_callback;
>  	debug_rcu_head_queue(&rdp->barrier_head);
>  	rcu_nocb_lock(rdp);
> -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
> +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
>  	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
>  		atomic_inc(&rcu_state.barrier_cpu_count);
>  	} else {
> @@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu)
>  	my_rdp = this_cpu_ptr(&rcu_data);
>  	my_rnp = my_rdp->mynode;
>  	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
> -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
> +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
>  	raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
>  	/* Leverage recent GPs and set GP for new callbacks. */
>  	needwake = rcu_advance_cbs(my_rnp, rdp) ||
> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> index 2ccf5845957d..fec4fad6654b 100644
> --- a/kernel/rcu/tree.h
> +++ b/kernel/rcu/tree.h
> @@ -267,8 +267,9 @@ struct rcu_data {
>  /* Values for nocb_defer_wakeup field in struct rcu_data. */
>  #define RCU_NOCB_WAKE_NOT	0
>  #define RCU_NOCB_WAKE_BYPASS	1
> -#define RCU_NOCB_WAKE		2
> -#define RCU_NOCB_WAKE_FORCE	3
> +#define RCU_NOCB_WAKE_LAZY	2
> +#define RCU_NOCB_WAKE		3
> +#define RCU_NOCB_WAKE_FORCE	4
>  
>  #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
>  					/* For jiffies_till_first_fqs and */
> @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
>  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
>  static void rcu_init_one_nocb(struct rcu_node *rnp);
>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				  unsigned long j);
> +				  unsigned long j, bool lazy);
>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				bool *was_alldone, unsigned long flags);
> +				bool *was_alldone, unsigned long flags,
> +				bool lazy);
>  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
>  				 unsigned long flags);
>  static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> index e369efe94fda..b9244f22e102 100644
> --- a/kernel/rcu/tree_nocb.h
> +++ b/kernel/rcu/tree_nocb.h
> @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
>  	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
>  }

Comment on LAZY_FLUSH_JIFFIES purpose in life, please!  (At some point
more flexibility may be required, but let's not unnecessarily rush
into that.)

> +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> +
>  /*
>   * Arrange to wake the GP kthread for this NOCB group at some future
>   * time when it is safe to do so.
> @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>  	 * Bypass wakeup overrides previous deferments. In case
>  	 * of callback storm, no need to wake up too early.
>  	 */
> -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
> +	if (waketype == RCU_NOCB_WAKE_LAZY) {

Presumably we get here only if all of this CPU's callbacks are lazy?

> +		mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES);
> +		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> +	} else if (waketype == RCU_NOCB_WAKE_BYPASS) {
>  		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
>  		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>  	} else {
> @@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>   * Note that this function always returns true if rhp is NULL.
>   */
>  static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				     unsigned long j)
> +				     unsigned long j, bool lazy)
>  {
>  	struct rcu_cblist rcl;
>  
> @@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>  	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
>  	if (rhp)
>  		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> -	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
> +
> +	trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len));

This debug code needs to go, of course.  If you would like, you could
replace it with a trace event.

> +	/* The lazy CBs are being flushed, but a new one might be enqueued. */
> +	if (lazy)
> +		rcu_cblist_flush_enqueue_lazy(&rcl, &rdp->nocb_bypass, rhp);
> +	else
> +		rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);

Shouldn't these be a single function with a "lazy" argument, as noted
earlier?

>  	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
>  	WRITE_ONCE(rdp->nocb_bypass_first, j);
>  	rcu_nocb_bypass_unlock(rdp);
> @@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>   * Note that this function always returns true if rhp is NULL.
>   */
>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				  unsigned long j)
> +				  unsigned long j, bool lazy)
>  {
>  	if (!rcu_rdp_is_offloaded(rdp))
>  		return true;
>  	rcu_lockdep_assert_cblist_protected(rdp);
>  	rcu_nocb_bypass_lock(rdp);
> -	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
> +	return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
>  }
>  
>  /*
> @@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
>  	if (!rcu_rdp_is_offloaded(rdp) ||
>  	    !rcu_nocb_bypass_trylock(rdp))
>  		return;
> -	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
> +	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
>  }
>  
>  /*
> @@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
>   * there is only one CPU in operation.
>   */
>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				bool *was_alldone, unsigned long flags)
> +				bool *was_alldone, unsigned long flags,
> +				bool lazy)
>  {
>  	unsigned long c;
>  	unsigned long cur_gp_seq;
>  	unsigned long j = jiffies;
>  	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> +	long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
>  
>  	lockdep_assert_irqs_disabled();
>  
> @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>  	}
>  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
>  
> -	// If there hasn't yet been all that many ->cblist enqueues
> -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> -	// ->nocb_bypass first.
> -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> +	// If caller passed a non-lazy CB and there hasn't yet been all that
> +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> +	// number of CBs (lazy + non-lazy) grows too much.
> +	//
> +	// Note that if the bypass list has lazy CBs, and the main list is
> +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> +	// the lazy CBs to the main list as well. That's the right thing to do,
> +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> +	// one, we can just reuse that GP for the already queued-up lazy ones.
> +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> +	    (lazy && n_lazy_cbs >= qhimark)) {
>  		rcu_nocb_lock(rdp);
>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>  		if (*was_alldone)
>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> -					    TPS("FirstQ"));
> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));

The "false" here instead of "lazy" is because the caller is to do the
enqueuing, correct?

>  		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>  		return false; // Caller must enqueue the callback.
>  	}
>  
>  	// If ->nocb_bypass has been used too long or is too full,
>  	// flush ->nocb_bypass to ->cblist.
> -	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
> -	    ncbs >= qhimark) {
> +	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
>  		rcu_nocb_lock(rdp);
> -		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
> +		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {

But shouldn't this "true" be "lazy"?  I don't see how we are guaranteed
that the callback is in fact lazy at this point in the code.  Also,
there is not yet a guarantee that the caller will do the enqueuing.
So what am I missing?

>  			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>  			if (*was_alldone)
>  				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> -						    TPS("FirstQ"));
> +						    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
>  			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>  			return false; // Caller must enqueue the callback.
>  		}
> @@ -455,12 +475,20 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>  	rcu_nocb_wait_contended(rdp);
>  	rcu_nocb_bypass_lock(rdp);
>  	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> +	n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
>  	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> -	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
> +	if (lazy)
> +		rcu_cblist_enqueue_lazy(&rdp->nocb_bypass, rhp);
> +	else
> +		rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);

And this is one reason to add the "lazy" parameter to rcu_cblist_enqueue().

>  	if (!ncbs) {
>  		WRITE_ONCE(rdp->nocb_bypass_first, j);
> -		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
> +		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> +				    lazy ? TPS("FirstLazyBQ") : TPS("FirstBQ"));
> +	} else if (!n_lazy_cbs && lazy) {
> +		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstLazyBQ"));

But in this case, there already are callbacks.  In that case, how does it
help to keep track of the laziness of this callback?

In fact, aren't the only meaningful states "empty", "all lazy", and
"at least one non-lazy callback"?  After all, if you have at least one
non-lazy callback, it needs to be business as usual, correct?

>  	}
> +
>  	rcu_nocb_bypass_unlock(rdp);
>  	smp_mb(); /* Order enqueue before wake. */
>  	if (ncbs) {
> @@ -493,7 +521,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
>  {
>  	unsigned long cur_gp_seq;
>  	unsigned long j;
> -	long len;
> +	long len, lazy_len, bypass_len;
>  	struct task_struct *t;
>  
>  	// If we are being polled or there is no kthread, just leave.
> @@ -506,9 +534,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
>  	}
>  	// Need to actually to a wakeup.
>  	len = rcu_segcblist_n_cbs(&rdp->cblist);
> +	bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> +	lazy_len = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
>  	if (was_alldone) {
>  		rdp->qlen_last_fqs_check = len;
> -		if (!irqs_disabled_flags(flags)) {
> +		// Only lazy CBs in bypass list
> +		if (lazy_len && bypass_len == lazy_len) {

And this is one piece of evidence -- you are only checking for all
callbacks being lazy.  So do you really need to count them, as
opposed to noting that all are lazy on the one hand or some are
non-lazy on the other?

> +			rcu_nocb_unlock_irqrestore(rdp, flags);
> +			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
> +					   TPS("WakeLazy"));
> +		} else if (!irqs_disabled_flags(flags)) {
>  			/* ... if queue was empty ... */
>  			rcu_nocb_unlock_irqrestore(rdp, flags);
>  			wake_nocb_gp(rdp, false);
> @@ -599,8 +634,8 @@ static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
>   */
>  static void nocb_gp_wait(struct rcu_data *my_rdp)
>  {
> -	bool bypass = false;
> -	long bypass_ncbs;
> +	bool bypass = false, lazy = false;
> +	long bypass_ncbs, lazy_ncbs;
>  	int __maybe_unused cpu = my_rdp->cpu;
>  	unsigned long cur_gp_seq;
>  	unsigned long flags;
> @@ -648,12 +683,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>  			continue;
>  		}
>  		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> -		if (bypass_ncbs &&
> +		lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
> +		if (lazy_ncbs &&

This one works either way.  The current approach works because the
timeout is longer.  If you have exceeded the lazy timeout, you don't
care that there are non-lazy callbacks queued.

> +		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + LAZY_FLUSH_JIFFIES) ||
> +		     bypass_ncbs > qhimark)) {
> +			// Bypass full or old, so flush it.
> +			(void)rcu_nocb_try_flush_bypass(rdp, j);
> +			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> +			lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
> +		} else if (bypass_ncbs &&
>  		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
>  		     bypass_ncbs > 2 * qhimark)) {
>  			// Bypass full or old, so flush it.
>  			(void)rcu_nocb_try_flush_bypass(rdp, j);
>  			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> +			lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);

And these two "if" bodies are identical, correct?  Can they be merged?

>  		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
>  			rcu_nocb_unlock_irqrestore(rdp, flags);
>  			if (needwake_state)
> @@ -662,8 +706,11 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>  		}
>  		if (bypass_ncbs) {
>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> -					    TPS("Bypass"));
> -			bypass = true;
> +				    bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
> +			if (bypass_ncbs == lazy_ncbs)
> +				lazy = true;
> +			else
> +				bypass = true;

Another place where a lazy flag rather than count would suit.

>  		}
>  		rnp = rdp->mynode;
>  
> @@ -713,12 +760,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>  	my_rdp->nocb_gp_gp = needwait_gp;
>  	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
>  
> -	if (bypass && !rcu_nocb_poll) {
> -		// At least one child with non-empty ->nocb_bypass, so set
> -		// timer in order to avoid stranding its callbacks.
> -		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
> -				   TPS("WakeBypassIsDeferred"));
> +	// At least one child with non-empty ->nocb_bypass, so set
> +	// timer in order to avoid stranding its callbacks.
> +	if (!rcu_nocb_poll) {
> +		// If bypass list only has lazy CBs. Add a deferred
> +		// lazy wake up.
> +		if (lazy && !bypass) {
> +			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
> +					TPS("WakeLazyIsDeferred"));
> +		// Otherwise add a deferred bypass wake up.
> +		} else if (bypass) {
> +			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
> +					TPS("WakeBypassIsDeferred"));
> +		}
>  	}
> +
>  	if (rcu_nocb_poll) {
>  		/* Polling, so trace if first poll in the series. */
>  		if (gotcbs)
> @@ -999,7 +1055,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
>  	 * return false, which means that future calls to rcu_nocb_try_bypass()
>  	 * will refuse to put anything into the bypass.
>  	 */
> -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
> +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
>  	/*
>  	 * Start with invoking rcu_core() early. This way if the current thread
>  	 * happens to preempt an ongoing call to rcu_core() in the middle,
> @@ -1500,13 +1556,14 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
>  }
>  
>  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				  unsigned long j)
> +				  unsigned long j, bool lazy)
>  {
>  	return true;
>  }
>  
>  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				bool *was_alldone, unsigned long flags)
> +				bool *was_alldone, unsigned long flags,
> +				bool lazy)
>  {
>  	return false;
>  }
> -- 
> 2.37.0.rc0.104.g0611611a94-goog
>
Paul E. McKenney June 26, 2022, 4 a.m. UTC | #4
On Wed, Jun 22, 2022 at 11:18:02PM +0000, Joel Fernandes wrote:
> On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> [..]
> > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> > index 2ccf5845957d..fec4fad6654b 100644
> > --- a/kernel/rcu/tree.h
> > +++ b/kernel/rcu/tree.h
> > @@ -267,8 +267,9 @@ struct rcu_data {
> >  /* Values for nocb_defer_wakeup field in struct rcu_data. */
> >  #define RCU_NOCB_WAKE_NOT	0
> >  #define RCU_NOCB_WAKE_BYPASS	1
> > -#define RCU_NOCB_WAKE		2
> > -#define RCU_NOCB_WAKE_FORCE	3
> > +#define RCU_NOCB_WAKE_LAZY	2
> > +#define RCU_NOCB_WAKE		3
> > +#define RCU_NOCB_WAKE_FORCE	4
> >  
> >  #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
> >  					/* For jiffies_till_first_fqs and */
> > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> >  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> >  static void rcu_init_one_nocb(struct rcu_node *rnp);
> >  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				  unsigned long j);
> > +				  unsigned long j, bool lazy);
> >  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				bool *was_alldone, unsigned long flags);
> > +				bool *was_alldone, unsigned long flags,
> > +				bool lazy);
> >  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
> >  				 unsigned long flags);
> >  static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> > index e369efe94fda..b9244f22e102 100644
> > --- a/kernel/rcu/tree_nocb.h
> > +++ b/kernel/rcu/tree_nocb.h
> > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
> >  	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
> >  }
> >  
> > +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> > +
> >  /*
> >   * Arrange to wake the GP kthread for this NOCB group at some future
> >   * time when it is safe to do so.
> > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> >  	 * Bypass wakeup overrides previous deferments. In case
> >  	 * of callback storm, no need to wake up too early.
> >  	 */
> > -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
> > +	if (waketype == RCU_NOCB_WAKE_LAZY) {
> > +		mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES);
> > +		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> > +	} else if (waketype == RCU_NOCB_WAKE_BYPASS) {
> >  		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
> >  		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> >  	} else {
> > @@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> >   * Note that this function always returns true if rhp is NULL.
> >   */
> >  static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				     unsigned long j)
> > +				     unsigned long j, bool lazy)
> >  {
> >  	struct rcu_cblist rcl;
> >  
> > @@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >  	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
> >  	if (rhp)
> >  		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> > -	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
> > +
> > +	trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len));
> 
> Before anyone yells at me, that trace_printk() has been politely asked to take
> a walk :-). It got mad at me, but on the next iteration, it wont be there.

;-) ;-) ;-)

							Thanx, Paul
Frederic Weisbecker June 29, 2022, 11:53 a.m. UTC | #5
On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>  	}
>  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
>  
> -	// If there hasn't yet been all that many ->cblist enqueues
> -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> -	// ->nocb_bypass first.
> -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> +	// If caller passed a non-lazy CB and there hasn't yet been all that
> +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> +	// number of CBs (lazy + non-lazy) grows too much.
> +	//
> +	// Note that if the bypass list has lazy CBs, and the main list is
> +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> +	// the lazy CBs to the main list as well. That's the right thing to do,
> +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> +	// one, we can just reuse that GP for the already queued-up lazy ones.
> +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> +	    (lazy && n_lazy_cbs >= qhimark)) {
>  		rcu_nocb_lock(rdp);
>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>  		if (*was_alldone)
>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> -					    TPS("FirstQ"));
> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));

That's outside the scope of this patchset but this makes me realize we
unconditionally try to flush the bypass from call_rcu() fastpath, and
therefore we unconditionally lock the bypass lock from call_rcu() fastpath.

It shouldn't be contended at this stage since we are holding the nocb_lock
already, and only the local CPU can hold the nocb_bypass_lock without holding
the nocb_lock. But still...

It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass))
before doing anything. Only the local CPU can enqueue to the bypass list.

Adding that to my TODO list...
Paul E. McKenney June 29, 2022, 5:05 p.m. UTC | #6
On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote:
> On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >  	}
> >  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
> >  
> > -	// If there hasn't yet been all that many ->cblist enqueues
> > -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> > -	// ->nocb_bypass first.
> > -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> > +	// If caller passed a non-lazy CB and there hasn't yet been all that
> > +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> > +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> > +	// number of CBs (lazy + non-lazy) grows too much.
> > +	//
> > +	// Note that if the bypass list has lazy CBs, and the main list is
> > +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> > +	// the lazy CBs to the main list as well. That's the right thing to do,
> > +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> > +	// one, we can just reuse that GP for the already queued-up lazy ones.
> > +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> > +	    (lazy && n_lazy_cbs >= qhimark)) {
> >  		rcu_nocb_lock(rdp);
> >  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >  		if (*was_alldone)
> >  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> > -					    TPS("FirstQ"));
> > -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> > +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> > +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> 
> That's outside the scope of this patchset but this makes me realize we
> unconditionally try to flush the bypass from call_rcu() fastpath, and
> therefore we unconditionally lock the bypass lock from call_rcu() fastpath.
> 
> It shouldn't be contended at this stage since we are holding the nocb_lock
> already, and only the local CPU can hold the nocb_bypass_lock without holding
> the nocb_lock. But still...
> 
> It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass))
> before doing anything. Only the local CPU can enqueue to the bypass list.
> 
> Adding that to my TODO list...

That does sound like a potentially very helpful approach!  As always,
please analyze and test carefully!

							Thanx, Paul
Joel Fernandes June 29, 2022, 8:29 p.m. UTC | #7
On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote:
> On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >  	}
> >  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
> >  
> > -	// If there hasn't yet been all that many ->cblist enqueues
> > -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> > -	// ->nocb_bypass first.
> > -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> > +	// If caller passed a non-lazy CB and there hasn't yet been all that
> > +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> > +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> > +	// number of CBs (lazy + non-lazy) grows too much.
> > +	//
> > +	// Note that if the bypass list has lazy CBs, and the main list is
> > +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> > +	// the lazy CBs to the main list as well. That's the right thing to do,
> > +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> > +	// one, we can just reuse that GP for the already queued-up lazy ones.
> > +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> > +	    (lazy && n_lazy_cbs >= qhimark)) {
> >  		rcu_nocb_lock(rdp);
> >  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >  		if (*was_alldone)
> >  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> > -					    TPS("FirstQ"));
> > -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> > +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> > +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> 
> That's outside the scope of this patchset but this makes me realize we
> unconditionally try to flush the bypass from call_rcu() fastpath, and
> therefore we unconditionally lock the bypass lock from call_rcu() fastpath.
> 
> It shouldn't be contended at this stage since we are holding the nocb_lock
> already, and only the local CPU can hold the nocb_bypass_lock without holding
> the nocb_lock. But still...
> 
> It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass))
> before doing anything. Only the local CPU can enqueue to the bypass list.
> 
> Adding that to my TODO list...
> 

I am afraid I did not understand your comment. The bypass list lock is held
once we have decided to use the bypass list to queue something on to it.

The bypass flushing is also conditional on either the bypass cblist growing
too big or a jiffy elapsing since the first bypass queue.

So in both cases, acquiring the lock is conditional. What do you mean it is
unconditionally acquiring the bypass lock? Where?

Thanks!

 - Joel
Frederic Weisbecker June 29, 2022, 10:01 p.m. UTC | #8
On Wed, Jun 29, 2022 at 08:29:48PM +0000, Joel Fernandes wrote:
> On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote:
> > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > >  	}
> > >  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
> > >  
> > > -	// If there hasn't yet been all that many ->cblist enqueues
> > > -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> > > -	// ->nocb_bypass first.
> > > -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> > > +	// If caller passed a non-lazy CB and there hasn't yet been all that
> > > +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> > > +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> > > +	// number of CBs (lazy + non-lazy) grows too much.
> > > +	//
> > > +	// Note that if the bypass list has lazy CBs, and the main list is
> > > +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> > > +	// the lazy CBs to the main list as well. That's the right thing to do,
> > > +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> > > +	// one, we can just reuse that GP for the already queued-up lazy ones.
> > > +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> > > +	    (lazy && n_lazy_cbs >= qhimark)) {
> > >  		rcu_nocb_lock(rdp);
> > >  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> > >  		if (*was_alldone)
> > >  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> > > -					    TPS("FirstQ"));
> > > -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> > > +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> > > +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> > 
> > That's outside the scope of this patchset but this makes me realize we
> > unconditionally try to flush the bypass from call_rcu() fastpath, and
> > therefore we unconditionally lock the bypass lock from call_rcu() fastpath.
> > 
> > It shouldn't be contended at this stage since we are holding the nocb_lock
> > already, and only the local CPU can hold the nocb_bypass_lock without holding
> > the nocb_lock. But still...
> > 
> > It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass))
> > before doing anything. Only the local CPU can enqueue to the bypass list.
> > 
> > Adding that to my TODO list...
> > 
> 
> I am afraid I did not understand your comment. The bypass list lock is held
> once we have decided to use the bypass list to queue something on to it.
> 
> The bypass flushing is also conditional on either the bypass cblist growing
> too big or a jiffie elapsing since the first bypass queue.
> 
> So in both cases, acquiring the lock is conditional. What do you mean it is
> unconditionally acquiring the bypass lock? Where?

Just to make sure we are talking about the same thing, I'm referring to this
path:

	// If there hasn't yet been all that many ->cblist enqueues
	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
	// ->nocb_bypass first.
	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
		rcu_nocb_lock(rdp);
		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
		if (*was_alldone)
			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
					    TPS("FirstQ"));
		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
		return false; // Caller must enqueue the callback.
	}

This is called whenever we decide not to queue to the bypass list because
there is no flooding detected (rdp->nocb_nobypass_count hasn't reached
nocb_nobypass_lim_per_jiffy for the current jiffy). I call this the fast path
because this is what I would expect in a normal load, as opposed to callbacks
flooding.

And in this fastpath, the above rcu_nocb_flush_bypass() is unconditional.

> 
> Thanks!
> 
>  - Joel
>
Joel Fernandes June 30, 2022, 2:08 p.m. UTC | #9
On Thu, Jun 30, 2022 at 12:01:14AM +0200, Frederic Weisbecker wrote:
> On Wed, Jun 29, 2022 at 08:29:48PM +0000, Joel Fernandes wrote:
> > On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote:
> > > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> > > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > > >  	}
> > > >  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
> > > >  
> > > > -	// If there hasn't yet been all that many ->cblist enqueues
> > > > -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> > > > -	// ->nocb_bypass first.
> > > > -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> > > > +	// If caller passed a non-lazy CB and there hasn't yet been all that
> > > > +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> > > > +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> > > > +	// number of CBs (lazy + non-lazy) grows too much.
> > > > +	//
> > > > +	// Note that if the bypass list has lazy CBs, and the main list is
> > > > +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> > > > +	// the lazy CBs to the main list as well. That's the right thing to do,
> > > > +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> > > > +	// one, we can just reuse that GP for the already queued-up lazy ones.
> > > > +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> > > > +	    (lazy && n_lazy_cbs >= qhimark)) {
> > > >  		rcu_nocb_lock(rdp);
> > > >  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> > > >  		if (*was_alldone)
> > > >  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> > > > -					    TPS("FirstQ"));
> > > > -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> > > > +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> > > > +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> > > 
> > > That's outside the scope of this patchset but this makes me realize we
> > > unconditionally try to flush the bypass from call_rcu() fastpath, and
> > > therefore we unconditionally lock the bypass lock from call_rcu() fastpath.
> > > 
> > > It shouldn't be contended at this stage since we are holding the nocb_lock
> > > already, and only the local CPU can hold the nocb_bypass_lock without holding
> > > the nocb_lock. But still...
> > > 
> > > It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass))
> > > before doing anything. Only the local CPU can enqueue to the bypass list.
> > > 
> > > Adding that to my TODO list...
> > > 
> > 
> > I am afraid I did not understand your comment. The bypass list lock is held
> > once we have decided to use the bypass list to queue something on to it.
> > 
> > The bypass flushing is also conditional on either the bypass cblist growing
> > too big or a jiffie elapsing since the first bypass queue.
> > 
> > So in both cases, acquiring the lock is conditional. What do you mean it is
> > unconditionally acquiring the bypass lock? Where?
> 
> Just to make sure we are talking about the same thing, I'm referring to this
> path:
> 
> 	// If there hasn't yet been all that many ->cblist enqueues
> 	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> 	// ->nocb_bypass first.
> 	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> 		rcu_nocb_lock(rdp);
> 		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> 		if (*was_alldone)
> 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> 					    TPS("FirstQ"));
> 		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> 		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> 		return false; // Caller must enqueue the callback.
> 	}
> 
> This is called whenever we decide not to queue to the bypass list because
> there is no flooding detected (rdp->nocb_nobypass_count hasn't reached
> nocb_nobypass_lim_per_jiffy for the current jiffy). I call this the fast path
> because this is what I would expect in a normal load, as opposed to callbacks
> flooding.
> 
> And in this fastpath, the above rcu_nocb_flush_bypass() is unconditional.

Sorry you are right, I see that now.

Another reason for why the contention is probably not a big deal (other than
the nocb lock being held), is that all other callers of the flush appear to
be in slow paths except for this one. Unless someone is offloading/deoffloading
rapidly or something :)

thanks,

 - Joel
Joel Fernandes July 8, 2022, 6:43 p.m. UTC | #10
On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote:
> On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> > Implement timer-based RCU lazy callback batching. The batch is flushed
> > whenever a certain amount of time has passed, or the batch on a
> > particular CPU grows too big. Also memory pressure will flush it in a
> > future patch.
> > 
> > To handle several corner cases automagically (such as rcu_barrier() and
> > hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
> > length has the lazy CB length included in it. A separate lazy CB length
> > counter is also introduced to keep track of the number of lazy CBs.
> > 
> > Suggested-by: Paul McKenney <paulmck@kernel.org>
> > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> 
> Not bad, but some questions and comments below.

Thanks a lot for these, real helpful and I replied below:

> > diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
> > index 659d13a7ddaa..9a992707917b 100644
> > --- a/include/linux/rcu_segcblist.h
> > +++ b/include/linux/rcu_segcblist.h
> > @@ -22,6 +22,7 @@ struct rcu_cblist {
> >  	struct rcu_head *head;
> >  	struct rcu_head **tail;
> >  	long len;
> > +	long lazy_len;
> >  };
> >  
> >  #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
> > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> > index 1a32036c918c..9191a3d88087 100644
> > --- a/include/linux/rcupdate.h
> > +++ b/include/linux/rcupdate.h
> > @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void)
> >  
> >  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
> >  
> > +#ifdef CONFIG_RCU_LAZY
> > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
> > +#else
> > +#define call_rcu_lazy(head, func) call_rcu(head, func)
> > +#endif
> > +
> >  /* Internal to kernel */
> >  void rcu_init(void);
> >  extern int rcu_scheduler_active;
> > diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
> > index 27aab870ae4c..0bffa992fdc4 100644
> > --- a/kernel/rcu/Kconfig
> > +++ b/kernel/rcu/Kconfig
> > @@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB
> >  	  Say N here if you hate read-side memory barriers.
> >  	  Take the default if you are unsure.
> >  
> > +config RCU_LAZY
> > +	bool "RCU callback lazy invocation functionality"
> > +	depends on RCU_NOCB_CPU
> > +	default n
> > +	help
> > +	  To save power, batch RCU callbacks and flush after delay, memory
> > +          pressure or callback list growing too big.
> 
> Spaces vs. tabs.

Fixed, thanks.

> The checkpatch warning is unhelpful ("please write a help paragraph that
> fully describes the config symbol")

Good old checkpatch :D

> >  endmenu # "RCU Subsystem"
> > diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> > index c54ea2b6a36b..627a3218a372 100644
> > --- a/kernel/rcu/rcu_segcblist.c
> > +++ b/kernel/rcu/rcu_segcblist.c
> > @@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp)
> >  	rclp->head = NULL;
> >  	rclp->tail = &rclp->head;
> >  	rclp->len = 0;
> > +	rclp->lazy_len = 0;
> >  }
> >  
> >  /*
> > @@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
> >  	WRITE_ONCE(rclp->len, rclp->len + 1);
> >  }
> >  
> > +/*
> > + * Enqueue an rcu_head structure onto the specified callback list.
> 
> Please also note the fact that it is enqueuing lazily.

Sorry, done.

> > + */
> > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp)
> > +{
> > +	rcu_cblist_enqueue(rclp, rhp);
> > +	WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);
> 
> Except...  Why not just add a "lazy" parameter to rcu_cblist_enqueue()?
> IS_ENABLED() can make it fast.

Yeah good idea, it simplifies the code too. Thank you!

So you mean I should add in this function so that the branch gets optimized:
if (lazy && IS_ENABLED(CONFIG_RCU_LAZY)) {
  ...
}

That makes total sense considering the compiler may otherwise not be able to
optimize the function viewing just the individual translation unit. I fixed
it.

The 6 month old baby and wife are calling my attention now. I will continue
to reply to the other parts of this and other emails this evening and thanks
for your help!

thanks,

 - Joel
Paul E. McKenney July 8, 2022, 11:10 p.m. UTC | #11
On Fri, Jul 08, 2022 at 06:43:21PM +0000, Joel Fernandes wrote:
> On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote:
> > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
> > > Implement timer-based RCU lazy callback batching. The batch is flushed
> > > whenever a certain amount of time has passed, or the batch on a
> > > particular CPU grows too big. Also memory pressure will flush it in a
> > > future patch.
> > > 
> > > To handle several corner cases automagically (such as rcu_barrier() and
> > > hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
> > > length has the lazy CB length included in it. A separate lazy CB length
> > > counter is also introduced to keep track of the number of lazy CBs.
> > > 
> > > Suggested-by: Paul McKenney <paulmck@kernel.org>
> > > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
> > 
> > Not bad, but some questions and comments below.
> 
> Thanks a lot for these, real helpful and I replied below:
> 
> > > diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
> > > index 659d13a7ddaa..9a992707917b 100644
> > > --- a/include/linux/rcu_segcblist.h
> > > +++ b/include/linux/rcu_segcblist.h
> > > @@ -22,6 +22,7 @@ struct rcu_cblist {
> > >  	struct rcu_head *head;
> > >  	struct rcu_head **tail;
> > >  	long len;
> > > +	long lazy_len;
> > >  };
> > >  
> > >  #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
> > > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> > > index 1a32036c918c..9191a3d88087 100644
> > > --- a/include/linux/rcupdate.h
> > > +++ b/include/linux/rcupdate.h
> > > @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void)
> > >  
> > >  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
> > >  
> > > +#ifdef CONFIG_RCU_LAZY
> > > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
> > > +#else
> > > +#define call_rcu_lazy(head, func) call_rcu(head, func)
> > > +#endif
> > > +
> > >  /* Internal to kernel */
> > >  void rcu_init(void);
> > >  extern int rcu_scheduler_active;
> > > diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
> > > index 27aab870ae4c..0bffa992fdc4 100644
> > > --- a/kernel/rcu/Kconfig
> > > +++ b/kernel/rcu/Kconfig
> > > @@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB
> > >  	  Say N here if you hate read-side memory barriers.
> > >  	  Take the default if you are unsure.
> > >  
> > > +config RCU_LAZY
> > > +	bool "RCU callback lazy invocation functionality"
> > > +	depends on RCU_NOCB_CPU
> > > +	default n
> > > +	help
> > > +	  To save power, batch RCU callbacks and flush after delay, memory
> > > +          pressure or callback list growing too big.
> > 
> > Spaces vs. tabs.
> 
> Fixed, thanks.
> 
> > The checkpatch warning is unhelpful ("please write a help paragraph that
> > fully describes the config symbol")
> 
> Good old checkpatch :D

;-) ;-) ;-)

> > >  endmenu # "RCU Subsystem"
> > > diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> > > index c54ea2b6a36b..627a3218a372 100644
> > > --- a/kernel/rcu/rcu_segcblist.c
> > > +++ b/kernel/rcu/rcu_segcblist.c
> > > @@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp)
> > >  	rclp->head = NULL;
> > >  	rclp->tail = &rclp->head;
> > >  	rclp->len = 0;
> > > +	rclp->lazy_len = 0;
> > >  }
> > >  
> > >  /*
> > > @@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
> > >  	WRITE_ONCE(rclp->len, rclp->len + 1);
> > >  }
> > >  
> > > +/*
> > > + * Enqueue an rcu_head structure onto the specified callback list.
> > 
> > Please also note the fact that it is enqueuing lazily.
> 
> Sorry, done.
> 
> > > + */
> > > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp)
> > > +{
> > > +	rcu_cblist_enqueue(rclp, rhp);
> > > +	WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);
> > 
> > Except...  Why not just add a "lazy" parameter to rcu_cblist_enqueue()?
> > IS_ENABLED() can make it fast.
> 
> Yeah good idea, it simplifies the code too. Thank you!
> 
> So you mean I should add in this function so that the branch gets optimized:
> if (lazy && IS_ENABLED(CONFIG_RCU_LAZY)) {
>   ...
> }
> 
> That makes total sense considering the compiler may otherwise not be able to
> optimize the function viewing just the individual translation unit. I fixed
> it.

Or the other way around:

	if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy) {

Just in case the compiler is stumbling over its boolean logic.  Or in
case the human reader is.  ;-)

> The 6 month old baby and wife are calling my attention now. I will continue
> to reply to the other parts of this and other emails this evening and thanks
> for your help!

Ah, for those who believe that SIGCHLD can be ignored in real life!  ;-)

							Thanx, Paul
Joel Fernandes July 10, 2022, 2:26 a.m. UTC | #12
I replied some more and will reply more soon, it's a really long thread and
thank you for the detailed review :)

On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote:
[..]
> > +}
> > +
> >  /*
> >   * Flush the second rcu_cblist structure onto the first one, obliterating
> >   * any contents of the first.  If rhp is non-NULL, enqueue it as the sole
> > @@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
> >  	}
> >  }
> 
> Header comment, please.  It can be short, referring to that of the
> function rcu_cblist_flush_enqueue().

Done, what I ended up doing is nuking the new function and doing the same
IS_ENABLED() trick to the existing rcu_cblist_flush_enqueue(). diffstat is
also happy!

> > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> > +			      struct rcu_cblist *srclp,
> > +			      struct rcu_head *rhp)
> 
> Please line up the "struct" keywords.  (Picky, I know...)
> 
> > +{
> > +	rcu_cblist_flush_enqueue(drclp, srclp, rhp);
> > +	if (rhp)
> > +		WRITE_ONCE(srclp->lazy_len, 1);
> 
> Shouldn't this instead be a lazy argument to rcu_cblist_flush_enqueue()?
> Concerns about speed in the !RCU_LAZY case can be addressed using
> IS_ENABLED(), for example:
> 
> 	if (IS_ENABLED(CONFIG_RCU_LAZY) && rhp)
> 		WRITE_ONCE(srclp->lazy_len, 1);

Ah indeed exactly what I ended up doing.

> > +}
> > +
> >  /*
> >   * Dequeue the oldest rcu_head structure from the specified callback
> >   * list.
> > diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> > index 431cee212467..c3d7de65b689 100644
> > --- a/kernel/rcu/rcu_segcblist.h
> > +++ b/kernel/rcu/rcu_segcblist.h
> > @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
> >  	return READ_ONCE(rclp->len);
> >  }
> >  
> > +/* Return number of callbacks in the specified callback list. */
> > +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
> > +{
> > +#ifdef CONFIG_RCU_LAZY
> > +	return READ_ONCE(rclp->lazy_len);
> > +#else
> > +	return 0;
> > +#endif
> 
> Please use IS_ENABLED().  This saves a line (and lots of characters)
> but compiles just as efficiently.

Sounds good, looks a lot better, thanks!

It ends up looking like:

static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
{
        if (IS_ENABLED(CONFIG_RCU_LAZY))
                return READ_ONCE(rclp->lazy_len);
        return 0;
}

static inline void rcu_cblist_reset_lazy_len(struct rcu_cblist *rclp)
{
        if (IS_ENABLED(CONFIG_RCU_LAZY))
                WRITE_ONCE(rclp->lazy_len, 0);
}

> > +}
> > +
> >  /* Return number of callbacks in segmented callback list by summing seglen. */
> >  long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
> >  
> >  void rcu_cblist_init(struct rcu_cblist *rclp);
> >  void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
> > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp);
> >  void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
> >  			      struct rcu_cblist *srclp,
> >  			      struct rcu_head *rhp);
> > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> > +			      struct rcu_cblist *srclp,
> > +			      struct rcu_head *rhp);
> 
> Please line up the "struct" keywords.  (Still picky, I know...)

Nuked it due to new lazy parameter so no issue now.
 
> >  struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
> >  
> >  /*
> > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > index c25ba442044a..d2e3d6e176d2 100644
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
> >   * Implementation of these memory-ordering guarantees is described here:
> >   * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
> >   */
> 
> The above docbook comment needs to move to call_rcu().

Ok sure.

> > -void call_rcu(struct rcu_head *head, rcu_callback_t func)
> > +static void
> > +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
> >  {
> >  	static atomic_t doublefrees;
> >  	unsigned long flags;
> > @@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  	}
> >  
> >  	check_cb_ovld(rdp);
> > -	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
> > +	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
> >  		return; // Enqueued onto ->nocb_bypass, so just leave.
> >  	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
> >  	rcu_segcblist_enqueue(&rdp->cblist, head);
> > @@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  		local_irq_restore(flags);
> >  	}
> >  }
> > -EXPORT_SYMBOL_GPL(call_rcu);
> 
> Please add a docbook comment for call_rcu_lazy().  It can be brief, for
> example, by referring to call_rcu()'s docbook comment for memory-ordering
> details.

I added something like the following, hope it looks OK:

#ifdef CONFIG_RCU_LAZY
/**
 * call_rcu_lazy() - Lazily queue RCU callback for invocation after grace period.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual callback function to be invoked after the grace period
 *
 * The callback function will be invoked some time after a full grace
 * period elapses, in other words after all pre-existing RCU read-side
 * critical sections have completed.
 *
 * Use this API instead of call_rcu() if you don't mind the callback being
 * invoked after very long periods of time on systems without memory pressure
 * and on systems which are lightly loaded or mostly idle.
 *
 * Other than the extra delay in callbacks being invoked, this function is
 * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more
 * details about memory ordering and other functionality.
 */
void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
{
        return __call_rcu_common(head, func, true);
}
EXPORT_SYMBOL_GPL(call_rcu_lazy);
#endif

> 
> > +#ifdef CONFIG_RCU_LAZY
> > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
> > +{
> > +	return __call_rcu_common(head, func, true);
> > +}
> > +EXPORT_SYMBOL_GPL(call_rcu_lazy);
> > +#endif
> > +
> > +void call_rcu(struct rcu_head *head, rcu_callback_t func)
> > +{
> > +	return __call_rcu_common(head, func, false);
> > +
> > +}
> > +EXPORT_SYMBOL_GPL(call_rcu);
> >  
> >  /* Maximum number of jiffies to wait before draining a batch. */
> >  #define KFREE_DRAIN_JIFFIES (HZ / 50)
> > @@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
> >  	rdp->barrier_head.func = rcu_barrier_callback;
> >  	debug_rcu_head_queue(&rdp->barrier_head);
> >  	rcu_nocb_lock(rdp);
> > -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
> > +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
> >  	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
> >  		atomic_inc(&rcu_state.barrier_cpu_count);
> >  	} else {
> > @@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu)
> >  	my_rdp = this_cpu_ptr(&rcu_data);
> >  	my_rnp = my_rdp->mynode;
> >  	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
> > -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
> > +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
> >  	raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
> >  	/* Leverage recent GPs and set GP for new callbacks. */
> >  	needwake = rcu_advance_cbs(my_rnp, rdp) ||
> > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> > index 2ccf5845957d..fec4fad6654b 100644
> > --- a/kernel/rcu/tree.h
> > +++ b/kernel/rcu/tree.h
> > @@ -267,8 +267,9 @@ struct rcu_data {
> >  /* Values for nocb_defer_wakeup field in struct rcu_data. */
> >  #define RCU_NOCB_WAKE_NOT	0
> >  #define RCU_NOCB_WAKE_BYPASS	1
> > -#define RCU_NOCB_WAKE		2
> > -#define RCU_NOCB_WAKE_FORCE	3
> > +#define RCU_NOCB_WAKE_LAZY	2
> > +#define RCU_NOCB_WAKE		3
> > +#define RCU_NOCB_WAKE_FORCE	4
> >  
> >  #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
> >  					/* For jiffies_till_first_fqs and */
> > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> >  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> >  static void rcu_init_one_nocb(struct rcu_node *rnp);
> >  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				  unsigned long j);
> > +				  unsigned long j, bool lazy);
> >  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				bool *was_alldone, unsigned long flags);
> > +				bool *was_alldone, unsigned long flags,
> > +				bool lazy);
> >  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
> >  				 unsigned long flags);
> >  static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> > index e369efe94fda..b9244f22e102 100644
> > --- a/kernel/rcu/tree_nocb.h
> > +++ b/kernel/rcu/tree_nocb.h
> > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
> >  	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
> >  }
> 
> Comment on LAZY_FLUSH_JIFFIES purpose in life, please!  (At some point
> more flexibility may be required, but let's not unnecessarily rush
> into that.)

I added this:
/*
 * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
 * can elapse before lazy callbacks are flushed. Lazy callbacks
 * could be flushed much earlier for a number of other reasons
 * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
 * left unsubmitted to RCU after that many jiffies.
 */

> > +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> > +
> >  /*
> >   * Arrange to wake the GP kthread for this NOCB group at some future
> >   * time when it is safe to do so.
> > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> >  	 * Bypass wakeup overrides previous deferments. In case
> >  	 * of callback storm, no need to wake up too early.
> >  	 */
> > -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
> > +	if (waketype == RCU_NOCB_WAKE_LAZY) {
> 
> Presumably we get here only if all of this CPU's callbacks are lazy?

Yes that's right.

> >  	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
> >  	WRITE_ONCE(rdp->nocb_bypass_first, j);
> >  	rcu_nocb_bypass_unlock(rdp);
> > @@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >   * Note that this function always returns true if rhp is NULL.
> >   */
> >  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				  unsigned long j)
> > +				  unsigned long j, bool lazy)
> >  {
> >  	if (!rcu_rdp_is_offloaded(rdp))
> >  		return true;
> >  	rcu_lockdep_assert_cblist_protected(rdp);
> >  	rcu_nocb_bypass_lock(rdp);
> > -	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
> > +	return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
> >  }
> >  
> >  /*
> > @@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
> >  	if (!rcu_rdp_is_offloaded(rdp) ||
> >  	    !rcu_nocb_bypass_trylock(rdp))
> >  		return;
> > -	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
> > +	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
> >  }
> >  
> >  /*
> > @@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
> >   * there is only one CPU in operation.
> >   */
> >  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > -				bool *was_alldone, unsigned long flags)
> > +				bool *was_alldone, unsigned long flags,
> > +				bool lazy)
> >  {
> >  	unsigned long c;
> >  	unsigned long cur_gp_seq;
> >  	unsigned long j = jiffies;
> >  	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> > +	long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
> >  
> >  	lockdep_assert_irqs_disabled();
> >  
> > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> >  	}
> >  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
> >  
> > -	// If there hasn't yet been all that many ->cblist enqueues
> > -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> > -	// ->nocb_bypass first.
> > -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> > +	// If caller passed a non-lazy CB and there hasn't yet been all that
> > +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> > +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> > +	// number of CBs (lazy + non-lazy) grows too much.
> > +	//
> > +	// Note that if the bypass list has lazy CBs, and the main list is
> > +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> > +	// the lazy CBs to the main list as well. That's the right thing to do,
> > +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> > +	// one, we can just reuse that GP for the already queued-up lazy ones.
> > +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> > +	    (lazy && n_lazy_cbs >= qhimark)) {
> >  		rcu_nocb_lock(rdp);
> >  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >  		if (*was_alldone)
> >  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> > -					    TPS("FirstQ"));
> > -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> > +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> > +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> 
> The "false" here instead of "lazy" is because the caller is to do the
> enqueuing, correct?

There is no difference between using false or lazy here, because the bypass
flush is not also enqueuing the lazy callback, right?

We can also pass lazy instead of false if that's less confusing.

Or maybe I missed the issue you're raising?

> >  		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> >  		return false; // Caller must enqueue the callback.
> >  	}
> >  
> >  	// If ->nocb_bypass has been used too long or is too full,
> >  	// flush ->nocb_bypass to ->cblist.
> > -	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
> > -	    ncbs >= qhimark) {
> > +	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
> >  		rcu_nocb_lock(rdp);
> > -		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
> > +		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {
> 
> But shouldn't this "true" be "lazy"?  I don't see how we are guaranteed
> that the callback is in fact lazy at this point in the code.  Also,
> there is not yet a guarantee that the caller will do the enqueuing.
> So what am I missing?

Sorry, I screwed this part up. I think I meant 'false' here: if the list grew
too big, then I would prefer that the new lazy CB be treated as
non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What
do you think?

Will reply more to the rest of the comments soon, thanks!

thanks,

 - Joel
Paul E. McKenney July 10, 2022, 4:03 p.m. UTC | #13
On Sun, Jul 10, 2022 at 02:26:51AM +0000, Joel Fernandes wrote:
> I replied some more and will reply more soon, it's a really long thread and
> thank you for the detailed review :)
> 
> On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote:
> [..]
> > > +}
> > > +
> > >  /*
> > >   * Flush the second rcu_cblist structure onto the first one, obliterating
> > >   * any contents of the first.  If rhp is non-NULL, enqueue it as the sole
> > > @@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
> > >  	}
> > >  }
> > 
> > Header comment, please.  It can be short, referring to that of the
> > function rcu_cblist_flush_enqueue().
> 
> Done, what I ended up doing is nuking the new function and doing the same
> IS_ENABLED() trick to the existing rcu_cblist_flush_enqueue(). diffstat is
> also happy!
> 
> > > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> > > +			      struct rcu_cblist *srclp,
> > > +			      struct rcu_head *rhp)
> > 
> > Please line up the "struct" keywords.  (Picky, I know...)
> > 
> > > +{
> > > +	rcu_cblist_flush_enqueue(drclp, srclp, rhp);
> > > +	if (rhp)
> > > +		WRITE_ONCE(srclp->lazy_len, 1);
> > 
> > Shouldn't this instead be a lazy argument to rcu_cblist_flush_enqueue()?
> > Concerns about speed in the !RCU_LAZY case can be addressed using
> > IS_ENABLED(), for example:
> > 
> > 	if (IS_ENABLED(CONFIG_RCU_LAZY) && rhp)
> > 		WRITE_ONCE(srclp->lazy_len, 1);
> 
> Ah indeed exactly what I ended up doing.
> 
> > > +}
> > > +
> > >  /*
> > >   * Dequeue the oldest rcu_head structure from the specified callback
> > >   * list.
> > > diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> > > index 431cee212467..c3d7de65b689 100644
> > > --- a/kernel/rcu/rcu_segcblist.h
> > > +++ b/kernel/rcu/rcu_segcblist.h
> > > @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
> > >  	return READ_ONCE(rclp->len);
> > >  }
> > >  
> > > +/* Return number of callbacks in the specified callback list. */
> > > +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
> > > +{
> > > +#ifdef CONFIG_RCU_LAZY
> > > +	return READ_ONCE(rclp->lazy_len);
> > > +#else
> > > +	return 0;
> > > +#endif
> > 
> > Please use IS_ENABLED().  This saves a line (and lots of characters)
> > but compiles just as efficiently.
> 
> Sounds good, looks a lot better, thanks!
> 
> It ends up looking like:
> 
> static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
> {
>         if (IS_ENABLED(CONFIG_RCU_LAZY))
>                 return READ_ONCE(rclp->lazy_len);
>         return 0;
> }
> 
> static inline void rcu_cblist_reset_lazy_len(struct rcu_cblist *rclp)
> {
>         if (IS_ENABLED(CONFIG_RCU_LAZY))
>                 WRITE_ONCE(rclp->lazy_len, 0);
> }

All much better, thank you!

> > > +}
> > > +
> > >  /* Return number of callbacks in segmented callback list by summing seglen. */
> > >  long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
> > >  
> > >  void rcu_cblist_init(struct rcu_cblist *rclp);
> > >  void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
> > > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp);
> > >  void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
> > >  			      struct rcu_cblist *srclp,
> > >  			      struct rcu_head *rhp);
> > > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
> > > +			      struct rcu_cblist *srclp,
> > > +			      struct rcu_head *rhp);
> > 
> > Please line up the "struct" keywords.  (Still picky, I know...)
> 
> Nuked it due to new lazy parameter so no issue now.
>  
> > >  struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
> > >  
> > >  /*
> > > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> > > index c25ba442044a..d2e3d6e176d2 100644
> > > --- a/kernel/rcu/tree.c
> > > +++ b/kernel/rcu/tree.c
> > > @@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
> > >   * Implementation of these memory-ordering guarantees is described here:
> > >   * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
> > >   */
> > 
> > The above docbook comment needs to move to call_rcu().
> 
> Ok sure.
> 
> > > -void call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > +static void
> > > +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
> > >  {
> > >  	static atomic_t doublefrees;
> > >  	unsigned long flags;
> > > @@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> > >  	}
> > >  
> > >  	check_cb_ovld(rdp);
> > > -	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
> > > +	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
> > >  		return; // Enqueued onto ->nocb_bypass, so just leave.
> > >  	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
> > >  	rcu_segcblist_enqueue(&rdp->cblist, head);
> > > @@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
> > >  		local_irq_restore(flags);
> > >  	}
> > >  }
> > > -EXPORT_SYMBOL_GPL(call_rcu);
> > 
> > Please add a docbook comment for call_rcu_lazy().  It can be brief, for
> > example, by referring to call_rcu()'s docbook comment for memory-ordering
> > details.
> 
> I added something like the following, hope it looks OK:
> 
> #ifdef CONFIG_RCU_LAZY
> /**
>  * call_rcu_lazy() - Lazily queue RCU callback for invocation after grace period.
>  * @head: structure to be used for queueing the RCU updates.
>  * @func: actual callback function to be invoked after the grace period
>  *
>  * The callback function will be invoked some time after a full grace
>  * period elapses, in other words after all pre-existing RCU read-side
>  * critical sections have completed.
>  *
>  * Use this API instead of call_rcu() if you don't mind the callback being
>  * invoked after very long periods of time on systems without memory pressure
>  * and on systems which are lightly loaded or mostly idle.
>  *
>  * Other than the extra delay in callbacks being invoked, this function is
>  * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more
>  * details about memory ordering and other functionality.

Much better, thank you!

>  */
> void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
> {
>         return __call_rcu_common(head, func, true);
> }
> EXPORT_SYMBOL_GPL(call_rcu_lazy);
> #endif
> 
> > 
> > > +#ifdef CONFIG_RCU_LAZY
> > > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
> > > +{
> > > +	return __call_rcu_common(head, func, true);
> > > +}
> > > +EXPORT_SYMBOL_GPL(call_rcu_lazy);
> > > +#endif
> > > +
> > > +void call_rcu(struct rcu_head *head, rcu_callback_t func)
> > > +{
> > > +	return __call_rcu_common(head, func, false);
> > > +
> > > +}
> > > +EXPORT_SYMBOL_GPL(call_rcu);
> > >  
> > >  /* Maximum number of jiffies to wait before draining a batch. */
> > >  #define KFREE_DRAIN_JIFFIES (HZ / 50)
> > > @@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
> > >  	rdp->barrier_head.func = rcu_barrier_callback;
> > >  	debug_rcu_head_queue(&rdp->barrier_head);
> > >  	rcu_nocb_lock(rdp);
> > > -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
> > > +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
> > >  	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
> > >  		atomic_inc(&rcu_state.barrier_cpu_count);
> > >  	} else {
> > > @@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu)
> > >  	my_rdp = this_cpu_ptr(&rcu_data);
> > >  	my_rnp = my_rdp->mynode;
> > >  	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
> > > -	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
> > > +	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
> > >  	raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
> > >  	/* Leverage recent GPs and set GP for new callbacks. */
> > >  	needwake = rcu_advance_cbs(my_rnp, rdp) ||
> > > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> > > index 2ccf5845957d..fec4fad6654b 100644
> > > --- a/kernel/rcu/tree.h
> > > +++ b/kernel/rcu/tree.h
> > > @@ -267,8 +267,9 @@ struct rcu_data {
> > >  /* Values for nocb_defer_wakeup field in struct rcu_data. */
> > >  #define RCU_NOCB_WAKE_NOT	0
> > >  #define RCU_NOCB_WAKE_BYPASS	1
> > > -#define RCU_NOCB_WAKE		2
> > > -#define RCU_NOCB_WAKE_FORCE	3
> > > +#define RCU_NOCB_WAKE_LAZY	2
> > > +#define RCU_NOCB_WAKE		3
> > > +#define RCU_NOCB_WAKE_FORCE	4
> > >  
> > >  #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
> > >  					/* For jiffies_till_first_fqs and */
> > > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> > >  static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> > >  static void rcu_init_one_nocb(struct rcu_node *rnp);
> > >  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > > -				  unsigned long j);
> > > +				  unsigned long j, bool lazy);
> > >  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > > -				bool *was_alldone, unsigned long flags);
> > > +				bool *was_alldone, unsigned long flags,
> > > +				bool lazy);
> > >  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
> > >  				 unsigned long flags);
> > >  static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> > > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> > > index e369efe94fda..b9244f22e102 100644
> > > --- a/kernel/rcu/tree_nocb.h
> > > +++ b/kernel/rcu/tree_nocb.h
> > > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
> > >  	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
> > >  }
> > 
> > Comment on LAZY_FLUSH_JIFFIES purpose in life, please!  (At some point
> > more flexibility may be required, but let's not unnecessarily rush
> > into that.)
> 
> I added this:
> /*
>  * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
>  * can elapse before lazy callbacks are flushed. Lazy callbacks
>  * could be flushed much earlier for a number of other reasons;
>  * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
>  * left unsubmitted to RCU after that many jiffies.
>  */

Much better, thank you!

> > > +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> > > +
> > >  /*
> > >   * Arrange to wake the GP kthread for this NOCB group at some future
> > >   * time when it is safe to do so.
> > > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> > >  	 * Bypass wakeup overrides previous deferments. In case
> > >  	 * of callback storm, no need to wake up too early.
> > >  	 */
> > > -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
> > > +	if (waketype == RCU_NOCB_WAKE_LAZY) {
> > 
> > Presumably we get here only if all of this CPU's callbacks are lazy?
> 
> Yes that's right.

OK, thank you for the clarification.

> > >  	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
> > >  	WRITE_ONCE(rdp->nocb_bypass_first, j);
> > >  	rcu_nocb_bypass_unlock(rdp);
> > > @@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > >   * Note that this function always returns true if rhp is NULL.
> > >   */
> > >  static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > > -				  unsigned long j)
> > > +				  unsigned long j, bool lazy)
> > >  {
> > >  	if (!rcu_rdp_is_offloaded(rdp))
> > >  		return true;
> > >  	rcu_lockdep_assert_cblist_protected(rdp);
> > >  	rcu_nocb_bypass_lock(rdp);
> > > -	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
> > > +	return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
> > >  }
> > >  
> > >  /*
> > > @@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
> > >  	if (!rcu_rdp_is_offloaded(rdp) ||
> > >  	    !rcu_nocb_bypass_trylock(rdp))
> > >  		return;
> > > -	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
> > > +	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
> > >  }
> > >  
> > >  /*
> > > @@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
> > >   * there is only one CPU in operation.
> > >   */
> > >  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > > -				bool *was_alldone, unsigned long flags)
> > > +				bool *was_alldone, unsigned long flags,
> > > +				bool lazy)
> > >  {
> > >  	unsigned long c;
> > >  	unsigned long cur_gp_seq;
> > >  	unsigned long j = jiffies;
> > >  	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
> > > +	long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
> > >  
> > >  	lockdep_assert_irqs_disabled();
> > >  
> > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> > >  	}
> > >  	WRITE_ONCE(rdp->nocb_nobypass_count, c);
> > >  
> > > -	// If there hasn't yet been all that many ->cblist enqueues
> > > -	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
> > > -	// ->nocb_bypass first.
> > > -	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
> > > +	// If caller passed a non-lazy CB and there hasn't yet been all that
> > > +	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
> > > +	// onto ->cblist.  But flush ->nocb_bypass first. Also do so, if total
> > > +	// number of CBs (lazy + non-lazy) grows too much.
> > > +	//
> > > +	// Note that if the bypass list has lazy CBs, and the main list is
> > > +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> > > +	// the lazy CBs to the main list as well. That's the right thing to do,
> > > +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> > > +	// one, we can just reuse that GP for the already queued-up lazy ones.
> > > +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> > > +	    (lazy && n_lazy_cbs >= qhimark)) {
> > >  		rcu_nocb_lock(rdp);
> > >  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> > >  		if (*was_alldone)
> > >  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> > > -					    TPS("FirstQ"));
> > > -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> > > +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> > > +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> > 
> > The "false" here instead of "lazy" is because the caller is to do the
> > enqueuing, correct?
> 
> There is no difference between using false or lazy here, because the bypass
> flush is not also enqueuing the lazy callback, right?
> 
> We can also pass lazy instead of false if that's less confusing.
> 
> Or maybe I missed the issue you're raising?

I am mostly checking up on your intended meaning of "lazy" in various
contexts.  It could mean only that the caller requested laziness, or in
some cases it could mean that the callback actually will be lazy.

I can rationalize the "false" above as a "don't care" in this case
because (as you say) there is no callback.  In which case this code
is OK as is, as long as the header comment for rcu_nocb_flush_bypass()
clearly states that this parameter has meaning only when there really
is a callback being queued.

> > >  		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> > >  		return false; // Caller must enqueue the callback.
> > >  	}
> > >  
> > >  	// If ->nocb_bypass has been used too long or is too full,
> > >  	// flush ->nocb_bypass to ->cblist.
> > > -	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
> > > -	    ncbs >= qhimark) {
> > > +	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
> > >  		rcu_nocb_lock(rdp);
> > > -		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
> > > +		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {
> > 
> > But shouldn't this "true" be "lazy"?  I don't see how we are guaranteed
> > that the callback is in fact lazy at this point in the code.  Also,
> > there is not yet a guarantee that the caller will do the enqueuing.
> > So what am I missing?
> 
> Sorry I screwed this part up. I think I meant 'false' here, if the list grew
> too big- then I think I would prefer if the new lazy CB instead is treated as
> non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What
> do you think?

Good point, if we are choosing to override the laziness requested by the
caller, then it should say "true".  It would be good to have a comment
saying that is what we are doing, correct?

> Will reply more to the rest of the comments soon, thanks!

Sounds good!  (Hey, wouldn't want you to be bored!)

							Thanx, Paul
Joel Fernandes July 12, 2022, 8:53 p.m. UTC | #14
On 7/10/2022 12:03 PM, Paul E. McKenney wrote:
[..]
>>>> +	// Note that if the bypass list has lazy CBs, and the main list is
>>>> +	// empty, and rhp happens to be non-lazy, then we end up flushing all
>>>> +	// the lazy CBs to the main list as well. That's the right thing to do,
>>>> +	// since we are kick-starting RCU GP processing anyway for the non-lazy
>>>> +	// one, we can just reuse that GP for the already queued-up lazy ones.
>>>> +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
>>>> +	    (lazy && n_lazy_cbs >= qhimark)) {
>>>>  		rcu_nocb_lock(rdp);
>>>>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>>>  		if (*was_alldone)
>>>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>>> -					    TPS("FirstQ"));
>>>> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
>>>> +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
>>>> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
>>>
>>> The "false" here instead of "lazy" is because the caller is to do the
>>> enqueuing, correct?
>>
>> There is no difference between using false or lazy here, because the bypass
>> flush is not also enqueuing the lazy callback, right?
>>
>> We can also pass lazy instead of false if that's less confusing.
>>
>> Or maybe I missed the issue you're raising?
> 
> I am mostly checking up on your intended meaning of "lazy" in various
> contexts.  It could mean only that the caller requested laziness, or in
> some cases it could mean that the callback actually will be lazy.
> 
> I can rationalize the "false" above as a "don't care" in this case
> because (as you say) there is no callback.  In which case this code
> is OK as is, as long as the header comment for rcu_nocb_flush_bypass()
> clearly states that this parameter has meaning only when there really
> is a callback being queued.

I decided to change this and the below to "lazy" variable instead of
true/false, as the code is cleaner and less confusing IMO. It makes
sense to me and in my testing it works fine. Hope that's Ok with you.

About changing the lazy length count to a flag, one drawback of doing
that is, say if there are some non-lazy CBs in the bypass list, then the
lazy shrinker will end up reporting an inaccurate count. Also
considering that it might be harder to add the count back later say if
we need it for tracing, I would say let's leave it as is. I will keep the
counter for v3 and we can discuss. Does that sound good to you?

I think some more testing, checkpatch running etc and I should be good
to send v3 :)

Thanks!

 - Joel


> 
>>>>  		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>>>>  		return false; // Caller must enqueue the callback.
>>>>  	}
>>>>  
>>>>  	// If ->nocb_bypass has been used too long or is too full,
>>>>  	// flush ->nocb_bypass to ->cblist.
>>>> -	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
>>>> -	    ncbs >= qhimark) {
>>>> +	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
>>>>  		rcu_nocb_lock(rdp);
>>>> -		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
>>>> +		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {
>>>
>>> But shouldn't this "true" be "lazy"?  I don't see how we are guaranteed
>>> that the callback is in fact lazy at this point in the code.  Also,
>>> there is not yet a guarantee that the caller will do the enqueuing.
>>> So what am I missing?
>>
>> Sorry I screwed this part up. I think I meant 'false' here, if the list grew
>> too big- then I think I would prefer if the new lazy CB instead is treated as
>> non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What
>> do you think?
> 
> Good point, if we are choosing to override the laziness requested by the
> caller, then it should say "true".  It would be good to have a comment
> saying that is what we are doing, correct?
> 
>> Will reply more to the rest of the comments soon, thanks!
> 
> Sounds good!  (Hey, wouldn't want you to be bored!)
> 
> 							Thanx, Paul
Paul E. McKenney July 12, 2022, 9:04 p.m. UTC | #15
On Tue, Jul 12, 2022 at 04:53:48PM -0400, Joel Fernandes wrote:
> 
> On 7/10/2022 12:03 PM, Paul E. McKenney wrote:
> [..]
> >>>> +	// Note that if the bypass list has lazy CBs, and the main list is
> >>>> +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> >>>> +	// the lazy CBs to the main list as well. That's the right thing to do,
> >>>> +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> >>>> +	// one, we can just reuse that GP for the already queued-up lazy ones.
> >>>> +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> >>>> +	    (lazy && n_lazy_cbs >= qhimark)) {
> >>>>  		rcu_nocb_lock(rdp);
> >>>>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >>>>  		if (*was_alldone)
> >>>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> >>>> -					    TPS("FirstQ"));
> >>>> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> >>>> +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> >>>> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> >>>
> >>> The "false" here instead of "lazy" is because the caller is to do the
> >>> enqueuing, correct?
> >>
> >> There is no difference between using false or lazy here, because the bypass
> >> flush is not also enqueuing the lazy callback, right?
> >>
> >> We can also pass lazy instead of false if that's less confusing.
> >>
> >> Or maybe I missed the issue you're raising?
> > 
> > I am mostly checking up on your intended meaning of "lazy" in various
> > contexts.  It could mean only that the caller requested laziness, or in
> > some cases it could mean that the callback actually will be lazy.
> > 
> > I can rationalize the "false" above as a "don't care" in this case
> > because (as you say) there is no callback.  In which case this code
> > is OK as is, as long as the header comment for rcu_nocb_flush_bypass()
> > clearly states that this parameter has meaning only when there really
> > is a callback being queued.
> 
> I decided to change this and the below to "lazy" variable instead of
> true/false, as the code is cleaner and less confusing IMO. It makes
> sense to me and in my testing it works fine. Hope that's Ok with you.

Sounds plausible.

> About changing the lazy length count to a flag, one drawback of doing
> that is, say if there are some non-lazy CBs in the bypass list, then the
> lazy shrinker will end up reporting an inaccurate count. Also
> considering that it might be harder to add the count back later say if
> we need it for tracing, I would say lets leave it as is. I will keep the
> counter for v3 and we can discuss. Does that sound good to you?

You lost me on this one.  If there are any non-lazy callbacks, the whole
bypass list is already being treated as non-lazy, right?  If so, then
the lazy shrinker should report the full count if all callbacks are lazy,
and should report none otherwise.  Or am I missing something here?

> I think some more testing, checkpatch running etc and I should be good
> to send v3 :)

Sounds good!

							Thanx, Paul

> Thanks!
> 
>  - Joel
> 
> 
> > 
> >>>>  		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
> >>>>  		return false; // Caller must enqueue the callback.
> >>>>  	}
> >>>>  
> >>>>  	// If ->nocb_bypass has been used too long or is too full,
> >>>>  	// flush ->nocb_bypass to ->cblist.
> >>>> -	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
> >>>> -	    ncbs >= qhimark) {
> >>>> +	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
> >>>>  		rcu_nocb_lock(rdp);
> >>>> -		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
> >>>> +		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {
> >>>
> >>> But shouldn't this "true" be "lazy"?  I don't see how we are guaranteed
> >>> that the callback is in fact lazy at this point in the code.  Also,
> >>> there is not yet a guarantee that the caller will do the enqueuing.
> >>> So what am I missing?
> >>
> >> Sorry I screwed this part up. I think I meant 'false' here, if the list grew
> >> too big- then I think I would prefer if the new lazy CB instead is treated as
> >> non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What
> >> do you think?
> > 
> > Good point, if we are choosing to override the laziness requested by the
> > caller, then it should say "true".  It would be good to have a comment
> > saying that is what we are doing, correct?
> > 
> >> Will reply more to the rest of the comments soon, thanks!
> > 
> > Sounds good!  (Hey, wouldn't want you to be bored!)
> > 
> > 							Thanx, Paul
Joel Fernandes July 12, 2022, 9:10 p.m. UTC | #16
On 7/12/2022 5:04 PM, Paul E. McKenney wrote:
> On Tue, Jul 12, 2022 at 04:53:48PM -0400, Joel Fernandes wrote:
>>
>> On 7/10/2022 12:03 PM, Paul E. McKenney wrote:
>> [..]
>>>>>> +	// Note that if the bypass list has lazy CBs, and the main list is
>>>>>> +	// empty, and rhp happens to be non-lazy, then we end up flushing all
>>>>>> +	// the lazy CBs to the main list as well. That's the right thing to do,
>>>>>> +	// since we are kick-starting RCU GP processing anyway for the non-lazy
>>>>>> +	// one, we can just reuse that GP for the already queued-up lazy ones.
>>>>>> +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
>>>>>> +	    (lazy && n_lazy_cbs >= qhimark)) {
>>>>>>  		rcu_nocb_lock(rdp);
>>>>>>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>>>>>  		if (*was_alldone)
>>>>>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>>>>> -					    TPS("FirstQ"));
>>>>>> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
>>>>>> +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
>>>>>> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
>>>>>
>>>>> The "false" here instead of "lazy" is because the caller is to do the
>>>>> enqueuing, correct?
>>>>
>>>> There is no difference between using false or lazy here, because the bypass
>>>> flush is not also enqueuing the lazy callback, right?
>>>>
>>>> We can also pass lazy instead of false if that's less confusing.
>>>>
>>>> Or maybe I missed the issue you're raising?
>>>
>>> I am mostly checking up on your intended meaning of "lazy" in various
>>> contexts.  It could mean only that the caller requested laziness, or in
>>> some cases it could mean that the callback actually will be lazy.
>>>
>>> I can rationalize the "false" above as a "don't care" in this case
>>> because (as you say) there is no callback.  In which case this code
>>> is OK as is, as long as the header comment for rcu_nocb_flush_bypass()
>>> clearly states that this parameter has meaning only when there really
>>> is a callback being queued.
>>
>> I decided to change this and the below to "lazy" variable instead of
>> true/false, as the code is cleaner and less confusing IMO. It makes
>> sense to me and in my testing it works fine. Hope that's Ok with you.
> 
> Sounds plausible.
> 
>> About changing the lazy length count to a flag, one drawback of doing
>> that is, say if there are some non-lazy CBs in the bypass list, then the
>> lazy shrinker will end up reporting an inaccurate count. Also
>> considering that it might be harder to add the count back later say if
>> we need it for tracing, I would say lets leave it as is. I will keep the
>> counter for v3 and we can discuss. Does that sound good to you?
> 
> You lost me on this one.  If there are any non-lazy callbacks, the whole
> bypass list is already being treated as non-lazy, right?  If so, then
> the lazy shrinker should report the full count if all callbacks are lazy,
> and should report none otherwise.  Or am I missing something here?
> 

That's one way to interpret it. Another way: say there were 1000
lazy CBs, and now 1 non-lazy came in. The shrinker could report the lazy
count as 0 per your interpretation. Yes, it's true they will get flushed
out in the next jiffy, but for that time instant, the number of lazy
CBs in the list is not zero! :) Yeah OK, it's a weak argument, but still an
argument! ;-)

In any case, we saw the need for the length of the segcb lists to figure
out things via tracing, so I suspect we may need this in the future as
well. It's a small cost, so I would rather keep it if that's OK with you! :)

Thanks,

- Joel
Paul E. McKenney July 12, 2022, 10:41 p.m. UTC | #17
On Tue, Jul 12, 2022 at 05:10:41PM -0400, Joel Fernandes wrote:
> 
> 
> On 7/12/2022 5:04 PM, Paul E. McKenney wrote:
> > On Tue, Jul 12, 2022 at 04:53:48PM -0400, Joel Fernandes wrote:
> >>
> >> On 7/10/2022 12:03 PM, Paul E. McKenney wrote:
> >> [..]
> >>>>>> +	// Note that if the bypass list has lazy CBs, and the main list is
> >>>>>> +	// empty, and rhp happens to be non-lazy, then we end up flushing all
> >>>>>> +	// the lazy CBs to the main list as well. That's the right thing to do,
> >>>>>> +	// since we are kick-starting RCU GP processing anyway for the non-lazy
> >>>>>> +	// one, we can just reuse that GP for the already queued-up lazy ones.
> >>>>>> +	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
> >>>>>> +	    (lazy && n_lazy_cbs >= qhimark)) {
> >>>>>>  		rcu_nocb_lock(rdp);
> >>>>>>  		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
> >>>>>>  		if (*was_alldone)
> >>>>>>  			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
> >>>>>> -					    TPS("FirstQ"));
> >>>>>> -		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
> >>>>>> +					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
> >>>>>> +		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
> >>>>>
> >>>>> The "false" here instead of "lazy" is because the caller is to do the
> >>>>> enqueuing, correct?
> >>>>
> >>>> There is no difference between using false or lazy here, because the bypass
> >>>> flush is not also enqueuing the lazy callback, right?
> >>>>
> >>>> We can also pass lazy instead of false if that's less confusing.
> >>>>
> >>>> Or maybe I missed the issue you're raising?
> >>>
> >>> I am mostly checking up on your intended meaning of "lazy" in various
> >>> contexts.  It could mean only that the caller requested laziness, or in
> >>> some cases it could mean that the callback actually will be lazy.
> >>>
> >>> I can rationalize the "false" above as a "don't care" in this case
> >>> because (as you say) there is no callback.  In which case this code
> >>> is OK as is, as long as the header comment for rcu_nocb_flush_bypass()
> >>> clearly states that this parameter has meaning only when there really
> >>> is a callback being queued.
> >>
> >> I decided to change this and the below to "lazy" variable instead of
> >> true/false, as the code is cleaner and less confusing IMO. It makes
> >> sense to me and in my testing it works fine. Hope that's Ok with you.
> > 
> > Sounds plausible.
> > 
> >> About changing the lazy length count to a flag, one drawback of doing
> >> that is, say if there are some non-lazy CBs in the bypass list, then the
> >> lazy shrinker will end up reporting an inaccurate count. Also
> >> considering that it might be harder to add the count back later say if
> >> we need it for tracing, I would say lets leave it as is. I will keep the
> >> counter for v3 and we can discuss. Does that sound good to you?
> > 
> > You lost me on this one.  If there are any non-lazy callbacks, the whole
> > bypass list is already being treated as non-lazy, right?  If so, then
> > the lazy shrinker should report the full count if all callbacks are lazy,
> > and should report none otherwise.  Or am I missing something here?
> > 
> 
> That's one way to interpret it, another way is say there were a 1000
> lazy CBs, and now 1 non-lazy came in. The shrinker could report the lazy
> count as 0 per your interpretation. Yes its true they will get flushed
> out in the next jiffie, but for that time instant, the number of lazy
> CBs in the list is not zero! :) Yeah OK its a weak argument, still an
> argument! ;-)
> 
> In any case, we saw the need for the length of the segcb lists to figure
> out things via tracing, so I suspect we may need this in the future as
> well, its a small cost so I would rather keep it if that's Ok with you! :)

OK, being needed for tracing/diagnostics is a somewhat less weak argument...

Let's see what v3 looks like.

							Thanx, Paul
diff mbox series

Patch

diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
index 659d13a7ddaa..9a992707917b 100644
--- a/include/linux/rcu_segcblist.h
+++ b/include/linux/rcu_segcblist.h
@@ -22,6 +22,7 @@  struct rcu_cblist {
 	struct rcu_head *head;
 	struct rcu_head **tail;
 	long len;
+	long lazy_len;
 };
 
 #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 1a32036c918c..9191a3d88087 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -82,6 +82,12 @@  static inline int rcu_preempt_depth(void)
 
 #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
 
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
+#else
+#define call_rcu_lazy(head, func) call_rcu(head, func)
+#endif
+
 /* Internal to kernel */
 void rcu_init(void);
 extern int rcu_scheduler_active;
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
index 27aab870ae4c..0bffa992fdc4 100644
--- a/kernel/rcu/Kconfig
+++ b/kernel/rcu/Kconfig
@@ -293,4 +293,12 @@  config TASKS_TRACE_RCU_READ_MB
 	  Say N here if you hate read-side memory barriers.
 	  Take the default if you are unsure.
 
+config RCU_LAZY
+	bool "RCU callback lazy invocation functionality"
+	depends on RCU_NOCB_CPU
+	default n
+	help
+	  To save power, batch RCU callbacks and flush after delay, memory
+          pressure or callback list growing too big.
+
 endmenu # "RCU Subsystem"
diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index c54ea2b6a36b..627a3218a372 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -20,6 +20,7 @@  void rcu_cblist_init(struct rcu_cblist *rclp)
 	rclp->head = NULL;
 	rclp->tail = &rclp->head;
 	rclp->len = 0;
+	rclp->lazy_len = 0;
 }
 
 /*
@@ -32,6 +33,15 @@  void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
 	WRITE_ONCE(rclp->len, rclp->len + 1);
 }
 
+/*
+ * Enqueue a lazy rcu_head structure onto the specified callback list.
+ */
+void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp)
+{
+	rcu_cblist_enqueue(rclp, rhp);
+	WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);
+}
+
 /*
  * Flush the second rcu_cblist structure onto the first one, obliterating
  * any contents of the first.  If rhp is non-NULL, enqueue it as the sole
@@ -60,6 +70,15 @@  void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
 	}
 }
 
+void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
+			      struct rcu_cblist *srclp,
+			      struct rcu_head *rhp)
+{
+	rcu_cblist_flush_enqueue(drclp, srclp, rhp);
+	if (rhp)
+		WRITE_ONCE(srclp->lazy_len, 1);
+}
+
 /*
  * Dequeue the oldest rcu_head structure from the specified callback
  * list.
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 431cee212467..c3d7de65b689 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -15,14 +15,28 @@  static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
 	return READ_ONCE(rclp->len);
 }
 
+/* Return number of lazy callbacks in the specified callback list. */
+static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
+{
+#ifdef CONFIG_RCU_LAZY
+	return READ_ONCE(rclp->lazy_len);
+#else
+	return 0;
+#endif
+}
+
 /* Return number of callbacks in segmented callback list by summing seglen. */
 long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
 
 void rcu_cblist_init(struct rcu_cblist *rclp);
 void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
+void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp);
 void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
 			      struct rcu_cblist *srclp,
 			      struct rcu_head *rhp);
+void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
+			      struct rcu_cblist *srclp,
+			      struct rcu_head *rhp);
 struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
 
 /*
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index c25ba442044a..d2e3d6e176d2 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3098,7 +3098,8 @@  static void check_cb_ovld(struct rcu_data *rdp)
  * Implementation of these memory-ordering guarantees is described here:
  * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
  */
-void call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void
+__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
 {
 	static atomic_t doublefrees;
 	unsigned long flags;
@@ -3139,7 +3140,7 @@  void call_rcu(struct rcu_head *head, rcu_callback_t func)
 	}
 
 	check_cb_ovld(rdp);
-	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
 		return; // Enqueued onto ->nocb_bypass, so just leave.
 	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
 	rcu_segcblist_enqueue(&rdp->cblist, head);
@@ -3161,8 +3162,21 @@  void call_rcu(struct rcu_head *head, rcu_callback_t func)
 		local_irq_restore(flags);
 	}
 }
-EXPORT_SYMBOL_GPL(call_rcu);
 
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
+{
+	return __call_rcu_common(head, func, true);
+}
+EXPORT_SYMBOL_GPL(call_rcu_lazy);
+#endif
+
+void call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+	return __call_rcu_common(head, func, false);
+
+}
+EXPORT_SYMBOL_GPL(call_rcu);
 
 /* Maximum number of jiffies to wait before draining a batch. */
 #define KFREE_DRAIN_JIFFIES (HZ / 50)
@@ -4056,7 +4070,7 @@  static void rcu_barrier_entrain(struct rcu_data *rdp)
 	rdp->barrier_head.func = rcu_barrier_callback;
 	debug_rcu_head_queue(&rdp->barrier_head);
 	rcu_nocb_lock(rdp);
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
 	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
 		atomic_inc(&rcu_state.barrier_cpu_count);
 	} else {
@@ -4476,7 +4490,7 @@  void rcutree_migrate_callbacks(int cpu)
 	my_rdp = this_cpu_ptr(&rcu_data);
 	my_rnp = my_rdp->mynode;
 	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
 	raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
 	/* Leverage recent GPs and set GP for new callbacks. */
 	needwake = rcu_advance_cbs(my_rnp, rdp) ||
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 2ccf5845957d..fec4fad6654b 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -267,8 +267,9 @@  struct rcu_data {
 /* Values for nocb_defer_wakeup field in struct rcu_data. */
 #define RCU_NOCB_WAKE_NOT	0
 #define RCU_NOCB_WAKE_BYPASS	1
-#define RCU_NOCB_WAKE		2
-#define RCU_NOCB_WAKE_FORCE	3
+#define RCU_NOCB_WAKE_LAZY	2
+#define RCU_NOCB_WAKE		3
+#define RCU_NOCB_WAKE_FORCE	4
 
 #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
 					/* For jiffies_till_first_fqs and */
@@ -436,9 +437,10 @@  static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
 static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
 static void rcu_init_one_nocb(struct rcu_node *rnp);
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j);
+				  unsigned long j, bool lazy);
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags);
+				bool *was_alldone, unsigned long flags,
+				bool lazy);
 static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
 				 unsigned long flags);
 static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index e369efe94fda..b9244f22e102 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -256,6 +256,8 @@  static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
 	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
 }
 
+#define LAZY_FLUSH_JIFFIES (10 * HZ)
+
 /*
  * Arrange to wake the GP kthread for this NOCB group at some future
  * time when it is safe to do so.
@@ -272,7 +274,10 @@  static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
 	 * Bypass wakeup overrides previous deferments. In case
 	 * of callback storm, no need to wake up too early.
 	 */
-	if (waketype == RCU_NOCB_WAKE_BYPASS) {
+	if (waketype == RCU_NOCB_WAKE_LAZY) {
+		mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES);
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	} else if (waketype == RCU_NOCB_WAKE_BYPASS) {
 		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
 		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
 	} else {
@@ -296,7 +301,7 @@  static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
  * Note that this function always returns true if rhp is NULL.
  */
 static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				     unsigned long j)
+				     unsigned long j, bool lazy)
 {
 	struct rcu_cblist rcl;
 
@@ -310,7 +315,13 @@  static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
 	if (rhp)
 		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+
+	trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len));
+	/* The lazy CBs are being flushed, but a new one might be enqueued. */
+	if (lazy)
+		rcu_cblist_flush_enqueue_lazy(&rcl, &rdp->nocb_bypass, rhp);
+	else
+		rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
 	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
 	WRITE_ONCE(rdp->nocb_bypass_first, j);
 	rcu_nocb_bypass_unlock(rdp);
@@ -326,13 +337,13 @@  static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
  * Note that this function always returns true if rhp is NULL.
  */
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
+				  unsigned long j, bool lazy)
 {
 	if (!rcu_rdp_is_offloaded(rdp))
 		return true;
 	rcu_lockdep_assert_cblist_protected(rdp);
 	rcu_nocb_bypass_lock(rdp);
-	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+	return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
 }
 
 /*
@@ -345,7 +356,7 @@  static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
 	if (!rcu_rdp_is_offloaded(rdp) ||
 	    !rcu_nocb_bypass_trylock(rdp))
 		return;
-	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
 }
 
 /*
@@ -367,12 +378,14 @@  static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
  * there is only one CPU in operation.
  */
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
+				bool *was_alldone, unsigned long flags,
+				bool lazy)
 {
 	unsigned long c;
 	unsigned long cur_gp_seq;
 	unsigned long j = jiffies;
 	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 
 	lockdep_assert_irqs_disabled();
 
@@ -414,30 +427,37 @@  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 	}
 	WRITE_ONCE(rdp->nocb_nobypass_count, c);
 
-	// If there hasn't yet been all that many ->cblist enqueues
-	// this jiffy, tell the caller to enqueue onto ->cblist.  But flush
-	// ->nocb_bypass first.
-	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+	// If caller passed a non-lazy CB and there hasn't yet been all that
+	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
+	// onto ->cblist.  But flush ->nocb_bypass first.  Also do so if a lazy
+	// CB was passed and the lazy CB count has reached qhimark.
+	//
+	// Note that if the bypass list has lazy CBs, and the main list is
+	// empty, and rhp happens to be non-lazy, then we end up flushing all
+	// the lazy CBs to the main list as well. That's the right thing to do,
+	// since we are kick-starting RCU GP processing anyway for the non-lazy
+	// one, we can just reuse that GP for the already queued-up lazy ones.
+	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
+	    (lazy && n_lazy_cbs >= qhimark)) {
 		rcu_nocb_lock(rdp);
 		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
 		if (*was_alldone)
 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstQ"));
-		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
+		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
 		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
 		return false; // Caller must enqueue the callback.
 	}
 
 	// If ->nocb_bypass has been used too long or is too full,
 	// flush ->nocb_bypass to ->cblist.
-	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
-	    ncbs >= qhimark) {
+	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
 		rcu_nocb_lock(rdp);
-		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {
 			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
 			if (*was_alldone)
 				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-						    TPS("FirstQ"));
+						    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
 			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
 			return false; // Caller must enqueue the callback.
 		}
@@ -455,12 +475,20 @@  static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 	rcu_nocb_wait_contended(rdp);
 	rcu_nocb_bypass_lock(rdp);
 	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+	if (lazy)
+		rcu_cblist_enqueue_lazy(&rdp->nocb_bypass, rhp);
+	else
+		rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
 	if (!ncbs) {
 		WRITE_ONCE(rdp->nocb_bypass_first, j);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+				    lazy ? TPS("FirstLazyBQ") : TPS("FirstBQ"));
+	} else if (!n_lazy_cbs && lazy) {
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstLazyBQ"));
 	}
+
 	rcu_nocb_bypass_unlock(rdp);
 	smp_mb(); /* Order enqueue before wake. */
 	if (ncbs) {
@@ -493,7 +521,7 @@  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 {
 	unsigned long cur_gp_seq;
 	unsigned long j;
-	long len;
+	long len, lazy_len, bypass_len;
 	struct task_struct *t;
 
 	// If we are being polled or there is no kthread, just leave.
@@ -506,9 +534,16 @@  static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 	}
 	// Need to actually to a wakeup.
 	len = rcu_segcblist_n_cbs(&rdp->cblist);
+	bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	lazy_len = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 	if (was_alldone) {
 		rdp->qlen_last_fqs_check = len;
-		if (!irqs_disabled_flags(flags)) {
+		// Only lazy CBs in bypass list
+		if (lazy_len && bypass_len == lazy_len) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
+					   TPS("WakeLazy"));
+		} else if (!irqs_disabled_flags(flags)) {
 			/* ... if queue was empty ... */
 			rcu_nocb_unlock_irqrestore(rdp, flags);
 			wake_nocb_gp(rdp, false);
@@ -599,8 +634,8 @@  static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
  */
 static void nocb_gp_wait(struct rcu_data *my_rdp)
 {
-	bool bypass = false;
-	long bypass_ncbs;
+	bool bypass = false, lazy = false;
+	long bypass_ncbs, lazy_ncbs;
 	int __maybe_unused cpu = my_rdp->cpu;
 	unsigned long cur_gp_seq;
 	unsigned long flags;
@@ -648,12 +683,21 @@  static void nocb_gp_wait(struct rcu_data *my_rdp)
 			continue;
 		}
 		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-		if (bypass_ncbs &&
+		lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
+		if (lazy_ncbs &&
+		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + LAZY_FLUSH_JIFFIES) ||
+		     bypass_ncbs > qhimark)) {
+			// Lazy CBs are too old or bypass is too full, so flush it.
+			(void)rcu_nocb_try_flush_bypass(rdp, j);
+			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+			lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
+		} else if (bypass_ncbs &&
 		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
 		     bypass_ncbs > 2 * qhimark)) {
 			// Bypass full or old, so flush it.
 			(void)rcu_nocb_try_flush_bypass(rdp, j);
 			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+			lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
 			rcu_nocb_unlock_irqrestore(rdp, flags);
 			if (needwake_state)
@@ -662,8 +706,11 @@  static void nocb_gp_wait(struct rcu_data *my_rdp)
 		}
 		if (bypass_ncbs) {
 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("Bypass"));
-			bypass = true;
+				    bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
+			if (bypass_ncbs == lazy_ncbs)
+				lazy = true;
+			else
+				bypass = true;
 		}
 		rnp = rdp->mynode;
 
@@ -713,12 +760,21 @@  static void nocb_gp_wait(struct rcu_data *my_rdp)
 	my_rdp->nocb_gp_gp = needwait_gp;
 	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
 
-	if (bypass && !rcu_nocb_poll) {
-		// At least one child with non-empty ->nocb_bypass, so set
-		// timer in order to avoid stranding its callbacks.
-		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
-				   TPS("WakeBypassIsDeferred"));
+	// At least one child with non-empty ->nocb_bypass, so set
+	// timer in order to avoid stranding its callbacks.
+	if (!rcu_nocb_poll) {
+		// If the bypass list has only lazy CBs, add a deferred
+		// lazy wake up.
+		if (lazy && !bypass) {
+			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
+					TPS("WakeLazyIsDeferred"));
+		// Otherwise add a deferred bypass wake up.
+		} else if (bypass) {
+			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+					TPS("WakeBypassIsDeferred"));
+		}
 	}
+
 	if (rcu_nocb_poll) {
 		/* Polling, so trace if first poll in the series. */
 		if (gotcbs)
@@ -999,7 +1055,7 @@  static long rcu_nocb_rdp_deoffload(void *arg)
 	 * return false, which means that future calls to rcu_nocb_try_bypass()
 	 * will refuse to put anything into the bypass.
 	 */
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
 	/*
 	 * Start with invoking rcu_core() early. This way if the current thread
 	 * happens to preempt an ongoing call to rcu_core() in the middle,
@@ -1500,13 +1556,14 @@  static void rcu_init_one_nocb(struct rcu_node *rnp)
 }
 
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
+				  unsigned long j, bool lazy)
 {
 	return true;
 }
 
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
+				bool *was_alldone, unsigned long flags,
+				bool lazy)
 {
 	return false;
 }