Message ID | 20220620224503.3841196-1-paulmck@kernel.org (mailing list archive)
---|---
State | Accepted
Commit | 1598f4a4762be0ea6a1bcd229c2c9ff1ebb212bb
Series | Callback-offload (nocb) updates for v5.20
On 6/21/2022 4:14 AM, Paul E. McKenney wrote:
> From: Frederic Weisbecker <frederic@kernel.org>
>
> NOCB rdp's are part of a group whose list is iterated by the
> corresponding rdp leader.
>
> This list is RCU-traversed because an rdp can be added or deleted
> concurrently. Upon addition, a new iteration of the list after a
> synchronization point (a pair of LOCK/UNLOCK ->nocb_gp_lock) is forced
> to make sure:
>
> 1) we didn't miss a new element added in the middle of an iteration
> 2) we didn't ignore a whole subset of the list due to an element being
>    quickly deleted and then re-added
> 3) we are protected against other potential surprises...
>
> Although this layout is expected to be safe, it doesn't help anybody
> to sleep well.
>
> Instead, simplify the nocb state toggling by moving the list
> modification from the nocb (de-)offloading workqueue to the rcuog
> kthreads.
>
> Whenever the rdp leader is expected to (re-)set the SEGCBLIST_KTHREAD_GP
> flag of a target rdp, the latter is queued so that the leader handles
> the flag flip along with adding the target rdp to, or deleting it from,
> the list to iterate. This way the list modification and iteration happen
> from the same kthread and those operations can't race with each other.
>
> As a bonus, the flags for each rdp don't need to be checked locklessly
> before each iteration, which is one less opportunity to produce
> nightmares.
>
> Signed-off-by: Frederic Weisbecker <frederic@kernel.org>
> Cc: Neeraj Upadhyay <quic_neeraju@quicinc.com>
> Cc: Boqun Feng <boqun.feng@gmail.com>
> Cc: Uladzislau Rezki <uladzislau.rezki@sony.com>
> Cc: Joel Fernandes <joel@joelfernandes.org>
> Cc: Zqiang <qiang1.zhang@intel.com>
> Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
> ---

Reviewed-by: Neeraj Upadhyay <quic_neeraju@quicinc.com>

Thanks
Neeraj

>  kernel/rcu/tree.h      |   1 +
>  kernel/rcu/tree_nocb.h | 138 +++++++++++++++++++++--------------------
>  2 files changed, 71 insertions(+), 68 deletions(-)
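For readers following the thread, the core idea of the patch is a classic single-writer pattern: only the thread that iterates a list is allowed to modify it, and other threads merely publish a request and wake it. Below is a minimal userspace C sketch of that pattern, assuming POSIX threads; the names (toggle_req, leader_pass, struct node) are illustrative stand-ins, not kernel APIs, and the single mutex loosely stands in for both ->nocb_gp_lock and the nocb lock.

/*
 * Sketch: list add/del deferred to the single iterating "leader" thread,
 * so iteration needs no RCU traversal and no lockless flag checks.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct node {
        int id;
        bool enlisted;               /* loosely mirrors SEGCBLIST_KTHREAD_GP */
        struct node *next;
};

static struct node *head;            /* owned exclusively by the leader */
static struct node *toggle_req;      /* loosely mirrors ->nocb_toggling_rdp */
static pthread_mutex_t gp_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gp_wq = PTHREAD_COND_INITIALIZER;

/* Any thread: publish @n as the pending toggle and wake the leader. */
static void request_toggle(struct node *n)
{
        pthread_mutex_lock(&gp_lock);
        toggle_req = n;              /* single slot, as in the patch */
        pthread_cond_signal(&gp_wq);
        pthread_mutex_unlock(&gp_lock);
}

/* Leader only: consume the request, then add/delete and iterate locally. */
static void leader_pass(void)
{
        struct node *n;

        pthread_mutex_lock(&gp_lock);
        while (!toggle_req)
                pthread_cond_wait(&gp_wq, &gp_lock);
        n = toggle_req;
        toggle_req = NULL;           /* reset before acting, as the patch does */
        pthread_mutex_unlock(&gp_lock);

        if (!n->enlisted) {          /* "offload": the leader itself adds it */
                n->enlisted = true;
                n->next = head;
                head = n;
        } else {                     /* "de-offload": the leader removes it */
                struct node **pp = &head;

                while (*pp != n)     /* n->enlisted guarantees membership */
                        pp = &(*pp)->next;
                *pp = n->next;
                n->enlisted = false;
        }

        /* Modification and iteration happen on this thread: no list races. */
        for (struct node *it = head; it; it = it->next)
                printf("serving node %d\n", it->id);
}

A kernel-side equivalent additionally needs IRQ-safe locking and must wake the waiting (de-)offload worker once the flip is done, which is what nocb_gp_toggle_rdp() and the nocb_state_wq wakeups do in the patch below.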
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 2ccf5845957df..4f8532c33558f 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -235,6 +235,7 @@ struct rcu_data {
                                          * if rdp_gp.
                                          */
         struct list_head nocb_entry_rdp; /* rcu_data node in wakeup chain. */
+        struct rcu_data *nocb_toggling_rdp; /* rdp queued for (de-)offloading */
 
         /* The following fields are used by CB kthread, hence new cacheline. */
         struct rcu_data *nocb_gp_rdp ____cacheline_internodealigned_in_smp;
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index 46694e13398a3..dac74952e1d1b 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -546,52 +546,43 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
         }
 }
 
-/*
- * Check if we ignore this rdp.
- *
- * We check that without holding the nocb lock but
- * we make sure not to miss a freshly offloaded rdp
- * with the current ordering:
- *
- *  rdp_offload_toggle()        nocb_gp_enabled_cb()
- * -------------------------   ----------------------------
- *    WRITE flags                 LOCK nocb_gp_lock
- *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
- *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
- *    UNLOCK nocb_gp_lock         READ flags
- */
-static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
-{
-        u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
-
-        return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
-                                                     bool *needwake_state)
+static int nocb_gp_toggle_rdp(struct rcu_data *rdp,
+                               bool *wake_state)
 {
         struct rcu_segcblist *cblist = &rdp->cblist;
+        unsigned long flags;
+        int ret;
 
-        if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-                if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
-                        rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
-                        if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-                                *needwake_state = true;
-                }
-                return false;
+        rcu_nocb_lock_irqsave(rdp, flags);
+        if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
+            !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+                /*
+                 * Offloading. Set our flag and notify the offload worker.
+                 * We will handle this rdp until it ever gets de-offloaded.
+                 */
+                rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
+                if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+                        *wake_state = true;
+                ret = 1;
+        } else if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED) &&
+                   rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+                /*
+                 * De-offloading. Clear our flag and notify the de-offload worker.
+                 * We will ignore this rdp until it ever gets re-offloaded.
+                 */
+                rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
+                if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+                        *wake_state = true;
+                ret = 0;
+        } else {
+                WARN_ON_ONCE(1);
+                ret = -1;
         }
 
-        /*
-         * De-offloading. Clear our flag and notify the de-offload worker.
-         * We will ignore this rdp until it ever gets re-offloaded.
-         */
-        WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-        rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
-        if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-                *needwake_state = true;
-        return true;
-}
+        rcu_nocb_unlock_irqrestore(rdp, flags);
 
+        return ret;
+}
 
 /*
  * No-CBs GP kthreads come here to wait for additional callbacks to show up
@@ -609,7 +600,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
         bool needwait_gp = false; // This prevents actual uninitialized use.
         bool needwake;
         bool needwake_gp;
-        struct rcu_data *rdp;
+        struct rcu_data *rdp, *rdp_toggling = NULL;
         struct rcu_node *rnp;
         unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
         bool wasempty = false;
@@ -634,19 +625,10 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
          * is added to the list, so the skipped-over rcu_data structures
          * won't be ignored for long.
          */
-        list_for_each_entry_rcu(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp, 1) {
-                bool needwake_state = false;
-
-                if (!nocb_gp_enabled_cb(rdp))
-                        continue;
+        list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
                 trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
                 rcu_nocb_lock_irqsave(rdp, flags);
-                if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
-                        rcu_nocb_unlock_irqrestore(rdp, flags);
-                        if (needwake_state)
-                                swake_up_one(&rdp->nocb_state_wq);
-                        continue;
-                }
+                lockdep_assert_held(&rdp->nocb_lock);
                 bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
                 if (bypass_ncbs &&
                     (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
@@ -656,8 +638,6 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
                         bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
                 } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
                         rcu_nocb_unlock_irqrestore(rdp, flags);
-                        if (needwake_state)
-                                swake_up_one(&rdp->nocb_state_wq);
                         continue; /* No callbacks here, try next. */
                 }
                 if (bypass_ncbs) {
@@ -705,8 +685,6 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
                 }
                 if (needwake_gp)
                         rcu_gp_kthread_wake();
-                if (needwake_state)
-                        swake_up_one(&rdp->nocb_state_wq);
         }
 
         my_rdp->nocb_gp_bypass = bypass;
@@ -739,15 +717,49 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
                                 !READ_ONCE(my_rdp->nocb_gp_sleep));
                 trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
         }
+
         if (!rcu_nocb_poll) {
                 raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+                // (De-)queue an rdp to/from the group if its nocb state is changing
+                rdp_toggling = my_rdp->nocb_toggling_rdp;
+                if (rdp_toggling)
+                        my_rdp->nocb_toggling_rdp = NULL;
+
                 if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
                         WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
                         del_timer(&my_rdp->nocb_timer);
                 }
                 WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
                 raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+        } else {
+                rdp_toggling = READ_ONCE(my_rdp->nocb_toggling_rdp);
+                if (rdp_toggling) {
+                        /*
+                         * Paranoid locking to make sure nocb_toggling_rdp is well
+                         * reset *before* we (re)set SEGCBLIST_KTHREAD_GP or we could
+                         * race with another round of nocb toggling for this rdp.
+                         * Nocb locking should prevent from that already but we stick
+                         * to paranoia, especially in rare path.
+                         */
+                        raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+                        my_rdp->nocb_toggling_rdp = NULL;
+                        raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+                }
+        }
+
+        if (rdp_toggling) {
+                bool wake_state = false;
+                int ret;
+
+                ret = nocb_gp_toggle_rdp(rdp_toggling, &wake_state);
+                if (ret == 1)
+                        list_add_tail(&rdp_toggling->nocb_entry_rdp, &my_rdp->nocb_head_rdp);
+                else if (ret == 0)
+                        list_del(&rdp_toggling->nocb_entry_rdp);
+                if (wake_state)
+                        swake_up_one(&rdp_toggling->nocb_state_wq);
         }
+
         my_rdp->nocb_gp_seq = -1;
         WARN_ON(signal_pending(current));
 }
@@ -966,6 +978,8 @@ static int rdp_offload_toggle(struct rcu_data *rdp,
                 swake_up_one(&rdp->nocb_cb_wq);
 
         raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+        // Queue this rdp for add/del to/from the list to iterate on rcuog
+        WRITE_ONCE(rdp_gp->nocb_toggling_rdp, rdp);
         if (rdp_gp->nocb_gp_sleep) {
                 rdp_gp->nocb_gp_sleep = false;
                 wake_gp = true;
@@ -1013,8 +1027,6 @@ static long rcu_nocb_rdp_deoffload(void *arg)
         swait_event_exclusive(rdp->nocb_state_wq,
                               !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
                                                         SEGCBLIST_KTHREAD_GP));
-        /* Stop nocb_gp_wait() from iterating over this structure. */
-        list_del_rcu(&rdp->nocb_entry_rdp);
         /*
          * Lock one last time to acquire latest callback updates from kthreads
          * so we can later handle callbacks locally without locking.
@@ -1079,16 +1091,6 @@ static long rcu_nocb_rdp_offload(void *arg)
 
         pr_info("Offloading %d\n", rdp->cpu);
 
-        /*
-         * Cause future nocb_gp_wait() invocations to iterate over
-         * structure, resetting ->nocb_gp_sleep and waking up the related
-         * "rcuog". Since nocb_gp_wait() in turn locks ->nocb_gp_lock
-         * before setting ->nocb_gp_sleep again, we are guaranteed to
-         * iterate this newly added structure before "rcuog" goes to
-         * sleep again.
-         */
-        list_add_tail_rcu(&rdp->nocb_entry_rdp, &rdp->nocb_gp_rdp->nocb_head_rdp);
-
         /*
          * Can't use rcu_nocb_lock_irqsave() before SEGCBLIST_LOCKING
          * is set.
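As a side note on the handshake the diff relies on: the (de-)offload worker flips SEGCBLIST_OFFLOADED, queues the rdp, wakes rcuog, and then sleeps on ->nocb_state_wq until the leader has matched the request by flipping SEGCBLIST_KTHREAD_GP in nocb_gp_toggle_rdp(). The pthread sketch below compresses that into a single flag pair; it ignores the separate SEGCBLIST_KTHREAD_CB side of the wait, and struct cblist_state and both function names are invented for illustration.

#include <pthread.h>
#include <stdbool.h>

#define SEGCBLIST_OFFLOADED  0x1
#define SEGCBLIST_KTHREAD_GP 0x2

struct cblist_state {
        unsigned int flags;
        pthread_mutex_t lock;        /* stands in for the nocb lock */
        pthread_cond_t state_wq;     /* stands in for ->nocb_state_wq */
};

/* Worker side: request a toggle, then wait for the leader's acknowledgment. */
static void toggle_and_wait(struct cblist_state *st, bool offload)
{
        pthread_mutex_lock(&st->lock);
        if (offload)
                st->flags |= SEGCBLIST_OFFLOADED;
        else
                st->flags &= ~SEGCBLIST_OFFLOADED;
        /* ...here the kernel queues the rdp and wakes the rcuog leader... */
        while (!!(st->flags & SEGCBLIST_KTHREAD_GP) != offload)
                pthread_cond_wait(&st->state_wq, &st->lock);
        pthread_mutex_unlock(&st->lock);
}

/* Leader side: the three outcomes of nocb_gp_toggle_rdp(), simplified. */
static int leader_toggle(struct cblist_state *st)
{
        int ret = -1;                /* -1: inconsistent state (WARN in kernel) */

        pthread_mutex_lock(&st->lock);
        if ((st->flags & SEGCBLIST_OFFLOADED) &&
            !(st->flags & SEGCBLIST_KTHREAD_GP)) {
                st->flags |= SEGCBLIST_KTHREAD_GP;  /* ack offload: caller adds to list */
                ret = 1;
        } else if (!(st->flags & SEGCBLIST_OFFLOADED) &&
                   (st->flags & SEGCBLIST_KTHREAD_GP)) {
                st->flags &= ~SEGCBLIST_KTHREAD_GP; /* ack de-offload: caller deletes */
                ret = 0;
        }
        pthread_cond_broadcast(&st->state_wq);
        pthread_mutex_unlock(&st->lock);
        return ret;
}

Note that only one toggle is in flight per rcuog group at a time, which is why the patch gets away with a single ->nocb_toggling_rdp pointer rather than a queue.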