Message ID | 20240807-b4-slab-kfree_rcu-destroy-v2-5-ea79102f428c@suse.cz (mailing list archive)
---|---
State | New, archived
Series | mm, slub: handle pending kfree_rcu() in kmem_cache_destroy()
Hello, Vlastimil!

> From: "Uladzislau Rezki (Sony)" <urezki@gmail.com>
>
> Add a kvfree_rcu_barrier() function. It waits until all
> in-flight pointers are freed over RCU machinery. It does
> not wait any GP completion and it is within its right to
> return immediately if there are no outstanding pointers.
>
> This function is useful when there is a need to guarantee
> that a memory is fully freed before destroying memory caches.
> For example, during unloading a kernel module.
>
> Signed-off-by: Uladzislau Rezki (Sony) <urezki@gmail.com>
> Signed-off-by: Vlastimil Babka <vbabka@suse.cz>
> ---
>  include/linux/rcutiny.h |   5 +++
>  include/linux/rcutree.h |   1 +
>  kernel/rcu/tree.c       | 103 ++++++++++++++++++++++++++++++++++++++++++++----
>  3 files changed, 101 insertions(+), 8 deletions(-)
>
> [ patch diff snipped; see the full diff below ]
>
> --
> 2.46.0

I need to send out a v2. What is the best way? Please let me know. I have not
checked where this series already landed.

Thank you!

--
Uladzislau Rezki
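For context, the usage pattern the commit message above describes, flushing pending
kfree_rcu()/kvfree_rcu() requests before a cache is destroyed on module unload, looks
roughly like the sketch below. This is an illustrative sketch only, not code from the
series: the foo structure, foo_cache and the foo_* helpers are made-up names, while
kfree_rcu(), kvfree_rcu_barrier(), kmem_cache_alloc() and kmem_cache_destroy() are the
kernel APIs involved.

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

/* Hypothetical object freed via the two-argument kfree_rcu() form. */
struct foo {
        int data;
        struct rcu_head rcu;
};

static struct kmem_cache *foo_cache;

static struct foo *foo_alloc(void)
{
        /* Objects come from the cache that is destroyed on module exit. */
        return kmem_cache_alloc(foo_cache, GFP_KERNEL);
}

static void foo_free(struct foo *f)
{
        /* Queued on the per-CPU kfree_rcu() batches; freed after a grace period. */
        kfree_rcu(f, rcu);
}

static void __exit foo_exit(void)
{
        /*
         * Wait until every pointer previously handed to kfree_rcu() or
         * kvfree_rcu() has actually been freed back to the allocator,
         * then it is safe to destroy the backing cache.
         */
        kvfree_rcu_barrier();
        kmem_cache_destroy(foo_cache);
}
module_exit(foo_exit);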
On 8/9/24 18:26, Uladzislau Rezki wrote:
> Hello, Vlastimil!
> I need to send out a v2. What is the best way? Please let me know. I have not
> checked where this series already landed.

Hi,

you can just send it separately based on v6.11-rc2, as you did with v1, and I will
replace it in the slab/for-next branch. Thanks!

Vlastimil

> Thank you!
>
> --
> Uladzislau Rezki
Hello, Vlastimil!

> On 8/9/24 18:26, Uladzislau Rezki wrote:
> > Hello, Vlastimil!
> > I need to send out a v2. What is the best way? Please let me know. I have not
> > checked where this series already landed.
>
> Hi,
>
> you can just send it separately based on v6.11-rc2, as you did with v1, and I will
> replace it in the slab/for-next branch. Thanks!
>
Sorry for the delay. I was on vacation last week. Just posted the v2 with a fix.
Now my tests pass!

Thanks!

--
Uladzislau Rezki
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index d9ac7b136aea..522123050ff8 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -111,6 +111,11 @@ static inline void __kvfree_call_rcu(struct rcu_head *head, void *ptr)
         kvfree(ptr);
 }
 
+static inline void kvfree_rcu_barrier(void)
+{
+        rcu_barrier();
+}
+
 #ifdef CONFIG_KASAN_GENERIC
 void kvfree_call_rcu(struct rcu_head *head, void *ptr);
 #else
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 254244202ea9..58e7db80f3a8 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -35,6 +35,7 @@ static inline void rcu_virt_note_context_switch(void)
 
 void synchronize_rcu_expedited(void);
 void kvfree_call_rcu(struct rcu_head *head, void *ptr);
+void kvfree_rcu_barrier(void);
 
 void rcu_barrier(void);
 void rcu_momentary_dyntick_idle(void);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index e641cc681901..ebcfed9b570e 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3584,18 +3584,15 @@ kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
 }
 
 /*
- * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
+ * Return: %true if a work is queued, %false otherwise.
  */
-static void kfree_rcu_monitor(struct work_struct *work)
+static bool
+kvfree_rcu_queue_batch(struct kfree_rcu_cpu *krcp)
 {
-        struct kfree_rcu_cpu *krcp = container_of(work,
-                struct kfree_rcu_cpu, monitor_work.work);
         unsigned long flags;
+        bool queued = false;
         int i, j;
 
-        // Drain ready for reclaim.
-        kvfree_rcu_drain_ready(krcp);
-
         raw_spin_lock_irqsave(&krcp->lock, flags);
 
         // Attempt to start a new batch.
@@ -3634,11 +3631,27 @@ static void kfree_rcu_monitor(struct work_struct *work)
                         // be that the work is in the pending state when
                         // channels have been detached following by each
                         // other.
-                        queue_rcu_work(system_wq, &krwp->rcu_work);
+                        queued = queue_rcu_work(system_wq, &krwp->rcu_work);
                 }
         }
 
         raw_spin_unlock_irqrestore(&krcp->lock, flags);
+        return queued;
+}
+
+/*
+ * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+        struct kfree_rcu_cpu *krcp = container_of(work,
+                struct kfree_rcu_cpu, monitor_work.work);
+
+        // Drain ready for reclaim.
+        kvfree_rcu_drain_ready(krcp);
+
+        // Queue a batch for a rest.
+        kvfree_rcu_queue_batch(krcp);
 
         // If there is nothing to detach, it means that our job is
         // successfully done here. In case of having at least one
@@ -3859,6 +3872,80 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
 }
 EXPORT_SYMBOL_GPL(kvfree_call_rcu);
 
+/**
+ * kvfree_rcu_barrier - Wait until all in-flight kvfree_rcu() complete.
+ *
+ * Note that a single argument of kvfree_rcu() call has a slow path that
+ * triggers synchronize_rcu() following by freeing a pointer. It is done
+ * before the return from the function. Therefore for any single-argument
+ * call that will result in a kfree() to a cache that is to be destroyed
+ * during module exit, it is developer's responsibility to ensure that all
+ * such calls have returned before the call to kmem_cache_destroy().
+ */
+void kvfree_rcu_barrier(void)
+{
+        struct kfree_rcu_cpu_work *krwp;
+        struct kfree_rcu_cpu *krcp;
+        bool queued;
+        int i, cpu;
+
+        /*
+         * Firstly we detach objects and queue them over an RCU-batch
+         * for all CPUs. Finally queued works are flushed for each CPU.
+         *
+         * Please note. If there are outstanding batches for a particular
+         * CPU, those have to be finished first following by queuing a new.
+         */
+        for_each_possible_cpu(cpu) {
+                krcp = per_cpu_ptr(&krc, cpu);
+
+                /*
+                 * Check if this CPU has any objects which have been queued for a
+                 * new GP completion. If not(means nothing to detach), we are done
+                 * with it. If any batch is pending/running for this "krcp", below
+                 * per-cpu flush_rcu_work() waits its completion(see last step).
+                 */
+                if (!need_offload_krc(krcp))
+                        continue;
+
+                while (1) {
+                        /*
+                         * If we are not able to queue a new RCU work it means:
+                         * - batches for this CPU are still in flight which should
+                         *   be flushed first and then repeat;
+                         * - no objects to detach, because of concurrency.
+                         */
+                        queued = kvfree_rcu_queue_batch(krcp);
+
+                        /*
+                         * Bail out, if there is no need to offload this "krcp"
+                         * anymore. As noted earlier it can run concurrently.
+                         */
+                        if (queued || !need_offload_krc(krcp))
+                                break;
+
+                        /* There are ongoing batches. */
+                        for (i = 0; i < KFREE_N_BATCHES; i++) {
+                                krwp = &(krcp->krw_arr[i]);
+                                flush_rcu_work(&krwp->rcu_work);
+                        }
+                }
+        }
+
+        /*
+         * Now we guarantee that all objects are flushed.
+         */
+        for_each_possible_cpu(cpu) {
+                krcp = per_cpu_ptr(&krc, cpu);
+
+                for (i = 0; i < KFREE_N_BATCHES; i++) {
+                        krwp = &(krcp->krw_arr[i]);
+                        flush_rcu_work(&krwp->rcu_work);
+                }
+        }
+}
+EXPORT_SYMBOL_GPL(kvfree_rcu_barrier);
+
 static unsigned long
 kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
 {
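A closing note on the kerneldoc caveat above: kvfree_rcu_barrier() covers pointers
queued by the two-argument kvfree_rcu()/kfree_rcu() form, while the single-argument
form may take a slow path that calls synchronize_rcu() and frees the pointer before
returning, so the only requirement there is that such calls have already returned.
The sketch below is illustrative only and not from the patch; my_obj, buf and
drop_examples() are hypothetical names, kvfree_rcu() and kvfree_rcu_mightsleep()
are the existing kernel macros.

#include <linux/slab.h>
#include <linux/rcupdate.h>

/* Hypothetical object embedding an rcu_head for the two-argument form. */
struct my_obj {
        struct rcu_head rcu;
        char payload[64];
};

static void drop_examples(struct my_obj *obj, void *buf)
{
        /*
         * Two-argument form: "obj" is queued on a per-CPU batch and freed
         * after a grace period; kvfree_rcu_barrier() waits for it.
         */
        kvfree_rcu(obj, rcu);

        /*
         * Single-argument form: may call synchronize_rcu() and free "buf"
         * before returning. kvfree_rcu_barrier() does not track it; the
         * caller only has to make sure this call has returned before the
         * backing cache is destroyed.
         */
        kvfree_rcu_mightsleep(buf);
}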