Message ID | 20220622225102.2112026-3-joel@joelfernandes.org (mailing list archive)
---|---
State | New, archived
Series | [v2,1/8] rcu: Introduce call_rcu_lazy() API implementation
On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote:
[..]
> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> index 2ccf5845957d..fec4fad6654b 100644
> --- a/kernel/rcu/tree.h
> +++ b/kernel/rcu/tree.h
> @@ -267,8 +267,9 @@ struct rcu_data {
> /* Values for nocb_defer_wakeup field in struct rcu_data. */
> #define RCU_NOCB_WAKE_NOT 0
> #define RCU_NOCB_WAKE_BYPASS 1
> -#define RCU_NOCB_WAKE 2
> -#define RCU_NOCB_WAKE_FORCE 3
> +#define RCU_NOCB_WAKE_LAZY 2
> +#define RCU_NOCB_WAKE 3
> +#define RCU_NOCB_WAKE_FORCE 4
>
> #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
> /* For jiffies_till_first_fqs and */
> @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
> static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
> static void rcu_init_one_nocb(struct rcu_node *rnp);
> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				  unsigned long j);
> +				  unsigned long j, bool lazy);
> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				bool *was_alldone, unsigned long flags);
> +				bool *was_alldone, unsigned long flags,
> +				bool lazy);
> static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
> 				 unsigned long flags);
> static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
> index e369efe94fda..b9244f22e102 100644
> --- a/kernel/rcu/tree_nocb.h
> +++ b/kernel/rcu/tree_nocb.h
> @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
> 	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
> }
>
> +#define LAZY_FLUSH_JIFFIES (10 * HZ)
> +
> /*
>  * Arrange to wake the GP kthread for this NOCB group at some future
>  * time when it is safe to do so.
> @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
> 	 * Bypass wakeup overrides previous deferments. In case
> 	 * of callback storm, no need to wake up too early.
> 	 */
> -	if (waketype == RCU_NOCB_WAKE_BYPASS) {
> +	if (waketype == RCU_NOCB_WAKE_LAZY) {
> +		mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES);
> +		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> +	} else if (waketype == RCU_NOCB_WAKE_BYPASS) {
> 		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
> 		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
> 	} else {
> @@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>  * Note that this function always returns true if rhp is NULL.
>  */
> static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> -				     unsigned long j)
> +				     unsigned long j, bool lazy)
> {
> 	struct rcu_cblist rcl;
>
> @@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
> 	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
> 	if (rhp)
> 		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
> -	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
> +
> +	trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len));

Before anyone yells at me, that trace_printk() has been politely asked to take
a walk :-). It got mad at me, but on the next iteration, it won't be there.

thanks,

 - Joel
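[Editor's note: later in the thread, Paul suggests replacing the debug trace_printk() with a proper trace event. A minimal sketch of what such a tracepoint could look like; the event name and fields here are hypothetical and not part of the posted series.]

```c
/* Hypothetical trace event (not in the posted series); it would live in
 * include/trace/events/rcu.h next to the existing rcu:* events. */
TRACE_EVENT(rcu_nocb_lazy_flush,

	TP_PROTO(const char *rcuname, long lazy_len),

	TP_ARGS(rcuname, lazy_len),

	TP_STRUCT__entry(
		__field(const char *, rcuname)
		__field(long, lazy_len)
	),

	TP_fast_assign(
		__entry->rcuname = rcuname;
		__entry->lazy_len = lazy_len;
	),

	TP_printk("%s lazy_len=%ld", __entry->rcuname, __entry->lazy_len)
);
```

The flush site in rcu_nocb_do_flush_bypass() could then call trace_rcu_nocb_lazy_flush(rcu_state.name, READ_ONCE(rdp->nocb_bypass.lazy_len)) in place of the trace_printk().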
Hi "Joel,
Thank you for the patch! Perhaps something to improve:
[auto build test WARNING on linus/master]
[also build test WARNING on v5.19-rc3 next-20220622]
[cannot apply to paulmck-rcu/dev]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch]
url: https://github.com/intel-lab-lkp/linux/commits/Joel-Fernandes-Google/Implement-call_rcu_lazy-and-miscellaneous-fixes/20220623-065447
base: https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git de5c208d533a46a074eb46ea17f672cc005a7269
config: riscv-nommu_k210_defconfig (https://download.01.org/0day-ci/archive/20220623/202206230916.2YtpR3sO-lkp@intel.com/config)
compiler: riscv64-linux-gcc (GCC) 11.3.0
reproduce (this is a W=1 build):
wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
chmod +x ~/bin/make.cross
# https://github.com/intel-lab-lkp/linux/commit/543ee31928d1cff057320ff64603283a34fe0052
git remote add linux-review https://github.com/intel-lab-lkp/linux
git fetch --no-tags linux-review Joel-Fernandes-Google/Implement-call_rcu_lazy-and-miscellaneous-fixes/20220623-065447
git checkout 543ee31928d1cff057320ff64603283a34fe0052
# save the config file
mkdir build_dir && cp config build_dir/.config
COMPILER_INSTALL_PATH=$HOME/0day COMPILER=gcc-11.3.0 make.cross W=1 O=build_dir ARCH=riscv SHELL=/bin/bash kernel/rcu/
If you fix the issue, kindly add following tag where applicable
Reported-by: kernel test robot <lkp@intel.com>
All warnings (new ones prefixed by >>):
kernel/rcu/tree.c:3103: warning: Function parameter or member 'lazy' not described in '__call_rcu_common'
>> kernel/rcu/tree.c:3103: warning: expecting prototype for call_rcu(). Prototype was for __call_rcu_common() instead
vim +3103 kernel/rcu/tree.c
b2b00ddf193bf83 kernel/rcu/tree.c Paul E. McKenney 2019-10-30 3060
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3061 /**
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3062 * call_rcu() - Queue an RCU callback for invocation after a grace period.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3063 * @head: structure to be used for queueing the RCU updates.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3064 * @func: actual callback function to be invoked after the grace period
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3065 *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3066 * The callback function will be invoked some time after a full grace
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3067 * period elapses, in other words after all pre-existing RCU read-side
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3068 * critical sections have completed. However, the callback function
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3069 * might well execute concurrently with RCU read-side critical sections
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3070 * that started after call_rcu() was invoked.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3071 *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3072 * RCU read-side critical sections are delimited by rcu_read_lock()
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3073 * and rcu_read_unlock(), and may be nested. In addition, but only in
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3074 * v5.0 and later, regions of code across which interrupts, preemption,
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3075 * or softirqs have been disabled also serve as RCU read-side critical
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3076 * sections. This includes hardware interrupt handlers, softirq handlers,
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3077 * and NMI handlers.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3078 *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3079 * Note that all CPUs must agree that the grace period extended beyond
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3080 * all pre-existing RCU read-side critical section. On systems with more
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3081 * than one CPU, this means that when "func()" is invoked, each CPU is
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3082 * guaranteed to have executed a full memory barrier since the end of its
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3083 * last RCU read-side critical section whose beginning preceded the call
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3084 * to call_rcu(). It also means that each CPU executing an RCU read-side
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3085 * critical section that continues beyond the start of "func()" must have
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3086 * executed a memory barrier after the call_rcu() but before the beginning
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3087 * of that RCU read-side critical section. Note that these guarantees
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3088 * include CPUs that are offline, idle, or executing in user mode, as
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3089 * well as CPUs that are executing in the kernel.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3090 *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3091 * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3092 * resulting RCU callback function "func()", then both CPU A and CPU B are
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3093 * guaranteed to execute a full memory barrier during the time interval
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3094 * between the call to call_rcu() and the invocation of "func()" -- even
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3095 * if CPU A and CPU B are the same CPU (but again only if the system has
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3096 * more than one CPU).
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3097 *
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3098 * Implementation of these memory-ordering guarantees is described here:
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3099 * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3100 */
543ee31928d1cff kernel/rcu/tree.c Joel Fernandes (Google 2022-06-22 3101) static void
543ee31928d1cff kernel/rcu/tree.c Joel Fernandes (Google 2022-06-22 3102) __call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 @3103 {
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney 2020-12-08 3104 static atomic_t doublefrees;
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3105 unsigned long flags;
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3106 struct rcu_data *rdp;
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3107 bool was_alldone;
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3108
b8f2ed538477d9a kernel/rcu/tree.c Paul E. McKenney 2016-08-23 3109 /* Misaligned rcu_head! */
b8f2ed538477d9a kernel/rcu/tree.c Paul E. McKenney 2016-08-23 3110 WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
b8f2ed538477d9a kernel/rcu/tree.c Paul E. McKenney 2016-08-23 3111
ae15018456c44b7 kernel/rcutree.c Paul E. McKenney 2013-04-23 3112 if (debug_rcu_head_queue(head)) {
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney 2017-05-03 3113 /*
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney 2017-05-03 3114 * Probable double call_rcu(), so leak the callback.
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney 2017-05-03 3115 * Use rcu:rcu_callback trace event to find the previous
1fe09ebe7a9c990 kernel/rcu/tree.c Paul E. McKenney 2021-12-18 3116 * time callback was passed to call_rcu().
fa3c66476975abf kernel/rcu/tree.c Paul E. McKenney 2017-05-03 3117 */
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney 2020-12-08 3118 if (atomic_inc_return(&doublefrees) < 4) {
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney 2020-12-08 3119 pr_err("%s(): Double-freed CB %p->%pS()!!! ", __func__, head, head->func);
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney 2020-12-08 3120 mem_dump_obj(head);
b4b7914a6a73fc1 kernel/rcu/tree.c Paul E. McKenney 2020-12-08 3121 }
7d0ae8086b82831 kernel/rcu/tree.c Paul E. McKenney 2015-03-03 3122 WRITE_ONCE(head->func, rcu_leak_callback);
ae15018456c44b7 kernel/rcutree.c Paul E. McKenney 2013-04-23 3123 return;
ae15018456c44b7 kernel/rcutree.c Paul E. McKenney 2013-04-23 3124 }
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3125 head->func = func;
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3126 head->next = NULL;
300c0c5e721834f kernel/rcu/tree.c Jun Miao 2021-11-16 3127 kasan_record_aux_stack_noalloc(head);
d818cc76e2b4d5f kernel/rcu/tree.c Zqiang 2021-12-26 3128 local_irq_save(flags);
da1df50d16171f4 kernel/rcu/tree.c Paul E. McKenney 2018-07-03 3129 rdp = this_cpu_ptr(&rcu_data);
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3130
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3131 /* Add the callback to our list. */
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3132 if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3133 // This can trigger due to call_rcu() from offline CPU:
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3134 WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
34404ca8fb252cc kernel/rcu/tree.c Paul E. McKenney 2015-01-19 3135 WARN_ON_ONCE(!rcu_is_watching());
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3136 // Very early boot, before rcu_init(). Initialize if needed
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3137 // and then drop through to queue the callback.
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney 2017-02-08 3138 if (rcu_segcblist_empty(&rdp->cblist))
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney 2017-02-08 3139 rcu_segcblist_init(&rdp->cblist);
143da9c2fc030a5 kernel/rcu/tree.c Paul E. McKenney 2015-01-19 3140 }
77a40f97030b27b kernel/rcu/tree.c Joel Fernandes (Google 2019-08-30 3141)
b2b00ddf193bf83 kernel/rcu/tree.c Paul E. McKenney 2019-10-30 3142 check_cb_ovld(rdp);
543ee31928d1cff kernel/rcu/tree.c Joel Fernandes (Google 2022-06-22 3143) if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
d1b222c6be1f8bf kernel/rcu/tree.c Paul E. McKenney 2019-07-02 3144 return; // Enqueued onto ->nocb_bypass, so just leave.
b692dc4adfcff54 kernel/rcu/tree.c Paul E. McKenney 2020-02-11 3145 // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
77a40f97030b27b kernel/rcu/tree.c Joel Fernandes (Google 2019-08-30 3146) rcu_segcblist_enqueue(&rdp->cblist, head);
c408b215f58f715 kernel/rcu/tree.c Uladzislau Rezki (Sony 2020-05-25 3147) if (__is_kvfree_rcu_offset((unsigned long)func))
c408b215f58f715 kernel/rcu/tree.c Uladzislau Rezki (Sony 2020-05-25 3148) trace_rcu_kvfree_callback(rcu_state.name, head,
3c779dfef2c4524 kernel/rcu/tree.c Paul E. McKenney 2018-07-05 3149 (unsigned long)func,
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney 2017-02-08 3150 rcu_segcblist_n_cbs(&rdp->cblist));
d4c08f2ac311a36 kernel/rcutree.c Paul E. McKenney 2011-06-25 3151 else
3c779dfef2c4524 kernel/rcu/tree.c Paul E. McKenney 2018-07-05 3152 trace_rcu_callback(rcu_state.name, head,
15fecf89e46a962 kernel/rcu/tree.c Paul E. McKenney 2017-02-08 3153 rcu_segcblist_n_cbs(&rdp->cblist));
d4c08f2ac311a36 kernel/rcutree.c Paul E. McKenney 2011-06-25 3154
3afe7fa535491ec kernel/rcu/tree.c Joel Fernandes (Google 2020-11-14 3155) trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
3afe7fa535491ec kernel/rcu/tree.c Joel Fernandes (Google 2020-11-14 3156)
29154c57e35a191 kernel/rcutree.c Paul E. McKenney 2012-05-30 3157 /* Go handle any RCU core processing required. */
3820b513a2e33d6 kernel/rcu/tree.c Frederic Weisbecker 2020-11-12 3158 if (unlikely(rcu_rdp_is_offloaded(rdp))) {
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3159 __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3160 } else {
5c7d89676bc5196 kernel/rcu/tree.c Paul E. McKenney 2018-07-03 3161 __call_rcu_core(rdp, head, flags);
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3162 local_irq_restore(flags);
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3163 }
5d6742b37727e11 kernel/rcu/tree.c Paul E. McKenney 2019-05-15 3164 }
64db4cfff99c04c kernel/rcutree.c Paul E. McKenney 2008-12-18 3165
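[Editor's note: the two warnings above are kernel-doc complaints: the docbook comment that documents call_rcu() now sits directly above __call_rcu_common(), so scripts/kernel-doc matches it to the wrong function and sees no description for the new "lazy" parameter. A rough sketch of one way to resolve it, keeping the existing kernel-doc with call_rcu() and giving call_rcu_lazy() its own short header; the wording is illustrative, not taken from the series.]

```c
/* Internal helper shared by call_rcu() and call_rcu_lazy().  The kernel-doc
 * block describing the memory-ordering guarantees stays with call_rcu()
 * itself, so the documentation build matches it to the exported symbol. */
static void
__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
{
	/* ... body as in the patch ... */
}

#ifdef CONFIG_RCU_LAZY
/**
 * call_rcu_lazy() - Queue a callback whose invocation may be deferred.
 * @head: structure to be used for queueing the RCU updates.
 * @func: actual callback function to be invoked after the grace period
 *
 * Like call_rcu(), but the callback is batched and may only be invoked
 * after a flush delay, memory pressure, or the batch growing too large.
 * See call_rcu() for the memory-ordering guarantees.
 */
void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
{
	__call_rcu_common(head, func, true);
}
EXPORT_SYMBOL_GPL(call_rcu_lazy);
#endif
```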
On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > Implement timer-based RCU lazy callback batching. The batch is flushed > whenever a certain amount of time has passed, or the batch on a > particular CPU grows too big. Also memory pressure will flush it in a > future patch. > > To handle several corner cases automagically (such as rcu_barrier() and > hotplug), we re-use bypass lists to handle lazy CBs. The bypass list > length has the lazy CB length included in it. A separate lazy CB length > counter is also introduced to keep track of the number of lazy CBs. > > Suggested-by: Paul McKenney <paulmck@kernel.org> > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org> Not bad, but some questions and comments below. Thanx, Paul > --- > include/linux/rcu_segcblist.h | 1 + > include/linux/rcupdate.h | 6 ++ > kernel/rcu/Kconfig | 8 +++ > kernel/rcu/rcu_segcblist.c | 19 ++++++ > kernel/rcu/rcu_segcblist.h | 14 ++++ > kernel/rcu/tree.c | 24 +++++-- > kernel/rcu/tree.h | 10 +-- > kernel/rcu/tree_nocb.h | 125 +++++++++++++++++++++++++--------- > 8 files changed, 164 insertions(+), 43 deletions(-) > > diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h > index 659d13a7ddaa..9a992707917b 100644 > --- a/include/linux/rcu_segcblist.h > +++ b/include/linux/rcu_segcblist.h > @@ -22,6 +22,7 @@ struct rcu_cblist { > struct rcu_head *head; > struct rcu_head **tail; > long len; > + long lazy_len; > }; > > #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head } > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h > index 1a32036c918c..9191a3d88087 100644 > --- a/include/linux/rcupdate.h > +++ b/include/linux/rcupdate.h > @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void) > > #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ > > +#ifdef CONFIG_RCU_LAZY > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func); > +#else > +#define call_rcu_lazy(head, func) call_rcu(head, func) > +#endif > + > /* Internal to kernel */ > void rcu_init(void); > extern int rcu_scheduler_active; > diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig > index 27aab870ae4c..0bffa992fdc4 100644 > --- a/kernel/rcu/Kconfig > +++ b/kernel/rcu/Kconfig > @@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB > Say N here if you hate read-side memory barriers. > Take the default if you are unsure. > > +config RCU_LAZY > + bool "RCU callback lazy invocation functionality" > + depends on RCU_NOCB_CPU > + default n > + help > + To save power, batch RCU callbacks and flush after delay, memory > + pressure or callback list growing too big. Spaces vs. tabs. The checkpatch warning is unhelpful ("please write a help paragraph that fully describes the config symbol"), but please fix the whitespace if you have not already done so. > endmenu # "RCU Subsystem" > diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c > index c54ea2b6a36b..627a3218a372 100644 > --- a/kernel/rcu/rcu_segcblist.c > +++ b/kernel/rcu/rcu_segcblist.c > @@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp) > rclp->head = NULL; > rclp->tail = &rclp->head; > rclp->len = 0; > + rclp->lazy_len = 0; > } > > /* > @@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp) > WRITE_ONCE(rclp->len, rclp->len + 1); > } > > +/* > + * Enqueue an rcu_head structure onto the specified callback list. Please also note the fact that it is enqueuing lazily. 
> + */ > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp) > +{ > + rcu_cblist_enqueue(rclp, rhp); > + WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1); Except... Why not just add a "lazy" parameter to rcu_cblist_enqueue()? IS_ENABLED() can make it fast. > +} > + > /* > * Flush the second rcu_cblist structure onto the first one, obliterating > * any contents of the first. If rhp is non-NULL, enqueue it as the sole > @@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > } > } Header comment, please. It can be short, referring to that of the function rcu_cblist_flush_enqueue(). > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > + struct rcu_cblist *srclp, > + struct rcu_head *rhp) Please line up the "struct" keywords. (Picky, I know...) > +{ > + rcu_cblist_flush_enqueue(drclp, srclp, rhp); > + if (rhp) > + WRITE_ONCE(srclp->lazy_len, 1); Shouldn't this instead be a lazy argument to rcu_cblist_flush_enqueue()? Concerns about speed in the !RCU_LAZY case can be addressed using IS_ENABLED(), for example: if (IS_ENABLED(CONFIG_RCU_LAZY) && rhp) WRITE_ONCE(srclp->lazy_len, 1); > +} > + > /* > * Dequeue the oldest rcu_head structure from the specified callback > * list. > diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h > index 431cee212467..c3d7de65b689 100644 > --- a/kernel/rcu/rcu_segcblist.h > +++ b/kernel/rcu/rcu_segcblist.h > @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) > return READ_ONCE(rclp->len); > } > > +/* Return number of callbacks in the specified callback list. */ > +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) > +{ > +#ifdef CONFIG_RCU_LAZY > + return READ_ONCE(rclp->lazy_len); > +#else > + return 0; > +#endif Please use IS_ENABLED(). This saves a line (and lots of characters) but compiles just as efficienctly. > +} > + > /* Return number of callbacks in segmented callback list by summing seglen. */ > long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp); > > void rcu_cblist_init(struct rcu_cblist *rclp); > void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp); > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp); > void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > struct rcu_cblist *srclp, > struct rcu_head *rhp); > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > + struct rcu_cblist *srclp, > + struct rcu_head *rhp); Please line up the "struct" keywords. (Still picky, I know...) > +{ > + rcu_cblist_flush_enqueue(drclp, srclp, rhp); > + if (rhp) > + WRITE_ONCE(srclp->lazy_len, 1); > +} > + > /* > * Dequeue the oldest rcu_head structure from the specified callback > * list. > diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h > index 431cee212467..c3d7de65b689 100644 > --- a/kernel/rcu/rcu_segcblist.h > +++ b/kernel/rcu/rcu_segcblist.h > @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) > return READ_ONCE(rclp->len); > } > > +/* Return number of callbacks in the specified callback list. */ > +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) > +{ > +#ifdef CONFIG_RCU_LAZY > + return READ_ONCE(rclp->lazy_len); > +#else > + return 0; > +#endif Please use IS_ENABLED(). This saves a line (and lots of characters) but compiles just as efficienctly. > +} > + > /* Return number of callbacks in segmented callback list by summing seglen. 
*/ > long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp); > > void rcu_cblist_init(struct rcu_cblist *rclp); > void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp); > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp); > void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > struct rcu_cblist *srclp, > struct rcu_head *rhp); > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > + struct rcu_cblist *srclp, > + struct rcu_head *rhp); Please line up the "struct" keywords. (Even more pickiness, I know...) > struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp); > > /* > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c > index c25ba442044a..d2e3d6e176d2 100644 > --- a/kernel/rcu/tree.c > +++ b/kernel/rcu/tree.c > @@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp) > * Implementation of these memory-ordering guarantees is described here: > * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. > */ The above docbook comment needs to move to call_rcu(). > -void call_rcu(struct rcu_head *head, rcu_callback_t func) > +static void > +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy) > { > static atomic_t doublefrees; > unsigned long flags; > @@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > } > > check_cb_ovld(rdp); > - if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags)) > + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy)) > return; // Enqueued onto ->nocb_bypass, so just leave. > // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock. > rcu_segcblist_enqueue(&rdp->cblist, head); > @@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > local_irq_restore(flags); > } > } > -EXPORT_SYMBOL_GPL(call_rcu); Please add a docbook comment for call_rcu_lazy(). It can be brief, for example, by referring to call_rcu()'s docbook comment for memory-ordering details. > +#ifdef CONFIG_RCU_LAZY > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func) > +{ > + return __call_rcu_common(head, func, true); > +} > +EXPORT_SYMBOL_GPL(call_rcu_lazy); > +#endif > + > +void call_rcu(struct rcu_head *head, rcu_callback_t func) > +{ > + return __call_rcu_common(head, func, false); > + > +} > +EXPORT_SYMBOL_GPL(call_rcu); > > /* Maximum number of jiffies to wait before draining a batch. */ > #define KFREE_DRAIN_JIFFIES (HZ / 50) > @@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) > rdp->barrier_head.func = rcu_barrier_callback; > debug_rcu_head_queue(&rdp->barrier_head); > rcu_nocb_lock(rdp); > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false)); > if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) { > atomic_inc(&rcu_state.barrier_cpu_count); > } else { > @@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu) > my_rdp = this_cpu_ptr(&rcu_data); > my_rnp = my_rdp->mynode; > rcu_nocb_lock(my_rdp); /* irqs already disabled. */ > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies)); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false)); > raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ > /* Leverage recent GPs and set GP for new callbacks. 
*/ > needwake = rcu_advance_cbs(my_rnp, rdp) || > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h > index 2ccf5845957d..fec4fad6654b 100644 > --- a/kernel/rcu/tree.h > +++ b/kernel/rcu/tree.h > @@ -267,8 +267,9 @@ struct rcu_data { > /* Values for nocb_defer_wakeup field in struct rcu_data. */ > #define RCU_NOCB_WAKE_NOT 0 > #define RCU_NOCB_WAKE_BYPASS 1 > -#define RCU_NOCB_WAKE 2 > -#define RCU_NOCB_WAKE_FORCE 3 > +#define RCU_NOCB_WAKE_LAZY 2 > +#define RCU_NOCB_WAKE 3 > +#define RCU_NOCB_WAKE_FORCE 4 > > #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500)) > /* For jiffies_till_first_fqs and */ > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); > static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); > static void rcu_init_one_nocb(struct rcu_node *rnp); > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j); > + unsigned long j, bool lazy); > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - bool *was_alldone, unsigned long flags); > + bool *was_alldone, unsigned long flags, > + bool lazy); > static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, > unsigned long flags); > static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level); > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h > index e369efe94fda..b9244f22e102 100644 > --- a/kernel/rcu/tree_nocb.h > +++ b/kernel/rcu/tree_nocb.h > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) > return __wake_nocb_gp(rdp_gp, rdp, force, flags); > } Comment on LAZY_FLUSH_JIFFIES purpose in life, please! (At some point more flexibility may be required, but let's not unnecessarily rush into that.) > +#define LAZY_FLUSH_JIFFIES (10 * HZ) > + > /* > * Arrange to wake the GP kthread for this NOCB group at some future > * time when it is safe to do so. > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > * Bypass wakeup overrides previous deferments. In case > * of callback storm, no need to wake up too early. > */ > - if (waketype == RCU_NOCB_WAKE_BYPASS) { > + if (waketype == RCU_NOCB_WAKE_LAZY) { Presumably we get here only if all of this CPU's callbacks are lazy? > + mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES); > + WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > + } else if (waketype == RCU_NOCB_WAKE_BYPASS) { > mod_timer(&rdp_gp->nocb_timer, jiffies + 2); > WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > } else { > @@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > * Note that this function always returns true if rhp is NULL. > */ > static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j) > + unsigned long j, bool lazy) > { > struct rcu_cblist rcl; > > @@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ > if (rhp) > rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ > - rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); > + > + trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len)); This debug code need to go, of course. If you would like, you could replace it with a trace event. > + /* The lazy CBs are being flushed, but a new one might be enqueued. 
*/ > + if (lazy) > + rcu_cblist_flush_enqueue_lazy(&rcl, &rdp->nocb_bypass, rhp); > + else > + rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); Shouldn't these be a single function with a "lazy" argument, as noted earlier? > rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); > WRITE_ONCE(rdp->nocb_bypass_first, j); > rcu_nocb_bypass_unlock(rdp); > @@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > * Note that this function always returns true if rhp is NULL. > */ > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j) > + unsigned long j, bool lazy) > { > if (!rcu_rdp_is_offloaded(rdp)) > return true; > rcu_lockdep_assert_cblist_protected(rdp); > rcu_nocb_bypass_lock(rdp); > - return rcu_nocb_do_flush_bypass(rdp, rhp, j); > + return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy); > } > > /* > @@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > if (!rcu_rdp_is_offloaded(rdp) || > !rcu_nocb_bypass_trylock(rdp)) > return; > - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); > + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false)); > } > > /* > @@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > * there is only one CPU in operation. > */ > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - bool *was_alldone, unsigned long flags) > + bool *was_alldone, unsigned long flags, > + bool lazy) > { > unsigned long c; > unsigned long cur_gp_seq; > unsigned long j = jiffies; > long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > > lockdep_assert_irqs_disabled(); > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > } > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > - // If there hasn't yet been all that many ->cblist enqueues > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > - // ->nocb_bypass first. > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > + // If caller passed a non-lazy CB and there hasn't yet been all that > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > + // number of CBs (lazy + non-lazy) grows too much. > + // > + // Note that if the bypass list has lazy CBs, and the main list is > + // empty, and rhp happens to be non-lazy, then we end up flushing all > + // the lazy CBs to the main list as well. That's the right thing to do, > + // since we are kick-starting RCU GP processing anyway for the non-lazy > + // one, we can just reuse that GP for the already queued-up lazy ones. > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > + (lazy && n_lazy_cbs >= qhimark)) { > rcu_nocb_lock(rdp); > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > if (*was_alldone) > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > - TPS("FirstQ")); > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); The "false" here instead of "lazy" is because the caller is to do the enqueuing, correct? > WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > return false; // Caller must enqueue the callback. 
> } > > // If ->nocb_bypass has been used too long or is too full, > // flush ->nocb_bypass to ->cblist. > - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || > - ncbs >= qhimark) { > + if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) { > rcu_nocb_lock(rdp); > - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { > + if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) { But shouldn't this "true" be "lazy"? I don't see how we are guaranteed that the callback is in fact lazy at this point in the code. Also, there is not yet a guarantee that the caller will do the enqueuing. So what am I missing? > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > if (*was_alldone) > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > - TPS("FirstQ")); > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > return false; // Caller must enqueue the callback. > } > @@ -455,12 +475,20 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > rcu_nocb_wait_contended(rdp); > rcu_nocb_bypass_lock(rdp); > ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ > - rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); > + if (lazy) > + rcu_cblist_enqueue_lazy(&rdp->nocb_bypass, rhp); > + else > + rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); And this is one reason to add the "lazy" parameter to rcu_cblist_enqueue(). > if (!ncbs) { > WRITE_ONCE(rdp->nocb_bypass_first, j); > - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); > + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > + lazy ? TPS("FirstLazyBQ") : TPS("FirstBQ")); > + } else if (!n_lazy_cbs && lazy) { > + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstLazyBQ")); But in this case, there already are callbacks. In that case, how does it help to keep track of the laziness of this callback? In fact, aren't the only meaningful states "empty", "all lazy", and "at least one non-lazy callback"? After all, if you have at least one non-lazy callback, it needs to be business as usual, correct? > } > + > rcu_nocb_bypass_unlock(rdp); > smp_mb(); /* Order enqueue before wake. */ > if (ncbs) { > @@ -493,7 +521,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, > { > unsigned long cur_gp_seq; > unsigned long j; > - long len; > + long len, lazy_len, bypass_len; > struct task_struct *t; > > // If we are being polled or there is no kthread, just leave. > @@ -506,9 +534,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, > } > // Need to actually to a wakeup. > len = rcu_segcblist_n_cbs(&rdp->cblist); > + bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + lazy_len = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > if (was_alldone) { > rdp->qlen_last_fqs_check = len; > - if (!irqs_disabled_flags(flags)) { > + // Only lazy CBs in bypass list > + if (lazy_len && bypass_len == lazy_len) { And this is one piece of evidence -- you are only checking for all callbacks being lazy. So do you really need to count them, as opposed to note that all are lazy on the one hand or some are non-lazy on the other? > + rcu_nocb_unlock_irqrestore(rdp, flags); > + wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY, > + TPS("WakeLazy")); > + } else if (!irqs_disabled_flags(flags)) { > /* ... if queue was empty ... 
*/ > rcu_nocb_unlock_irqrestore(rdp, flags); > wake_nocb_gp(rdp, false); > @@ -599,8 +634,8 @@ static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp, > */ > static void nocb_gp_wait(struct rcu_data *my_rdp) > { > - bool bypass = false; > - long bypass_ncbs; > + bool bypass = false, lazy = false; > + long bypass_ncbs, lazy_ncbs; > int __maybe_unused cpu = my_rdp->cpu; > unsigned long cur_gp_seq; > unsigned long flags; > @@ -648,12 +683,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) > continue; > } > bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > - if (bypass_ncbs && > + lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > + if (lazy_ncbs && This one works either way. The current approach works because the timeout is longer. If you have exceeded the lazy timeout, you don't care that there are non-lazy callbacks queued. > + (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + LAZY_FLUSH_JIFFIES) || > + bypass_ncbs > qhimark)) { > + // Bypass full or old, so flush it. > + (void)rcu_nocb_try_flush_bypass(rdp, j); > + bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > + } else if (bypass_ncbs && > (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || > bypass_ncbs > 2 * qhimark)) { > // Bypass full or old, so flush it. > (void)rcu_nocb_try_flush_bypass(rdp, j); > bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > + lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); And these two "if" bodies are identical, correct? Can they be merged? > } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { > rcu_nocb_unlock_irqrestore(rdp, flags); > if (needwake_state) > @@ -662,8 +706,11 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) > } > if (bypass_ncbs) { > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > - TPS("Bypass")); > - bypass = true; > + bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass")); > + if (bypass_ncbs == lazy_ncbs) > + lazy = true; > + else > + bypass = true; Another place where a lazy flag rather than count would suit. > } > rnp = rdp->mynode; > > @@ -713,12 +760,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp) > my_rdp->nocb_gp_gp = needwait_gp; > my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; > > - if (bypass && !rcu_nocb_poll) { > - // At least one child with non-empty ->nocb_bypass, so set > - // timer in order to avoid stranding its callbacks. > - wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, > - TPS("WakeBypassIsDeferred")); > + // At least one child with non-empty ->nocb_bypass, so set > + // timer in order to avoid stranding its callbacks. > + if (!rcu_nocb_poll) { > + // If bypass list only has lazy CBs. Add a deferred > + // lazy wake up. > + if (lazy && !bypass) { > + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY, > + TPS("WakeLazyIsDeferred")); > + // Otherwise add a deferred bypass wake up. > + } else if (bypass) { > + wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS, > + TPS("WakeBypassIsDeferred")); > + } > } > + > if (rcu_nocb_poll) { > /* Polling, so trace if first poll in the series. */ > if (gotcbs) > @@ -999,7 +1055,7 @@ static long rcu_nocb_rdp_deoffload(void *arg) > * return false, which means that future calls to rcu_nocb_try_bypass() > * will refuse to put anything into the bypass. > */ > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false)); > /* > * Start with invoking rcu_core() early. 
This way if the current thread > * happens to preempt an ongoing call to rcu_core() in the middle, > @@ -1500,13 +1556,14 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) > } > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - unsigned long j) > + unsigned long j, bool lazy) > { > return true; > } > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > - bool *was_alldone, unsigned long flags) > + bool *was_alldone, unsigned long flags, > + bool lazy) > { > return false; > } > -- > 2.37.0.rc0.104.g0611611a94-goog >
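[Editor's note: several of the review comments above point the same way: use IS_ENABLED(CONFIG_RCU_LAZY) instead of #ifdef, and pass a "lazy" flag to the existing cblist helpers rather than adding _lazy() variants. A rough sketch of that consolidation, illustrating the suggestion rather than quoting code from the series; it assumes the lazy_len field stays unconditional in struct rcu_cblist, as in the posted patch.]

```c
/* Enqueue an rcu_head onto the specified callback list, optionally
 * counting it as lazy.  IS_ENABLED() lets the compiler drop the lazy
 * accounting entirely when CONFIG_RCU_LAZY=n. */
void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp, bool lazy)
{
	*rclp->tail = rhp;
	rclp->tail = &rhp->next;
	WRITE_ONCE(rclp->len, rclp->len + 1);
	if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy)
		WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);
}

/* Return the number of lazy callbacks, which is always zero when
 * CONFIG_RCU_LAZY is not set. */
static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
{
	if (IS_ENABLED(CONFIG_RCU_LAZY))
		return READ_ONCE(rclp->lazy_len);
	return 0;
}
```

rcu_cblist_flush_enqueue() could take the same boolean, so that callers such as rcu_nocb_do_flush_bypass() pass their existing "lazy" argument straight through instead of branching between two helpers.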
On Wed, Jun 22, 2022 at 11:18:02PM +0000, Joel Fernandes wrote: > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > [..] > > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h > > index 2ccf5845957d..fec4fad6654b 100644 > > --- a/kernel/rcu/tree.h > > +++ b/kernel/rcu/tree.h > > @@ -267,8 +267,9 @@ struct rcu_data { > > /* Values for nocb_defer_wakeup field in struct rcu_data. */ > > #define RCU_NOCB_WAKE_NOT 0 > > #define RCU_NOCB_WAKE_BYPASS 1 > > -#define RCU_NOCB_WAKE 2 > > -#define RCU_NOCB_WAKE_FORCE 3 > > +#define RCU_NOCB_WAKE_LAZY 2 > > +#define RCU_NOCB_WAKE 3 > > +#define RCU_NOCB_WAKE_FORCE 4 > > > > #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500)) > > /* For jiffies_till_first_fqs and */ > > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); > > static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); > > static void rcu_init_one_nocb(struct rcu_node *rnp); > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - unsigned long j); > > + unsigned long j, bool lazy); > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - bool *was_alldone, unsigned long flags); > > + bool *was_alldone, unsigned long flags, > > + bool lazy); > > static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, > > unsigned long flags); > > static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level); > > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h > > index e369efe94fda..b9244f22e102 100644 > > --- a/kernel/rcu/tree_nocb.h > > +++ b/kernel/rcu/tree_nocb.h > > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) > > return __wake_nocb_gp(rdp_gp, rdp, force, flags); > > } > > > > +#define LAZY_FLUSH_JIFFIES (10 * HZ) > > + > > /* > > * Arrange to wake the GP kthread for this NOCB group at some future > > * time when it is safe to do so. > > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > > * Bypass wakeup overrides previous deferments. In case > > * of callback storm, no need to wake up too early. > > */ > > - if (waketype == RCU_NOCB_WAKE_BYPASS) { > > + if (waketype == RCU_NOCB_WAKE_LAZY) { > > + mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES); > > + WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > > + } else if (waketype == RCU_NOCB_WAKE_BYPASS) { > > mod_timer(&rdp_gp->nocb_timer, jiffies + 2); > > WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype); > > } else { > > @@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > > * Note that this function always returns true if rhp is NULL. > > */ > > static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - unsigned long j) > > + unsigned long j, bool lazy) > > { > > struct rcu_cblist rcl; > > > > @@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ > > if (rhp) > > rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ > > - rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); > > + > > + trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len)); > > Before anyone yells at me, that trace_printk() has been politely asked to take > a walk :-). It got mad at me, but on the next iteration, it wont be there. ;-) ;-) ;-) Thanx, Paul
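[Editor's note: Paul's review also asks for a comment explaining LAZY_FLUSH_JIFFIES, which the hunk quoted above introduces. A sketch of the kind of comment that could answer it, based on how the constant is used in the posted patch; the wording is illustrative.]

```c
/*
 * Maximum time, in jiffies, that lazy callbacks may sit in ->nocb_bypass
 * before the deferred wakeup fires and the GP kthread flushes them.
 * Non-lazy bypass wakeups keep using the much shorter 2-jiffy deferral.
 */
#define LAZY_FLUSH_JIFFIES (10 * HZ)
```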
On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > } > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > - // If there hasn't yet been all that many ->cblist enqueues > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > - // ->nocb_bypass first. > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > + // If caller passed a non-lazy CB and there hasn't yet been all that > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > + // number of CBs (lazy + non-lazy) grows too much. > + // > + // Note that if the bypass list has lazy CBs, and the main list is > + // empty, and rhp happens to be non-lazy, then we end up flushing all > + // the lazy CBs to the main list as well. That's the right thing to do, > + // since we are kick-starting RCU GP processing anyway for the non-lazy > + // one, we can just reuse that GP for the already queued-up lazy ones. > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > + (lazy && n_lazy_cbs >= qhimark)) { > rcu_nocb_lock(rdp); > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > if (*was_alldone) > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > - TPS("FirstQ")); > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); That's outside the scope of this patchset but this makes me realize we unconditionally try to flush the bypass from call_rcu() fastpath, and therefore we unconditionally lock the bypass lock from call_rcu() fastpath. It shouldn't be contended at this stage since we are holding the nocb_lock already, and only the local CPU can hold the nocb_bypass_lock without holding the nocb_lock. But still... It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass)) before doing anything. Only the local CPU can enqueue to the bypass list. Adding that to my TODO list...
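[Editor's note: a minimal sketch of the lockless early check Frederic describes, applied to rcu_nocb_flush_bypass() as quoted earlier in the thread. This only illustrates the idea he queued for later, outside this series, under the assumption he states that only the local CPU enqueues onto its own ->nocb_bypass.]

```c
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
				  unsigned long j, bool lazy)
{
	if (!rcu_rdp_is_offloaded(rdp))
		return true;
	rcu_lockdep_assert_cblist_protected(rdp);
	/* Nothing to flush and nothing to enqueue: skip the bypass lock.
	 * Only the local CPU enqueues onto ->nocb_bypass, so an empty
	 * list observed here cannot concurrently gain callbacks. */
	if (!rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass))
		return true;
	rcu_nocb_bypass_lock(rdp);
	return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
}
```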
On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote: > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > } > > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > > > - // If there hasn't yet been all that many ->cblist enqueues > > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > > - // ->nocb_bypass first. > > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > > + // If caller passed a non-lazy CB and there hasn't yet been all that > > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > > + // number of CBs (lazy + non-lazy) grows too much. > > + // > > + // Note that if the bypass list has lazy CBs, and the main list is > > + // empty, and rhp happens to be non-lazy, then we end up flushing all > > + // the lazy CBs to the main list as well. That's the right thing to do, > > + // since we are kick-starting RCU GP processing anyway for the non-lazy > > + // one, we can just reuse that GP for the already queued-up lazy ones. > > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > > + (lazy && n_lazy_cbs >= qhimark)) { > > rcu_nocb_lock(rdp); > > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > > if (*was_alldone) > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > > - TPS("FirstQ")); > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > > That's outside the scope of this patchset but this makes me realize we > unconditionally try to flush the bypass from call_rcu() fastpath, and > therefore we unconditionally lock the bypass lock from call_rcu() fastpath. > > It shouldn't be contended at this stage since we are holding the nocb_lock > already, and only the local CPU can hold the nocb_bypass_lock without holding > the nocb_lock. But still... > > It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass)) > before doing anything. Only the local CPU can enqueue to the bypass list. > > Adding that to my TODO list... That does sound like a potentially very helpful approach! As always, please analyze and test carefully! Thanx, Paul
On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote: > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > } > > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > > > - // If there hasn't yet been all that many ->cblist enqueues > > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > > - // ->nocb_bypass first. > > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > > + // If caller passed a non-lazy CB and there hasn't yet been all that > > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > > + // number of CBs (lazy + non-lazy) grows too much. > > + // > > + // Note that if the bypass list has lazy CBs, and the main list is > > + // empty, and rhp happens to be non-lazy, then we end up flushing all > > + // the lazy CBs to the main list as well. That's the right thing to do, > > + // since we are kick-starting RCU GP processing anyway for the non-lazy > > + // one, we can just reuse that GP for the already queued-up lazy ones. > > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > > + (lazy && n_lazy_cbs >= qhimark)) { > > rcu_nocb_lock(rdp); > > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > > if (*was_alldone) > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > > - TPS("FirstQ")); > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > > That's outside the scope of this patchset but this makes me realize we > unconditionally try to flush the bypass from call_rcu() fastpath, and > therefore we unconditionally lock the bypass lock from call_rcu() fastpath. > > It shouldn't be contended at this stage since we are holding the nocb_lock > already, and only the local CPU can hold the nocb_bypass_lock without holding > the nocb_lock. But still... > > It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass)) > before doing anything. Only the local CPU can enqueue to the bypass list. > > Adding that to my TODO list... > I am afraid I did not understand your comment. The bypass list lock is held once we have decided to use the bypass list to queue something on to it. The bypass flushing is also conditional on either the bypass cblist growing too big or a jiffie elapsing since the first bypass queue. So in both cases, acquiring the lock is conditional. What do you mean it is unconditionally acquiring the bypass lock? Where? Thanks! - Joel
On Wed, Jun 29, 2022 at 08:29:48PM +0000, Joel Fernandes wrote: > On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote: > > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > } > > > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > > > > > - // If there hasn't yet been all that many ->cblist enqueues > > > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > > > - // ->nocb_bypass first. > > > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > > > + // If caller passed a non-lazy CB and there hasn't yet been all that > > > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > > > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > > > + // number of CBs (lazy + non-lazy) grows too much. > > > + // > > > + // Note that if the bypass list has lazy CBs, and the main list is > > > + // empty, and rhp happens to be non-lazy, then we end up flushing all > > > + // the lazy CBs to the main list as well. That's the right thing to do, > > > + // since we are kick-starting RCU GP processing anyway for the non-lazy > > > + // one, we can just reuse that GP for the already queued-up lazy ones. > > > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > > > + (lazy && n_lazy_cbs >= qhimark)) { > > > rcu_nocb_lock(rdp); > > > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > > > if (*was_alldone) > > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > > > - TPS("FirstQ")); > > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > > > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > > > > That's outside the scope of this patchset but this makes me realize we > > unconditionally try to flush the bypass from call_rcu() fastpath, and > > therefore we unconditionally lock the bypass lock from call_rcu() fastpath. > > > > It shouldn't be contended at this stage since we are holding the nocb_lock > > already, and only the local CPU can hold the nocb_bypass_lock without holding > > the nocb_lock. But still... > > > > It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass)) > > before doing anything. Only the local CPU can enqueue to the bypass list. > > > > Adding that to my TODO list... > > > > I am afraid I did not understand your comment. The bypass list lock is held > once we have decided to use the bypass list to queue something on to it. > > The bypass flushing is also conditional on either the bypass cblist growing > too big or a jiffie elapsing since the first bypass queue. > > So in both cases, acquiring the lock is conditional. What do you mean it is > unconditionally acquiring the bypass lock? Where? Just to make sure we are talking about the same thing, I'm referring to this path: // If there hasn't yet been all that many ->cblist enqueues // this jiffy, tell the caller to enqueue onto ->cblist. But flush // ->nocb_bypass first. if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { rcu_nocb_lock(rdp); *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); if (*was_alldone) trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstQ")); WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); return false; // Caller must enqueue the callback. 
} This is called whenever we decide not to queue to the bypass list because there is no flooding detected (rdp->nocb_nobypass_count hasn't reached nocb_nobypass_lim_per_jiffy for the current jiffy). I call this the fast path because this is what I would except in a normal load, as opposed to callbacks flooding. And in this fastpath, the above rcu_nocb_flush_bypass() is unconditional. > > Thanks! > > - Joel >
On Thu, Jun 30, 2022 at 12:01:14AM +0200, Frederic Weisbecker wrote: > On Wed, Jun 29, 2022 at 08:29:48PM +0000, Joel Fernandes wrote: > > On Wed, Jun 29, 2022 at 01:53:49PM +0200, Frederic Weisbecker wrote: > > > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > > > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > > } > > > > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > > > > > > > - // If there hasn't yet been all that many ->cblist enqueues > > > > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > > > > - // ->nocb_bypass first. > > > > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > > > > + // If caller passed a non-lazy CB and there hasn't yet been all that > > > > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > > > > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > > > > + // number of CBs (lazy + non-lazy) grows too much. > > > > + // > > > > + // Note that if the bypass list has lazy CBs, and the main list is > > > > + // empty, and rhp happens to be non-lazy, then we end up flushing all > > > > + // the lazy CBs to the main list as well. That's the right thing to do, > > > > + // since we are kick-starting RCU GP processing anyway for the non-lazy > > > > + // one, we can just reuse that GP for the already queued-up lazy ones. > > > > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > > > > + (lazy && n_lazy_cbs >= qhimark)) { > > > > rcu_nocb_lock(rdp); > > > > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > > > > if (*was_alldone) > > > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > > > > - TPS("FirstQ")); > > > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > > > > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > > > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > > > > > > That's outside the scope of this patchset but this makes me realize we > > > unconditionally try to flush the bypass from call_rcu() fastpath, and > > > therefore we unconditionally lock the bypass lock from call_rcu() fastpath. > > > > > > It shouldn't be contended at this stage since we are holding the nocb_lock > > > already, and only the local CPU can hold the nocb_bypass_lock without holding > > > the nocb_lock. But still... > > > > > > It looks safe to locklessly early check if (rcu_cblist_n_cbs(&rdp->nocb_bypass)) > > > before doing anything. Only the local CPU can enqueue to the bypass list. > > > > > > Adding that to my TODO list... > > > > > > > I am afraid I did not understand your comment. The bypass list lock is held > > once we have decided to use the bypass list to queue something on to it. > > > > The bypass flushing is also conditional on either the bypass cblist growing > > too big or a jiffie elapsing since the first bypass queue. > > > > So in both cases, acquiring the lock is conditional. What do you mean it is > > unconditionally acquiring the bypass lock? Where? > > Just to make sure we are talking about the same thing, I'm referring to this > path: > > // If there hasn't yet been all that many ->cblist enqueues > // this jiffy, tell the caller to enqueue onto ->cblist. But flush > // ->nocb_bypass first. 
> if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > rcu_nocb_lock(rdp); > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > if (*was_alldone) > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > TPS("FirstQ")); > WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > return false; // Caller must enqueue the callback. > } > > This is called whenever we decide not to queue to the bypass list because > there is no flooding detected (rdp->nocb_nobypass_count hasn't reached > nocb_nobypass_lim_per_jiffy for the current jiffy). I call this the fast path > because this is what I would expect in a normal load, as opposed to callbacks > flooding. > > And in this fastpath, the above rcu_nocb_flush_bypass() is unconditional. Sorry, you are right, I see that now. Another reason why the contention is probably not a big deal (other than the nocb lock being held) is that all other callers of the flush appear to be in slow paths except for this one. Unless someone is offloading/deoffloading rapidly or something :) thanks, - Joel
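For illustration, the lockless early check Frederic has in mind might look roughly like the sketch below in rcu_nocb_try_bypass()'s fast path (this is not part of the posted patch, just one possible shape of the idea, shown against the pre-lazy three-argument flush quoted above). Since only the local CPU enqueues onto ->nocb_bypass, sampling its length without the bypass lock should be safe, and both the flush and the bypass lock can be skipped when the list is empty:

	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
		rcu_nocb_lock(rdp);
		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
		if (*was_alldone)
			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
					    TPS("FirstQ"));
		/* Sketch: only take the bypass lock if there is anything to flush. */
		if (rcu_cblist_n_cbs(&rdp->nocb_bypass))
			WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
		return false; // Caller must enqueue the callback.
	}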
On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote: > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > > Implement timer-based RCU lazy callback batching. The batch is flushed > > whenever a certain amount of time has passed, or the batch on a > > particular CPU grows too big. Also memory pressure will flush it in a > > future patch. > > > > To handle several corner cases automagically (such as rcu_barrier() and > > hotplug), we re-use bypass lists to handle lazy CBs. The bypass list > > length has the lazy CB length included in it. A separate lazy CB length > > counter is also introduced to keep track of the number of lazy CBs. > > > > Suggested-by: Paul McKenney <paulmck@kernel.org> > > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org> > > Not bad, but some questions and comments below. Thanks a lot for these, real helpful and I replied below: > > diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h > > index 659d13a7ddaa..9a992707917b 100644 > > --- a/include/linux/rcu_segcblist.h > > +++ b/include/linux/rcu_segcblist.h > > @@ -22,6 +22,7 @@ struct rcu_cblist { > > struct rcu_head *head; > > struct rcu_head **tail; > > long len; > > + long lazy_len; > > }; > > > > #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head } > > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h > > index 1a32036c918c..9191a3d88087 100644 > > --- a/include/linux/rcupdate.h > > +++ b/include/linux/rcupdate.h > > @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void) > > > > #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ > > > > +#ifdef CONFIG_RCU_LAZY > > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func); > > +#else > > +#define call_rcu_lazy(head, func) call_rcu(head, func) > > +#endif > > + > > /* Internal to kernel */ > > void rcu_init(void); > > extern int rcu_scheduler_active; > > diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig > > index 27aab870ae4c..0bffa992fdc4 100644 > > --- a/kernel/rcu/Kconfig > > +++ b/kernel/rcu/Kconfig > > @@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB > > Say N here if you hate read-side memory barriers. > > Take the default if you are unsure. > > > > +config RCU_LAZY > > + bool "RCU callback lazy invocation functionality" > > + depends on RCU_NOCB_CPU > > + default n > > + help > > + To save power, batch RCU callbacks and flush after delay, memory > > + pressure or callback list growing too big. > > Spaces vs. tabs. Fixed, thanks. > The checkpatch warning is unhelpful ("please write a help paragraph that > fully describes the config symbol") Good old checkpatch :D > > endmenu # "RCU Subsystem" > > diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c > > index c54ea2b6a36b..627a3218a372 100644 > > --- a/kernel/rcu/rcu_segcblist.c > > +++ b/kernel/rcu/rcu_segcblist.c > > @@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp) > > rclp->head = NULL; > > rclp->tail = &rclp->head; > > rclp->len = 0; > > + rclp->lazy_len = 0; > > } > > > > /* > > @@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp) > > WRITE_ONCE(rclp->len, rclp->len + 1); > > } > > > > +/* > > + * Enqueue an rcu_head structure onto the specified callback list. > > Please also note the fact that it is enqueuing lazily. Sorry, done. > > + */ > > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp) > > +{ > > + rcu_cblist_enqueue(rclp, rhp); > > + WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1); > > Except... 
Why not just add a "lazy" parameter to rcu_cblist_enqueue()? > IS_ENABLED() can make it fast. Yeah good idea, it simplifies the code too. Thank you! So you mean I should add in this function so that the branch gets optimized: if (lazy && IS_ENABLED(CONFIG_RCU_LAZY)) { ... } That makes total sense considering the compiler may otherwise not be able to optimize the function viewing just the individual translation unit. I fixed it. The 6 month old baby and wife are calling my attention now. I will continue to reply to the other parts of this and other emails this evening and thanks for your help! thanks, - Joel
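To make that concrete, a minimal sketch of what the combined helper could look like (the first three statements are the existing enqueue logic; only the IS_ENABLED() branch is new, and the exact form v3 uses may differ):

	void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp, bool lazy)
	{
		*rclp->tail = rhp;
		rclp->tail = &rhp->next;
		WRITE_ONCE(rclp->len, rclp->len + 1);
		/* Dead code when CONFIG_RCU_LAZY=n, so no cost for !RCU_LAZY kernels. */
		if (lazy && IS_ENABLED(CONFIG_RCU_LAZY))
			WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);
	}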
On Fri, Jul 08, 2022 at 06:43:21PM +0000, Joel Fernandes wrote: > On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote: > > On Wed, Jun 22, 2022 at 10:50:55PM +0000, Joel Fernandes (Google) wrote: > > > Implement timer-based RCU lazy callback batching. The batch is flushed > > > whenever a certain amount of time has passed, or the batch on a > > > particular CPU grows too big. Also memory pressure will flush it in a > > > future patch. > > > > > > To handle several corner cases automagically (such as rcu_barrier() and > > > hotplug), we re-use bypass lists to handle lazy CBs. The bypass list > > > length has the lazy CB length included in it. A separate lazy CB length > > > counter is also introduced to keep track of the number of lazy CBs. > > > > > > Suggested-by: Paul McKenney <paulmck@kernel.org> > > > Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org> > > > > Not bad, but some questions and comments below. > > Thanks a lot for these, real helpful and I replied below: > > > > diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h > > > index 659d13a7ddaa..9a992707917b 100644 > > > --- a/include/linux/rcu_segcblist.h > > > +++ b/include/linux/rcu_segcblist.h > > > @@ -22,6 +22,7 @@ struct rcu_cblist { > > > struct rcu_head *head; > > > struct rcu_head **tail; > > > long len; > > > + long lazy_len; > > > }; > > > > > > #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head } > > > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h > > > index 1a32036c918c..9191a3d88087 100644 > > > --- a/include/linux/rcupdate.h > > > +++ b/include/linux/rcupdate.h > > > @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void) > > > > > > #endif /* #else #ifdef CONFIG_PREEMPT_RCU */ > > > > > > +#ifdef CONFIG_RCU_LAZY > > > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func); > > > +#else > > > +#define call_rcu_lazy(head, func) call_rcu(head, func) > > > +#endif > > > + > > > /* Internal to kernel */ > > > void rcu_init(void); > > > extern int rcu_scheduler_active; > > > diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig > > > index 27aab870ae4c..0bffa992fdc4 100644 > > > --- a/kernel/rcu/Kconfig > > > +++ b/kernel/rcu/Kconfig > > > @@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB > > > Say N here if you hate read-side memory barriers. > > > Take the default if you are unsure. > > > > > > +config RCU_LAZY > > > + bool "RCU callback lazy invocation functionality" > > > + depends on RCU_NOCB_CPU > > > + default n > > > + help > > > + To save power, batch RCU callbacks and flush after delay, memory > > > + pressure or callback list growing too big. > > > > Spaces vs. tabs. > > Fixed, thanks. > > > The checkpatch warning is unhelpful ("please write a help paragraph that > > fully describes the config symbol") > > Good old checkpatch :D ;-) ;-) ;-) > > > endmenu # "RCU Subsystem" > > > diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c > > > index c54ea2b6a36b..627a3218a372 100644 > > > --- a/kernel/rcu/rcu_segcblist.c > > > +++ b/kernel/rcu/rcu_segcblist.c > > > @@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp) > > > rclp->head = NULL; > > > rclp->tail = &rclp->head; > > > rclp->len = 0; > > > + rclp->lazy_len = 0; > > > } > > > > > > /* > > > @@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp) > > > WRITE_ONCE(rclp->len, rclp->len + 1); > > > } > > > > > > +/* > > > + * Enqueue an rcu_head structure onto the specified callback list. 
> > > > Please also note the fact that it is enqueuing lazily. > > Sorry, done. > > > > + */ > > > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp) > > > +{ > > > + rcu_cblist_enqueue(rclp, rhp); > > > + WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1); > > > > > > Except... Why not just add a "lazy" parameter to rcu_cblist_enqueue()? > > IS_ENABLED() can make it fast. > > Yeah good idea, it simplifies the code too. Thank you! > > So you mean I should add in this function so that the branch gets optimized: > if (lazy && IS_ENABLED(CONFIG_RCU_LAZY)) { > ... > } > > That makes total sense considering the compiler may otherwise not be able to > optimize the function viewing just the individual translation unit. I fixed > it. Or the other way around: if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy) { Just in case the compiler is stumbling over its boolean logic. Or in case the human reader is. ;-) > The 6 month old baby and wife are calling my attention now. I will continue > to reply to the other parts of this and other emails this evening and thanks > for your help! Ah, for those who believe that SIGCHLD can be ignored in real life! ;-) Thanx, Paul
I replied some more and will reply more soon, its a really long thread and thank you for the detailed review :) On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote: [..] > > +} > > + > > /* > > * Flush the second rcu_cblist structure onto the first one, obliterating > > * any contents of the first. If rhp is non-NULL, enqueue it as the sole > > @@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > > } > > } > > Header comment, please. It can be short, referring to that of the > function rcu_cblist_flush_enqueue(). Done, what I ended up doing is nuking the new function and doing the same IS_ENABLED() trick to the existing rcu_cblist_flush_enqueue(). diffstat is also happy! > > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > > + struct rcu_cblist *srclp, > > + struct rcu_head *rhp) > > Please line up the "struct" keywords. (Picky, I know...) > > > +{ > > + rcu_cblist_flush_enqueue(drclp, srclp, rhp); > > + if (rhp) > > + WRITE_ONCE(srclp->lazy_len, 1); > > Shouldn't this instead be a lazy argument to rcu_cblist_flush_enqueue()? > Concerns about speed in the !RCU_LAZY case can be addressed using > IS_ENABLED(), for example: > > if (IS_ENABLED(CONFIG_RCU_LAZY) && rhp) > WRITE_ONCE(srclp->lazy_len, 1); Ah indeed exactly what I ended up doing. > > +} > > + > > /* > > * Dequeue the oldest rcu_head structure from the specified callback > > * list. > > diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h > > index 431cee212467..c3d7de65b689 100644 > > --- a/kernel/rcu/rcu_segcblist.h > > +++ b/kernel/rcu/rcu_segcblist.h > > @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) > > return READ_ONCE(rclp->len); > > } > > > > +/* Return number of callbacks in the specified callback list. */ > > +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) > > +{ > > +#ifdef CONFIG_RCU_LAZY > > + return READ_ONCE(rclp->lazy_len); > > +#else > > + return 0; > > +#endif > > Please use IS_ENABLED(). This saves a line (and lots of characters) > but compiles just as efficienctly. Sounds good, looks a lot better, thanks! It ends up looking like: static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) { if (IS_ENABLED(CONFIG_RCU_LAZY)) return READ_ONCE(rclp->lazy_len); return 0; } static inline void rcu_cblist_reset_lazy_len(struct rcu_cblist *rclp) { if (IS_ENABLED(CONFIG_RCU_LAZY)) WRITE_ONCE(rclp->lazy_len, 0); } > > +} > > + > > /* Return number of callbacks in segmented callback list by summing seglen. */ > > long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp); > > > > void rcu_cblist_init(struct rcu_cblist *rclp); > > void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp); > > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp); > > void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > > struct rcu_cblist *srclp, > > struct rcu_head *rhp); > > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > > + struct rcu_cblist *srclp, > > + struct rcu_head *rhp); > > Please line up the "struct" keywords. (Still picky, I know...) Nuked it due to new lazy parameter so no issue now. 
> > struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp); > > > > /* > > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c > > index c25ba442044a..d2e3d6e176d2 100644 > > --- a/kernel/rcu/tree.c > > +++ b/kernel/rcu/tree.c > > @@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp) > > * Implementation of these memory-ordering guarantees is described here: > > * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. > > */ > > The above docbook comment needs to move to call_rcu(). Ok sure. > > -void call_rcu(struct rcu_head *head, rcu_callback_t func) > > +static void > > +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy) > > { > > static atomic_t doublefrees; > > unsigned long flags; > > @@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > > } > > > > check_cb_ovld(rdp); > > - if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags)) > > + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy)) > > return; // Enqueued onto ->nocb_bypass, so just leave. > > // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock. > > rcu_segcblist_enqueue(&rdp->cblist, head); > > @@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > > local_irq_restore(flags); > > } > > } > > -EXPORT_SYMBOL_GPL(call_rcu); > > Please add a docbook comment for call_rcu_lazy(). It can be brief, for > example, by referring to call_rcu()'s docbook comment for memory-ordering > details. I added something like the following, hope it looks OK: #ifdef CONFIG_RCU_LAZY /** * call_rcu_lazy() - Lazily queue RCU callback for invocation after grace period. * @head: structure to be used for queueing the RCU updates. * @func: actual callback function to be invoked after the grace period * * The callback function will be invoked some time after a full grace * period elapses, in other words after all pre-existing RCU read-side * critical sections have completed. * * Use this API instead of call_rcu() if you don't mind the callback being * invoked after very long periods of time on systems without memory pressure * and on systems which are lightly loaded or mostly idle. * * Other than the extra delay in callbacks being invoked, this function is * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more * details about memory ordering and other functionality. */ void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func) { return __call_rcu_common(head, func, true); } EXPORT_SYMBOL_GPL(call_rcu_lazy); #endif > > > +#ifdef CONFIG_RCU_LAZY > > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func) > > +{ > > + return __call_rcu_common(head, func, true); > > +} > > +EXPORT_SYMBOL_GPL(call_rcu_lazy); > > +#endif > > + > > +void call_rcu(struct rcu_head *head, rcu_callback_t func) > > +{ > > + return __call_rcu_common(head, func, false); > > + > > +} > > +EXPORT_SYMBOL_GPL(call_rcu); > > > > /* Maximum number of jiffies to wait before draining a batch. 
*/ > > #define KFREE_DRAIN_JIFFIES (HZ / 50) > > @@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) > > rdp->barrier_head.func = rcu_barrier_callback; > > debug_rcu_head_queue(&rdp->barrier_head); > > rcu_nocb_lock(rdp); > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false)); > > if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) { > > atomic_inc(&rcu_state.barrier_cpu_count); > > } else { > > @@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu) > > my_rdp = this_cpu_ptr(&rcu_data); > > my_rnp = my_rdp->mynode; > > rcu_nocb_lock(my_rdp); /* irqs already disabled. */ > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies)); > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false)); > > raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ > > /* Leverage recent GPs and set GP for new callbacks. */ > > needwake = rcu_advance_cbs(my_rnp, rdp) || > > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h > > index 2ccf5845957d..fec4fad6654b 100644 > > --- a/kernel/rcu/tree.h > > +++ b/kernel/rcu/tree.h > > @@ -267,8 +267,9 @@ struct rcu_data { > > /* Values for nocb_defer_wakeup field in struct rcu_data. */ > > #define RCU_NOCB_WAKE_NOT 0 > > #define RCU_NOCB_WAKE_BYPASS 1 > > -#define RCU_NOCB_WAKE 2 > > -#define RCU_NOCB_WAKE_FORCE 3 > > +#define RCU_NOCB_WAKE_LAZY 2 > > +#define RCU_NOCB_WAKE 3 > > +#define RCU_NOCB_WAKE_FORCE 4 > > > > #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500)) > > /* For jiffies_till_first_fqs and */ > > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); > > static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); > > static void rcu_init_one_nocb(struct rcu_node *rnp); > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - unsigned long j); > > + unsigned long j, bool lazy); > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - bool *was_alldone, unsigned long flags); > > + bool *was_alldone, unsigned long flags, > > + bool lazy); > > static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, > > unsigned long flags); > > static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level); > > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h > > index e369efe94fda..b9244f22e102 100644 > > --- a/kernel/rcu/tree_nocb.h > > +++ b/kernel/rcu/tree_nocb.h > > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) > > return __wake_nocb_gp(rdp_gp, rdp, force, flags); > > } > > Comment on LAZY_FLUSH_JIFFIES purpose in life, please! (At some point > more flexibility may be required, but let's not unnecessarily rush > into that.) I added this: /* * LAZY_FLUSH_JIFFIES decides the maximum amount of time that * can elapse before lazy callbacks are flushed. Lazy callbacks * could be flushed much earlier for a number of other reasons * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are * left unsubmitted to RCU after those many jiffies. */ > > +#define LAZY_FLUSH_JIFFIES (10 * HZ) > > + > > /* > > * Arrange to wake the GP kthread for this NOCB group at some future > > * time when it is safe to do so. > > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > > * Bypass wakeup overrides previous deferments. In case > > * of callback storm, no need to wake up too early. 
> > */ > > - if (waketype == RCU_NOCB_WAKE_BYPASS) { > > + if (waketype == RCU_NOCB_WAKE_LAZY) { > > Presumably we get here only if all of this CPU's callbacks are lazy? Yes that's right. > > rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); > > WRITE_ONCE(rdp->nocb_bypass_first, j); > > rcu_nocb_bypass_unlock(rdp); > > @@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > * Note that this function always returns true if rhp is NULL. > > */ > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - unsigned long j) > > + unsigned long j, bool lazy) > > { > > if (!rcu_rdp_is_offloaded(rdp)) > > return true; > > rcu_lockdep_assert_cblist_protected(rdp); > > rcu_nocb_bypass_lock(rdp); > > - return rcu_nocb_do_flush_bypass(rdp, rhp, j); > > + return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy); > > } > > > > /* > > @@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > > if (!rcu_rdp_is_offloaded(rdp) || > > !rcu_nocb_bypass_trylock(rdp)) > > return; > > - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); > > + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false)); > > } > > > > /* > > @@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > > * there is only one CPU in operation. > > */ > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > - bool *was_alldone, unsigned long flags) > > + bool *was_alldone, unsigned long flags, > > + bool lazy) > > { > > unsigned long c; > > unsigned long cur_gp_seq; > > unsigned long j = jiffies; > > long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > > + long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > > > > lockdep_assert_irqs_disabled(); > > > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > } > > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > > > - // If there hasn't yet been all that many ->cblist enqueues > > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > > - // ->nocb_bypass first. > > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > > + // If caller passed a non-lazy CB and there hasn't yet been all that > > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > > + // number of CBs (lazy + non-lazy) grows too much. > > + // > > + // Note that if the bypass list has lazy CBs, and the main list is > > + // empty, and rhp happens to be non-lazy, then we end up flushing all > > + // the lazy CBs to the main list as well. That's the right thing to do, > > + // since we are kick-starting RCU GP processing anyway for the non-lazy > > + // one, we can just reuse that GP for the already queued-up lazy ones. > > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > > + (lazy && n_lazy_cbs >= qhimark)) { > > rcu_nocb_lock(rdp); > > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > > if (*was_alldone) > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > > - TPS("FirstQ")); > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > > The "false" here instead of "lazy" is because the caller is to do the > enqueuing, correct? 
There is no difference between using false or lazy here, because the bypass flush is not also enqueuing the lazy callback, right? We can also pass lazy instead of false if that's less confusing. Or maybe I missed the issue you're raising? > > WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > > return false; // Caller must enqueue the callback. > > } > > > > // If ->nocb_bypass has been used too long or is too full, > > // flush ->nocb_bypass to ->cblist. > > - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || > > - ncbs >= qhimark) { > > + if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) { > > rcu_nocb_lock(rdp); > > - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { > > + if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) { > > But shouldn't this "true" be "lazy"? I don't see how we are guaranteed > that the callback is in fact lazy at this point in the code. Also, > there is not yet a guarantee that the caller will do the enqueuing. > So what am I missing? Sorry I screwed this part up. I think I meant 'false' here, if the list grew too big- then I think I would prefer if the new lazy CB instead is treated as non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What do you think? Will reply more to the rest of the comments soon, thanks! thanks, - Joel
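For reference, the merged flush helper described at the top of this reply (dropping rcu_cblist_flush_enqueue_lazy() and giving rcu_cblist_flush_enqueue() a lazy argument) might end up looking roughly like this; the body is the existing flush logic, and the handling of a surviving rhp is an educated guess rather than a quote from v3:

	void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
				      struct rcu_cblist *srclp,
				      struct rcu_head *rhp,
				      bool lazy)
	{
		/* Move everything in srclp over to drclp. */
		drclp->head = srclp->head;
		if (drclp->head)
			drclp->tail = srclp->tail;
		else
			drclp->tail = &drclp->head;
		drclp->len = srclp->len;
		if (!rhp) {
			rcu_cblist_init(srclp);	/* Also zeroes ->lazy_len. */
		} else {
			/* srclp now holds only the new callback rhp. */
			rhp->next = NULL;
			srclp->head = rhp;
			srclp->tail = &rhp->next;
			WRITE_ONCE(srclp->len, 1);
			if (IS_ENABLED(CONFIG_RCU_LAZY))
				WRITE_ONCE(srclp->lazy_len, lazy ? 1 : 0);
		}
	}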
On Sun, Jul 10, 2022 at 02:26:51AM +0000, Joel Fernandes wrote: > I replied some more and will reply more soon, its a really long thread and > thank you for the detailed review :) > > On Sat, Jun 25, 2022 at 09:00:19PM -0700, Paul E. McKenney wrote: > [..] > > > +} > > > + > > > /* > > > * Flush the second rcu_cblist structure onto the first one, obliterating > > > * any contents of the first. If rhp is non-NULL, enqueue it as the sole > > > @@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > > > } > > > } > > > > Header comment, please. It can be short, referring to that of the > > function rcu_cblist_flush_enqueue(). > > Done, what I ended up doing is nuking the new function and doing the same > IS_ENABLED() trick to the existing rcu_cblist_flush_enqueue(). diffstat is > also happy! > > > > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > > > + struct rcu_cblist *srclp, > > > + struct rcu_head *rhp) > > > > Please line up the "struct" keywords. (Picky, I know...) > > > > > +{ > > > + rcu_cblist_flush_enqueue(drclp, srclp, rhp); > > > + if (rhp) > > > + WRITE_ONCE(srclp->lazy_len, 1); > > > > Shouldn't this instead be a lazy argument to rcu_cblist_flush_enqueue()? > > Concerns about speed in the !RCU_LAZY case can be addressed using > > IS_ENABLED(), for example: > > > > if (IS_ENABLED(CONFIG_RCU_LAZY) && rhp) > > WRITE_ONCE(srclp->lazy_len, 1); > > Ah indeed exactly what I ended up doing. > > > > +} > > > + > > > /* > > > * Dequeue the oldest rcu_head structure from the specified callback > > > * list. > > > diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h > > > index 431cee212467..c3d7de65b689 100644 > > > --- a/kernel/rcu/rcu_segcblist.h > > > +++ b/kernel/rcu/rcu_segcblist.h > > > @@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp) > > > return READ_ONCE(rclp->len); > > > } > > > > > > +/* Return number of callbacks in the specified callback list. */ > > > +static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) > > > +{ > > > +#ifdef CONFIG_RCU_LAZY > > > + return READ_ONCE(rclp->lazy_len); > > > +#else > > > + return 0; > > > +#endif > > > > Please use IS_ENABLED(). This saves a line (and lots of characters) > > but compiles just as efficienctly. > > Sounds good, looks a lot better, thanks! > > It ends up looking like: > > static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp) > { > if (IS_ENABLED(CONFIG_RCU_LAZY)) > return READ_ONCE(rclp->lazy_len); > return 0; > } > > static inline void rcu_cblist_reset_lazy_len(struct rcu_cblist *rclp) > { > if (IS_ENABLED(CONFIG_RCU_LAZY)) > WRITE_ONCE(rclp->lazy_len, 0); > } All much better, thank you! > > > +} > > > + > > > /* Return number of callbacks in segmented callback list by summing seglen. */ > > > long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp); > > > > > > void rcu_cblist_init(struct rcu_cblist *rclp); > > > void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp); > > > +void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp); > > > void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp, > > > struct rcu_cblist *srclp, > > > struct rcu_head *rhp); > > > +void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp, > > > + struct rcu_cblist *srclp, > > > + struct rcu_head *rhp); > > > > Please line up the "struct" keywords. (Still picky, I know...) > > Nuked it due to new lazy parameter so no issue now. 
> > > > struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp); > > > > > > /* > > > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c > > > index c25ba442044a..d2e3d6e176d2 100644 > > > --- a/kernel/rcu/tree.c > > > +++ b/kernel/rcu/tree.c > > > @@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp) > > > * Implementation of these memory-ordering guarantees is described here: > > > * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst. > > > */ > > > > The above docbook comment needs to move to call_rcu(). > > Ok sure. > > > > -void call_rcu(struct rcu_head *head, rcu_callback_t func) > > > +static void > > > +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy) > > > { > > > static atomic_t doublefrees; > > > unsigned long flags; > > > @@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > > > } > > > > > > check_cb_ovld(rdp); > > > - if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags)) > > > + if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy)) > > > return; // Enqueued onto ->nocb_bypass, so just leave. > > > // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock. > > > rcu_segcblist_enqueue(&rdp->cblist, head); > > > @@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func) > > > local_irq_restore(flags); > > > } > > > } > > > -EXPORT_SYMBOL_GPL(call_rcu); > > > > Please add a docbook comment for call_rcu_lazy(). It can be brief, for > > example, by referring to call_rcu()'s docbook comment for memory-ordering > > details. > > I added something like the following, hope it looks OK: > > #ifdef CONFIG_RCU_LAZY > /** > * call_rcu_lazy() - Lazily queue RCU callback for invocation after grace period. > * @head: structure to be used for queueing the RCU updates. > * @func: actual callback function to be invoked after the grace period > * > * The callback function will be invoked some time after a full grace > * period elapses, in other words after all pre-existing RCU read-side > * critical sections have completed. > * > * Use this API instead of call_rcu() if you don't mind the callback being > * invoked after very long periods of time on systems without memory pressure > * and on systems which are lightly loaded or mostly idle. > * > * Other than the extra delay in callbacks being invoked, this function is > * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more > * details about memory ordering and other functionality. Much better, thank you! > */ > void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func) > { > return __call_rcu_common(head, func, true); > } > EXPORT_SYMBOL_GPL(call_rcu_lazy); > #endif > > > > > > +#ifdef CONFIG_RCU_LAZY > > > +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func) > > > +{ > > > + return __call_rcu_common(head, func, true); > > > +} > > > +EXPORT_SYMBOL_GPL(call_rcu_lazy); > > > +#endif > > > + > > > +void call_rcu(struct rcu_head *head, rcu_callback_t func) > > > +{ > > > + return __call_rcu_common(head, func, false); > > > + > > > +} > > > +EXPORT_SYMBOL_GPL(call_rcu); > > > > > > /* Maximum number of jiffies to wait before draining a batch. 
*/ > > > #define KFREE_DRAIN_JIFFIES (HZ / 50) > > > @@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp) > > > rdp->barrier_head.func = rcu_barrier_callback; > > > debug_rcu_head_queue(&rdp->barrier_head); > > > rcu_nocb_lock(rdp); > > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies)); > > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false)); > > > if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) { > > > atomic_inc(&rcu_state.barrier_cpu_count); > > > } else { > > > @@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu) > > > my_rdp = this_cpu_ptr(&rcu_data); > > > my_rnp = my_rdp->mynode; > > > rcu_nocb_lock(my_rdp); /* irqs already disabled. */ > > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies)); > > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false)); > > > raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ > > > /* Leverage recent GPs and set GP for new callbacks. */ > > > needwake = rcu_advance_cbs(my_rnp, rdp) || > > > diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h > > > index 2ccf5845957d..fec4fad6654b 100644 > > > --- a/kernel/rcu/tree.h > > > +++ b/kernel/rcu/tree.h > > > @@ -267,8 +267,9 @@ struct rcu_data { > > > /* Values for nocb_defer_wakeup field in struct rcu_data. */ > > > #define RCU_NOCB_WAKE_NOT 0 > > > #define RCU_NOCB_WAKE_BYPASS 1 > > > -#define RCU_NOCB_WAKE 2 > > > -#define RCU_NOCB_WAKE_FORCE 3 > > > +#define RCU_NOCB_WAKE_LAZY 2 > > > +#define RCU_NOCB_WAKE 3 > > > +#define RCU_NOCB_WAKE_FORCE 4 > > > > > > #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500)) > > > /* For jiffies_till_first_fqs and */ > > > @@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); > > > static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); > > > static void rcu_init_one_nocb(struct rcu_node *rnp); > > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > - unsigned long j); > > > + unsigned long j, bool lazy); > > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > - bool *was_alldone, unsigned long flags); > > > + bool *was_alldone, unsigned long flags, > > > + bool lazy); > > > static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, > > > unsigned long flags); > > > static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level); > > > diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h > > > index e369efe94fda..b9244f22e102 100644 > > > --- a/kernel/rcu/tree_nocb.h > > > +++ b/kernel/rcu/tree_nocb.h > > > @@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force) > > > return __wake_nocb_gp(rdp_gp, rdp, force, flags); > > > } > > > > Comment on LAZY_FLUSH_JIFFIES purpose in life, please! (At some point > > more flexibility may be required, but let's not unnecessarily rush > > into that.) > > I added this: > /* > * LAZY_FLUSH_JIFFIES decides the maximum amount of time that > * can elapse before lazy callbacks are flushed. Lazy callbacks > * could be flushed much earlier for a number of other reasons > * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are > * left unsubmitted to RCU after those many jiffies. > */ Much better, thank you! > > > +#define LAZY_FLUSH_JIFFIES (10 * HZ) > > > + > > > /* > > > * Arrange to wake the GP kthread for this NOCB group at some future > > > * time when it is safe to do so. 
> > > @@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, > > > * Bypass wakeup overrides previous deferments. In case > > > * of callback storm, no need to wake up too early. > > > */ > > > - if (waketype == RCU_NOCB_WAKE_BYPASS) { > > > + if (waketype == RCU_NOCB_WAKE_LAZY) { > > > > Presumably we get here only if all of this CPU's callbacks are lazy? > > Yes that's right. OK, thank yoiu for the clarification. > > > rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); > > > WRITE_ONCE(rdp->nocb_bypass_first, j); > > > rcu_nocb_bypass_unlock(rdp); > > > @@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > * Note that this function always returns true if rhp is NULL. > > > */ > > > static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > - unsigned long j) > > > + unsigned long j, bool lazy) > > > { > > > if (!rcu_rdp_is_offloaded(rdp)) > > > return true; > > > rcu_lockdep_assert_cblist_protected(rdp); > > > rcu_nocb_bypass_lock(rdp); > > > - return rcu_nocb_do_flush_bypass(rdp, rhp, j); > > > + return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy); > > > } > > > > > > /* > > > @@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > > > if (!rcu_rdp_is_offloaded(rdp) || > > > !rcu_nocb_bypass_trylock(rdp)) > > > return; > > > - WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j)); > > > + WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false)); > > > } > > > > > > /* > > > @@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j) > > > * there is only one CPU in operation. > > > */ > > > static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > - bool *was_alldone, unsigned long flags) > > > + bool *was_alldone, unsigned long flags, > > > + bool lazy) > > > { > > > unsigned long c; > > > unsigned long cur_gp_seq; > > > unsigned long j = jiffies; > > > long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); > > > + long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass); > > > > > > lockdep_assert_irqs_disabled(); > > > > > > @@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, > > > } > > > WRITE_ONCE(rdp->nocb_nobypass_count, c); > > > > > > - // If there hasn't yet been all that many ->cblist enqueues > > > - // this jiffy, tell the caller to enqueue onto ->cblist. But flush > > > - // ->nocb_bypass first. > > > - if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { > > > + // If caller passed a non-lazy CB and there hasn't yet been all that > > > + // many ->cblist enqueues this jiffy, tell the caller to enqueue it > > > + // onto ->cblist. But flush ->nocb_bypass first. Also do so, if total > > > + // number of CBs (lazy + non-lazy) grows too much. > > > + // > > > + // Note that if the bypass list has lazy CBs, and the main list is > > > + // empty, and rhp happens to be non-lazy, then we end up flushing all > > > + // the lazy CBs to the main list as well. That's the right thing to do, > > > + // since we are kick-starting RCU GP processing anyway for the non-lazy > > > + // one, we can just reuse that GP for the already queued-up lazy ones. 
> > > + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > > > + (lazy && n_lazy_cbs >= qhimark)) { > > > rcu_nocb_lock(rdp); > > > *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > > > if (*was_alldone) > > > trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > > > - TPS("FirstQ")); > > > - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > > > + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > > > + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > > > > The "false" here instead of "lazy" is because the caller is to do the > > enqueuing, correct? > > There is no difference between using false or lazy here, because the bypass > flush is not also enqueuing the lazy callback, right? > > We can also pass lazy instead of false if that's less confusing. > > Or maybe I missed the issue you're raising? I am mostly checking up on your intended meaning of "lazy" in various contexts. It could mean only that the caller requested laziness, or in some cases it could mean that the callback actually will be lazy. I can rationalize the "false" above as a "don't care" in this case because (as you say) there is not callback. In which case this code is OK as is, as long as the header comment for rcu_nocb_flush_bypass() clearly states that this parameter has meaning only when there really is a callback being queued. > > > WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > > > return false; // Caller must enqueue the callback. > > > } > > > > > > // If ->nocb_bypass has been used too long or is too full, > > > // flush ->nocb_bypass to ->cblist. > > > - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || > > > - ncbs >= qhimark) { > > > + if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) { > > > rcu_nocb_lock(rdp); > > > - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { > > > + if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) { > > > > But shouldn't this "true" be "lazy"? I don't see how we are guaranteed > > that the callback is in fact lazy at this point in the code. Also, > > there is not yet a guarantee that the caller will do the enqueuing. > > So what am I missing? > > Sorry I screwed this part up. I think I meant 'false' here, if the list grew > too big- then I think I would prefer if the new lazy CB instead is treated as > non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What > do you think? Good point, if we are choosing to override the laziness requested by the caller, then it should say "true". It would be good to have a comment saying that is what we are doing, correct? > Will reply more to the rest of the comments soon, thanks! Sounds good! (Hey, wouldn't want you to be bored!) Thanx, Paul
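As a sketch of the header-comment clarification requested above (illustrative wording only, not Joel's actual v3 comment):

	/*
	 * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
	 * Note that this function always returns true if rhp is NULL, and that
	 * the "lazy" argument matters only when a callback is actually being
	 * enqueued: it describes rhp itself, never the callbacks already
	 * sitting in ->nocb_bypass.
	 */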
On 7/10/2022 12:03 PM, Paul E. McKenney wrote: [..] >>>> + // Note that if the bypass list has lazy CBs, and the main list is >>>> + // empty, and rhp happens to be non-lazy, then we end up flushing all >>>> + // the lazy CBs to the main list as well. That's the right thing to do, >>>> + // since we are kick-starting RCU GP processing anyway for the non-lazy >>>> + // one, we can just reuse that GP for the already queued-up lazy ones. >>>> + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || >>>> + (lazy && n_lazy_cbs >= qhimark)) { >>>> rcu_nocb_lock(rdp); >>>> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); >>>> if (*was_alldone) >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, >>>> - TPS("FirstQ")); >>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); >>>> + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); >>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); >>> >>> The "false" here instead of "lazy" is because the caller is to do the >>> enqueuing, correct? >> >> There is no difference between using false or lazy here, because the bypass >> flush is not also enqueuing the lazy callback, right? >> >> We can also pass lazy instead of false if that's less confusing. >> >> Or maybe I missed the issue you're raising? > > I am mostly checking up on your intended meaning of "lazy" in various > contexts. It could mean only that the caller requested laziness, or in > some cases it could mean that the callback actually will be lazy. > > I can rationalize the "false" above as a "don't care" in this case > because (as you say) there is not callback. In which case this code > is OK as is, as long as the header comment for rcu_nocb_flush_bypass() > clearly states that this parameter has meaning only when there really > is a callback being queued. I decided to change this and the below to "lazy" variable instead of true/false, as the code is cleaner and less confusing IMO. It makes sense to me and in my testing it works fine. Hope that's Ok with you. About changing the lazy length count to a flag, one drawback of doing that is, say if there are some non-lazy CBs in the bypass list, then the lazy shrinker will end up reporting an inaccurate count. Also considering that it might be harder to add the count back later say if we need it for tracing, I would say lets leave it as is. I will keep the counter for v3 and we can discuss. Does that sound good to you? I think some more testing, checkpatch running etc and I should be good to send v3 :) Thanks! - Joel > >>>> WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); >>>> return false; // Caller must enqueue the callback. >>>> } >>>> >>>> // If ->nocb_bypass has been used too long or is too full, >>>> // flush ->nocb_bypass to ->cblist. >>>> - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || >>>> - ncbs >= qhimark) { >>>> + if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) { >>>> rcu_nocb_lock(rdp); >>>> - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { >>>> + if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) { >>> >>> But shouldn't this "true" be "lazy"? I don't see how we are guaranteed >>> that the callback is in fact lazy at this point in the code. Also, >>> there is not yet a guarantee that the caller will do the enqueuing. >>> So what am I missing? >> >> Sorry I screwed this part up. I think I meant 'false' here, if the list grew >> too big- then I think I would prefer if the new lazy CB instead is treated as >> non-lazy. But if that's too confusing, I will just pass 'lazy' instead. 
What >> do you think? > > Good point, if we are choosing to override the laziness requested by the > caller, then it should say "true". It would be good to have a comment > saying that is what we are doing, correct? > >> Will reply more to the rest of the comments soon, thanks! > > Sounds good! (Hey, wouldn't want you to be bored!) > > Thanx, Paul
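Assuming that choice holds for v3, the overflow-flush call site would presumably end up looking something like the sketch below (the comment wording and the FirstLazyQ tracepoint string are guesses patterned on the earlier hunk, not quotes from v3):

	// If ->nocb_bypass has been used too long or is too full,
	// flush ->nocb_bypass to ->cblist. Pass the new callback's own
	// laziness through rather than hard-coding true or false, so the
	// ->lazy_len accounting stays accurate wherever rhp ends up.
	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
		rcu_nocb_lock(rdp);
		if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy)) {
			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
			if (*was_alldone)
				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
						    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
			return false; // Caller must enqueue the callback.
		}
		/* ... remainder of the overflow path unchanged ... */
	}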
On Tue, Jul 12, 2022 at 04:53:48PM -0400, Joel Fernandes wrote: > > On 7/10/2022 12:03 PM, Paul E. McKenney wrote: > [..] > >>>> + // Note that if the bypass list has lazy CBs, and the main list is > >>>> + // empty, and rhp happens to be non-lazy, then we end up flushing all > >>>> + // the lazy CBs to the main list as well. That's the right thing to do, > >>>> + // since we are kick-starting RCU GP processing anyway for the non-lazy > >>>> + // one, we can just reuse that GP for the already queued-up lazy ones. > >>>> + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > >>>> + (lazy && n_lazy_cbs >= qhimark)) { > >>>> rcu_nocb_lock(rdp); > >>>> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > >>>> if (*was_alldone) > >>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > >>>> - TPS("FirstQ")); > >>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > >>>> + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > >>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > >>> > >>> The "false" here instead of "lazy" is because the caller is to do the > >>> enqueuing, correct? > >> > >> There is no difference between using false or lazy here, because the bypass > >> flush is not also enqueuing the lazy callback, right? > >> > >> We can also pass lazy instead of false if that's less confusing. > >> > >> Or maybe I missed the issue you're raising? > > > > I am mostly checking up on your intended meaning of "lazy" in various > > contexts. It could mean only that the caller requested laziness, or in > > some cases it could mean that the callback actually will be lazy. > > > > I can rationalize the "false" above as a "don't care" in this case > > because (as you say) there is not callback. In which case this code > > is OK as is, as long as the header comment for rcu_nocb_flush_bypass() > > clearly states that this parameter has meaning only when there really > > is a callback being queued. > > I decided to change this and the below to "lazy" variable instead of > true/false, as the code is cleaner and less confusing IMO. It makes > sense to me and in my testing it works fine. Hope that's Ok with you. Sounds plausible. > About changing the lazy length count to a flag, one drawback of doing > that is, say if there are some non-lazy CBs in the bypass list, then the > lazy shrinker will end up reporting an inaccurate count. Also > considering that it might be harder to add the count back later say if > we need it for tracing, I would say lets leave it as is. I will keep the > counter for v3 and we can discuss. Does that sound good to you? You lost me on this one. If there are any non-lazy callbacks, the whole bypass list is already being treated as non-lazy, right? If so, then the lazy shrinker should report the full count if all callbacks are lazy, and should report none otherwise. Or am I missing something here? > I think some more testing, checkpatch running etc and I should be good > to send v3 :) Sounds good! Thanx, Paul > Thanks! > > - Joel > > > > > >>>> WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); > >>>> return false; // Caller must enqueue the callback. > >>>> } > >>>> > >>>> // If ->nocb_bypass has been used too long or is too full, > >>>> // flush ->nocb_bypass to ->cblist. 
> >>>> - if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || > >>>> - ncbs >= qhimark) { > >>>> + if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) { > >>>> rcu_nocb_lock(rdp); > >>>> - if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { > >>>> + if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) { > >>> > >>> But shouldn't this "true" be "lazy"? I don't see how we are guaranteed > >>> that the callback is in fact lazy at this point in the code. Also, > >>> there is not yet a guarantee that the caller will do the enqueuing. > >>> So what am I missing? > >> > >> Sorry I screwed this part up. I think I meant 'false' here, if the list grew > >> too big- then I think I would prefer if the new lazy CB instead is treated as > >> non-lazy. But if that's too confusing, I will just pass 'lazy' instead. What > >> do you think? > > > > Good point, if we are choosing to override the laziness requested by the > > caller, then it should say "true". It would be good to have a comment > > saying that is what we are doing, correct? > > > >> Will reply more to the rest of the comments soon, thanks! > > > > Sounds good! (Hey, wouldn't want you to be bored!) > > > > Thanx, Paul
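Concretely, the two readings differ in what a CPU with a mixed bypass list contributes to the lazy count. Paul's reading could be sketched as a per-CPU helper like this (hypothetical name, illustration only, built on the accessors from the patch):

	/* A bypass list holding any non-lazy callback is treated as non-lazy. */
	static unsigned long rcu_rdp_count_lazy(struct rcu_data *rdp)
	{
		long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
		long nlazy = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);

		return (ncbs && ncbs == nlazy) ? nlazy : 0;
	}

whereas Joel's reading would simply report nlazy regardless of ncbs.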
On 7/12/2022 5:04 PM, Paul E. McKenney wrote: > On Tue, Jul 12, 2022 at 04:53:48PM -0400, Joel Fernandes wrote: >> >> On 7/10/2022 12:03 PM, Paul E. McKenney wrote: >> [..] >>>>>> + // Note that if the bypass list has lazy CBs, and the main list is >>>>>> + // empty, and rhp happens to be non-lazy, then we end up flushing all >>>>>> + // the lazy CBs to the main list as well. That's the right thing to do, >>>>>> + // since we are kick-starting RCU GP processing anyway for the non-lazy >>>>>> + // one, we can just reuse that GP for the already queued-up lazy ones. >>>>>> + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || >>>>>> + (lazy && n_lazy_cbs >= qhimark)) { >>>>>> rcu_nocb_lock(rdp); >>>>>> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); >>>>>> if (*was_alldone) >>>>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, >>>>>> - TPS("FirstQ")); >>>>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); >>>>>> + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); >>>>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); >>>>> >>>>> The "false" here instead of "lazy" is because the caller is to do the >>>>> enqueuing, correct? >>>> >>>> There is no difference between using false or lazy here, because the bypass >>>> flush is not also enqueuing the lazy callback, right? >>>> >>>> We can also pass lazy instead of false if that's less confusing. >>>> >>>> Or maybe I missed the issue you're raising? >>> >>> I am mostly checking up on your intended meaning of "lazy" in various >>> contexts. It could mean only that the caller requested laziness, or in >>> some cases it could mean that the callback actually will be lazy. >>> >>> I can rationalize the "false" above as a "don't care" in this case >>> because (as you say) there is not callback. In which case this code >>> is OK as is, as long as the header comment for rcu_nocb_flush_bypass() >>> clearly states that this parameter has meaning only when there really >>> is a callback being queued. >> >> I decided to change this and the below to "lazy" variable instead of >> true/false, as the code is cleaner and less confusing IMO. It makes >> sense to me and in my testing it works fine. Hope that's Ok with you. > > Sounds plausible. > >> About changing the lazy length count to a flag, one drawback of doing >> that is, say if there are some non-lazy CBs in the bypass list, then the >> lazy shrinker will end up reporting an inaccurate count. Also >> considering that it might be harder to add the count back later say if >> we need it for tracing, I would say lets leave it as is. I will keep the >> counter for v3 and we can discuss. Does that sound good to you? > > You lost me on this one. If there are any non-lazy callbacks, the whole > bypass list is already being treated as non-lazy, right? If so, then > the lazy shrinker should report the full count if all callbacks are lazy, > and should report none otherwise. Or am I missing something here? > That's one way to interpret it, another way is say there were a 1000 lazy CBs, and now 1 non-lazy came in. The shrinker could report the lazy count as 0 per your interpretation. Yes its true they will get flushed out in the next jiffie, but for that time instant, the number of lazy CBs in the list is not zero! :) Yeah OK its a weak argument, still an argument! ;-) In any case, we saw the need for the length of the segcb lists to figure out things via tracing, so I suspect we may need this in the future as well, its a small cost so I would rather keep it if that's Ok with you! 
:) Thanks, - Joel
On Tue, Jul 12, 2022 at 05:10:41PM -0400, Joel Fernandes wrote: > > > On 7/12/2022 5:04 PM, Paul E. McKenney wrote: > > On Tue, Jul 12, 2022 at 04:53:48PM -0400, Joel Fernandes wrote: > >> > >> On 7/10/2022 12:03 PM, Paul E. McKenney wrote: > >> [..] > >>>>>> + // Note that if the bypass list has lazy CBs, and the main list is > >>>>>> + // empty, and rhp happens to be non-lazy, then we end up flushing all > >>>>>> + // the lazy CBs to the main list as well. That's the right thing to do, > >>>>>> + // since we are kick-starting RCU GP processing anyway for the non-lazy > >>>>>> + // one, we can just reuse that GP for the already queued-up lazy ones. > >>>>>> + if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) || > >>>>>> + (lazy && n_lazy_cbs >= qhimark)) { > >>>>>> rcu_nocb_lock(rdp); > >>>>>> *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); > >>>>>> if (*was_alldone) > >>>>>> trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, > >>>>>> - TPS("FirstQ")); > >>>>>> - WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); > >>>>>> + lazy ? TPS("FirstLazyQ") : TPS("FirstQ")); > >>>>>> + WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false)); > >>>>> > >>>>> The "false" here instead of "lazy" is because the caller is to do the > >>>>> enqueuing, correct? > >>>> > >>>> There is no difference between using false or lazy here, because the bypass > >>>> flush is not also enqueuing the lazy callback, right? > >>>> > >>>> We can also pass lazy instead of false if that's less confusing. > >>>> > >>>> Or maybe I missed the issue you're raising? > >>> > >>> I am mostly checking up on your intended meaning of "lazy" in various > >>> contexts. It could mean only that the caller requested laziness, or in > >>> some cases it could mean that the callback actually will be lazy. > >>> > >>> I can rationalize the "false" above as a "don't care" in this case > >>> because (as you say) there is not callback. In which case this code > >>> is OK as is, as long as the header comment for rcu_nocb_flush_bypass() > >>> clearly states that this parameter has meaning only when there really > >>> is a callback being queued. > >> > >> I decided to change this and the below to "lazy" variable instead of > >> true/false, as the code is cleaner and less confusing IMO. It makes > >> sense to me and in my testing it works fine. Hope that's Ok with you. > > > > Sounds plausible. > > > >> About changing the lazy length count to a flag, one drawback of doing > >> that is, say if there are some non-lazy CBs in the bypass list, then the > >> lazy shrinker will end up reporting an inaccurate count. Also > >> considering that it might be harder to add the count back later say if > >> we need it for tracing, I would say lets leave it as is. I will keep the > >> counter for v3 and we can discuss. Does that sound good to you? > > > > You lost me on this one. If there are any non-lazy callbacks, the whole > > bypass list is already being treated as non-lazy, right? If so, then > > the lazy shrinker should report the full count if all callbacks are lazy, > > and should report none otherwise. Or am I missing something here? > > > > That's one way to interpret it, another way is say there were a 1000 > lazy CBs, and now 1 non-lazy came in. The shrinker could report the lazy > count as 0 per your interpretation. Yes its true they will get flushed > out in the next jiffie, but for that time instant, the number of lazy > CBs in the list is not zero! :) Yeah OK its a weak argument, still an > argument! 
;-) > > In any case, we saw the need for the length of the segcb lists to figure > out things via tracing, so I suspect we may need this in the future as > well, its a small cost so I would rather keep it if that's Ok with you! :) OK, being needed for tracing/diagnostics is a somewhat less weak argument... Let's see what v3 looks like. Thanx, Paul
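Either way, the per-CPU lazy_len counter is what makes a cheap shrinker-side count possible. A count_objects() sketch under the counter-based design might look like this (hypothetical function name; the actual shrinker is a later patch in the series and may differ):

	static unsigned long
	lazy_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
	{
		unsigned long count = 0;
		int cpu;

		/* Snapshot the per-CPU lazy callback counts; approximate is fine here. */
		for_each_possible_cpu(cpu) {
			struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);

			count += READ_ONCE(rdp->nocb_bypass.lazy_len);
		}
		return count ? count : SHRINK_EMPTY;
	}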
diff --git a/include/linux/rcu_segcblist.h b/include/linux/rcu_segcblist.h
index 659d13a7ddaa..9a992707917b 100644
--- a/include/linux/rcu_segcblist.h
+++ b/include/linux/rcu_segcblist.h
@@ -22,6 +22,7 @@ struct rcu_cblist {
 	struct rcu_head *head;
 	struct rcu_head **tail;
 	long len;
+	long lazy_len;
 };
 
 #define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 1a32036c918c..9191a3d88087 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void)
 
 #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
 
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
+#else
+#define call_rcu_lazy(head, func) call_rcu(head, func)
+#endif
+
 /* Internal to kernel */
 void rcu_init(void);
 extern int rcu_scheduler_active;
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
index 27aab870ae4c..0bffa992fdc4 100644
--- a/kernel/rcu/Kconfig
+++ b/kernel/rcu/Kconfig
@@ -293,4 +293,12 @@ config TASKS_TRACE_RCU_READ_MB
 	  Say N here if you hate read-side memory barriers.
 	  Take the default if you are unsure.
 
+config RCU_LAZY
+	bool "RCU callback lazy invocation functionality"
+	depends on RCU_NOCB_CPU
+	default n
+	help
+	  To save power, batch RCU callbacks and flush after delay, memory
+	  pressure or callback list growing too big.
+
 endmenu # "RCU Subsystem"
diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index c54ea2b6a36b..627a3218a372 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -20,6 +20,7 @@ void rcu_cblist_init(struct rcu_cblist *rclp)
 	rclp->head = NULL;
 	rclp->tail = &rclp->head;
 	rclp->len = 0;
+	rclp->lazy_len = 0;
 }
 
 /*
@@ -32,6 +33,15 @@ void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp)
 	WRITE_ONCE(rclp->len, rclp->len + 1);
 }
 
+/*
+ * Enqueue an rcu_head structure onto the specified callback list.
+ */
+void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp)
+{
+	rcu_cblist_enqueue(rclp, rhp);
+	WRITE_ONCE(rclp->lazy_len, rclp->lazy_len + 1);
+}
+
 /*
  * Flush the second rcu_cblist structure onto the first one, obliterating
  * any contents of the first. If rhp is non-NULL, enqueue it as the sole
@@ -60,6 +70,15 @@ void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
 	}
 }
 
+void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
+				   struct rcu_cblist *srclp,
+				   struct rcu_head *rhp)
+{
+	rcu_cblist_flush_enqueue(drclp, srclp, rhp);
+	if (rhp)
+		WRITE_ONCE(srclp->lazy_len, 1);
+}
+
 /*
  * Dequeue the oldest rcu_head structure from the specified callback
  * list.
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 431cee212467..c3d7de65b689 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -15,14 +15,28 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
 	return READ_ONCE(rclp->len);
 }
 
+/* Return number of callbacks in the specified callback list. */
+static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
+{
+#ifdef CONFIG_RCU_LAZY
+	return READ_ONCE(rclp->lazy_len);
+#else
+	return 0;
+#endif
+}
+
 /* Return number of callbacks in segmented callback list by summing seglen. */
 long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
 
 void rcu_cblist_init(struct rcu_cblist *rclp);
 void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
+void rcu_cblist_enqueue_lazy(struct rcu_cblist *rclp, struct rcu_head *rhp);
 void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
 			      struct rcu_cblist *srclp,
 			      struct rcu_head *rhp);
+void rcu_cblist_flush_enqueue_lazy(struct rcu_cblist *drclp,
+				   struct rcu_cblist *srclp,
+				   struct rcu_head *rhp);
 struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp);
 
 /*
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index c25ba442044a..d2e3d6e176d2 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3098,7 +3098,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
  * Implementation of these memory-ordering guarantees is described here:
  * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
  */
-void call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void
+__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
 {
 	static atomic_t doublefrees;
 	unsigned long flags;
@@ -3139,7 +3140,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
 	}
 
 	check_cb_ovld(rdp);
-	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+	if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
 		return; // Enqueued onto ->nocb_bypass, so just leave.
 	// If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
 	rcu_segcblist_enqueue(&rdp->cblist, head);
@@ -3161,8 +3162,21 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
 		local_irq_restore(flags);
 	}
 }
-EXPORT_SYMBOL_GPL(call_rcu);
 
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func)
+{
+	return __call_rcu_common(head, func, true);
+}
+EXPORT_SYMBOL_GPL(call_rcu_lazy);
+#endif
+
+void call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+	return __call_rcu_common(head, func, false);
+
+}
+EXPORT_SYMBOL_GPL(call_rcu);
 
 /* Maximum number of jiffies to wait before draining a batch. */
 #define KFREE_DRAIN_JIFFIES (HZ / 50)
@@ -4056,7 +4070,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
 	rdp->barrier_head.func = rcu_barrier_callback;
 	debug_rcu_head_queue(&rdp->barrier_head);
 	rcu_nocb_lock(rdp);
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
 	if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
 		atomic_inc(&rcu_state.barrier_cpu_count);
 	} else {
@@ -4476,7 +4490,7 @@ void rcutree_migrate_callbacks(int cpu)
 	my_rdp = this_cpu_ptr(&rcu_data);
 	my_rnp = my_rdp->mynode;
 	rcu_nocb_lock(my_rdp); /* irqs already disabled. */
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
 	raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
 	/* Leverage recent GPs and set GP for new callbacks. */
 	needwake = rcu_advance_cbs(my_rnp, rdp) ||
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 2ccf5845957d..fec4fad6654b 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -267,8 +267,9 @@ struct rcu_data {
 /* Values for nocb_defer_wakeup field in struct rcu_data. */
 #define RCU_NOCB_WAKE_NOT	0
 #define RCU_NOCB_WAKE_BYPASS	1
-#define RCU_NOCB_WAKE		2
-#define RCU_NOCB_WAKE_FORCE	3
+#define RCU_NOCB_WAKE_LAZY	2
+#define RCU_NOCB_WAKE		3
+#define RCU_NOCB_WAKE_FORCE	4
 
 #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
 					/* For jiffies_till_first_fqs and */
@@ -436,9 +437,10 @@ static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
 static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
 static void rcu_init_one_nocb(struct rcu_node *rnp);
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j);
+				  unsigned long j, bool lazy);
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags);
+				bool *was_alldone, unsigned long flags,
+				bool lazy);
 static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
 				 unsigned long flags);
 static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index e369efe94fda..b9244f22e102 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -256,6 +256,8 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
 	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
 }
 
+#define LAZY_FLUSH_JIFFIES (10 * HZ)
+
 /*
  * Arrange to wake the GP kthread for this NOCB group at some future
  * time when it is safe to do so.
@@ -272,7 +274,10 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
 	 * Bypass wakeup overrides previous deferments. In case
 	 * of callback storm, no need to wake up too early.
 	 */
-	if (waketype == RCU_NOCB_WAKE_BYPASS) {
+	if (waketype == RCU_NOCB_WAKE_LAZY) {
+		mod_timer(&rdp_gp->nocb_timer, jiffies + LAZY_FLUSH_JIFFIES);
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	} else if (waketype == RCU_NOCB_WAKE_BYPASS) {
 		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
 		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
 	} else {
@@ -296,7 +301,7 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
  * Note that this function always returns true if rhp is NULL.
  */
 static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				     unsigned long j)
+				     unsigned long j, bool lazy)
 {
 	struct rcu_cblist rcl;
 
@@ -310,7 +315,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 	/* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
 	if (rhp)
 		rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+
+	trace_printk("call_rcu_lazy callbacks = %ld\n", READ_ONCE(rdp->nocb_bypass.lazy_len));
+	/* The lazy CBs are being flushed, but a new one might be enqueued. */
+	if (lazy)
+		rcu_cblist_flush_enqueue_lazy(&rcl, &rdp->nocb_bypass, rhp);
+	else
+		rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
 	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
 	WRITE_ONCE(rdp->nocb_bypass_first, j);
 	rcu_nocb_bypass_unlock(rdp);
@@ -326,13 +337,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
  * Note that this function always returns true if rhp is NULL.
  */
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
+				  unsigned long j, bool lazy)
 {
 	if (!rcu_rdp_is_offloaded(rdp))
 		return true;
 	rcu_lockdep_assert_cblist_protected(rdp);
 	rcu_nocb_bypass_lock(rdp);
-	return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+	return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
 }
 
 /*
@@ -345,7 +356,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
 	if (!rcu_rdp_is_offloaded(rdp) ||
 	    !rcu_nocb_bypass_trylock(rdp))
 		return;
-	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+	WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
 }
 
 /*
@@ -367,12 +378,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
  * there is only one CPU in operation.
  */
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
+				bool *was_alldone, unsigned long flags,
+				bool lazy)
 {
 	unsigned long c;
 	unsigned long cur_gp_seq;
 	unsigned long j = jiffies;
 	long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	long n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 
 	lockdep_assert_irqs_disabled();
 
@@ -414,30 +427,37 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 	}
 	WRITE_ONCE(rdp->nocb_nobypass_count, c);
 
-	// If there hasn't yet been all that many ->cblist enqueues
-	// this jiffy, tell the caller to enqueue onto ->cblist. But flush
-	// ->nocb_bypass first.
-	if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+	// If caller passed a non-lazy CB and there hasn't yet been all that
+	// many ->cblist enqueues this jiffy, tell the caller to enqueue it
+	// onto ->cblist. But flush ->nocb_bypass first. Also do so, if total
+	// number of CBs (lazy + non-lazy) grows too much.
+	//
+	// Note that if the bypass list has lazy CBs, and the main list is
+	// empty, and rhp happens to be non-lazy, then we end up flushing all
+	// the lazy CBs to the main list as well. That's the right thing to do,
+	// since we are kick-starting RCU GP processing anyway for the non-lazy
+	// one, we can just reuse that GP for the already queued-up lazy ones.
+	if ((rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) ||
+	    (lazy && n_lazy_cbs >= qhimark)) {
 		rcu_nocb_lock(rdp);
 		*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
 		if (*was_alldone)
 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("FirstQ"));
-		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+					    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
+		WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
 		WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
 		return false; // Caller must enqueue the callback.
 	}
 
 	// If ->nocb_bypass has been used too long or is too full,
 	// flush ->nocb_bypass to ->cblist.
-	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
-	    ncbs >= qhimark) {
+	if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) {
 		rcu_nocb_lock(rdp);
-		if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+		if (!rcu_nocb_flush_bypass(rdp, rhp, j, true)) {
 			*was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
 			if (*was_alldone)
 				trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-						    TPS("FirstQ"));
+						    lazy ? TPS("FirstLazyQ") : TPS("FirstQ"));
 			WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
 			return false; // Caller must enqueue the callback.
 		}
@@ -455,12 +475,20 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 	rcu_nocb_wait_contended(rdp);
 	rcu_nocb_bypass_lock(rdp);
 	ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	n_lazy_cbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 	rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-	rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+	if (lazy)
+		rcu_cblist_enqueue_lazy(&rdp->nocb_bypass, rhp);
+	else
+		rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
 	if (!ncbs) {
 		WRITE_ONCE(rdp->nocb_bypass_first, j);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+				    lazy ? TPS("FirstLazyBQ") : TPS("FirstBQ"));
+	} else if (!n_lazy_cbs && lazy) {
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstLazyBQ"));
 	}
+
 	rcu_nocb_bypass_unlock(rdp);
 	smp_mb(); /* Order enqueue before wake. */
 	if (ncbs) {
@@ -493,7 +521,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 {
 	unsigned long cur_gp_seq;
 	unsigned long j;
-	long len;
+	long len, lazy_len, bypass_len;
 	struct task_struct *t;
 
 	// If we are being polled or there is no kthread, just leave.
@@ -506,9 +534,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 	}
 	// Need to actually to a wakeup.
 	len = rcu_segcblist_n_cbs(&rdp->cblist);
+	bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+	lazy_len = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 	if (was_alldone) {
 		rdp->qlen_last_fqs_check = len;
-		if (!irqs_disabled_flags(flags)) {
+		// Only lazy CBs in bypass list
+		if (lazy_len && bypass_len == lazy_len) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
+					   TPS("WakeLazy"));
+		} else if (!irqs_disabled_flags(flags)) {
 			/* ... if queue was empty ... */
 			rcu_nocb_unlock_irqrestore(rdp, flags);
 			wake_nocb_gp(rdp, false);
@@ -599,8 +634,8 @@ static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
  */
 static void nocb_gp_wait(struct rcu_data *my_rdp)
 {
-	bool bypass = false;
-	long bypass_ncbs;
+	bool bypass = false, lazy = false;
+	long bypass_ncbs, lazy_ncbs;
 	int __maybe_unused cpu = my_rdp->cpu;
 	unsigned long cur_gp_seq;
 	unsigned long flags;
@@ -648,12 +683,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 			continue;
 		}
 		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-		if (bypass_ncbs &&
+		lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
+		if (lazy_ncbs &&
+		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + LAZY_FLUSH_JIFFIES) ||
+		     bypass_ncbs > qhimark)) {
+			// Bypass full or old, so flush it.
+			(void)rcu_nocb_try_flush_bypass(rdp, j);
+			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+			lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
+		} else if (bypass_ncbs &&
 		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
 		     bypass_ncbs > 2 * qhimark)) {
 			// Bypass full or old, so flush it.
 			(void)rcu_nocb_try_flush_bypass(rdp, j);
 			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+			lazy_ncbs = rcu_cblist_n_lazy_cbs(&rdp->nocb_bypass);
 		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
 			rcu_nocb_unlock_irqrestore(rdp, flags);
 			if (needwake_state)
@@ -662,8 +706,11 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 		}
 		if (bypass_ncbs) {
 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-					    TPS("Bypass"));
-			bypass = true;
+					    bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
+			if (bypass_ncbs == lazy_ncbs)
+				lazy = true;
+			else
+				bypass = true;
 		}
 		rnp = rdp->mynode;
 
@@ -713,12 +760,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 
 	my_rdp->nocb_gp_gp = needwait_gp;
 	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
-	if (bypass && !rcu_nocb_poll) {
-		// At least one child with non-empty ->nocb_bypass, so set
-		// timer in order to avoid stranding its callbacks.
-		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
-				   TPS("WakeBypassIsDeferred"));
+	// At least one child with non-empty ->nocb_bypass, so set
+	// timer in order to avoid stranding its callbacks.
+	if (!rcu_nocb_poll) {
+		// If bypass list only has lazy CBs. Add a deferred
+		// lazy wake up.
+		if (lazy && !bypass) {
+			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
+					   TPS("WakeLazyIsDeferred"));
+		// Otherwise add a deferred bypass wake up.
+		} else if (bypass) {
+			wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+					   TPS("WakeBypassIsDeferred"));
+		}
 	}
+
 	if (rcu_nocb_poll) {
 		/* Polling, so trace if first poll in the series. */
 		if (gotcbs)
@@ -999,7 +1055,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
 	 * return false, which means that future calls to rcu_nocb_try_bypass()
 	 * will refuse to put anything into the bypass.
 	 */
-	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+	WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
 	/*
 	 * Start with invoking rcu_core() early. This way if the current thread
 	 * happens to preempt an ongoing call to rcu_core() in the middle,
@@ -1500,13 +1556,14 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
 }
 
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				  unsigned long j)
+				  unsigned long j, bool lazy)
 {
 	return true;
 }
 
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-				bool *was_alldone, unsigned long flags)
+				bool *was_alldone, unsigned long flags,
+				bool lazy)
 {
 	return false;
 }
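For reviewers skimming the tree_nocb.h hunks, the nocb_gp_wait() bypass-flush check after this patch can be condensed roughly as follows (illustrative sketch only, not code from the patch; the helper name is invented, while qhimark and LAZY_FLUSH_JIFFIES are the existing tunable and the new 10 * HZ constant):

/*
 * Rough condensation of the post-patch nocb_gp_wait() decision about
 * flushing a CPU's ->nocb_bypass list. Not actual kernel code.
 */
static bool nocb_gp_should_flush_bypass(long bypass_ncbs, long lazy_ncbs,
					unsigned long bypass_first,
					unsigned long j)
{
	if (lazy_ncbs)		/* Lazy CBs queued: wait up to LAZY_FLUSH_JIFFIES. */
		return time_after(j, bypass_first + LAZY_FLUSH_JIFFIES) ||
		       bypass_ncbs > qhimark;
	if (bypass_ncbs)	/* Non-lazy bypass keeps the old, eager rule. */
		return time_after(j, bypass_first + 1) ||
		       bypass_ncbs > 2 * qhimark;
	return false;
}

In other words, once lazy callbacks sit in the bypass list, the GP kthread tolerates up to roughly ten seconds (or qhimark callbacks) before flushing, instead of the usual one jiffy.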
Implement timer-based RCU lazy callback batching. The batch is flushed
whenever a certain amount of time has passed, or the batch on a
particular CPU grows too big. Also memory pressure will flush it in a
future patch.

To handle several corner cases automagically (such as rcu_barrier() and
hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
length has the lazy CB length included in it. A separate lazy CB length
counter is also introduced to keep track of the number of lazy CBs.

Suggested-by: Paul McKenney <paulmck@kernel.org>
Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
---
 include/linux/rcu_segcblist.h |   1 +
 include/linux/rcupdate.h      |   6 ++
 kernel/rcu/Kconfig            |   8 +++
 kernel/rcu/rcu_segcblist.c    |  19 ++++++
 kernel/rcu/rcu_segcblist.h    |  14 ++++
 kernel/rcu/tree.c             |  24 +++++--
 kernel/rcu/tree.h             |  10 +--
 kernel/rcu/tree_nocb.h        | 125 +++++++++++++++++++++++++---------
 8 files changed, 164 insertions(+), 43 deletions(-)
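As a usage sketch (the struct and functions below are made up for illustration; only call_rcu_lazy() itself comes from this patch), a caller whose callback merely frees memory and that does not need the grace period to finish promptly can switch from call_rcu() to call_rcu_lazy() with no other changes:

#include <linux/list.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

/* Hypothetical object reclaimed via RCU. */
struct foo {
	struct list_head list;
	struct rcu_head rcu;
};

static void foo_reclaim(struct rcu_head *rcu)
{
	kfree(container_of(rcu, struct foo, rcu));
}

static void foo_release(struct foo *fp)
{
	list_del_rcu(&fp->list);
	/* No urgency here, so let RCU batch this callback lazily. */
	call_rcu_lazy(&fp->rcu, foo_reclaim);
}

With CONFIG_RCU_LAZY=n, the rcupdate.h fallback in the patch maps call_rcu_lazy() straight to call_rcu(), so such callers behave exactly as before.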