On Thu, Aug 01, 2024 at 01:10:39PM +0200, Uladzislau Rezki (Sony) wrote: > Add a kvfree_rcu_barrier() function. It waits until all > in-flight pointers are freed over RCU machinery. It does > not wait any GP completion and it is within its right to > return immediately if there are no outstanding pointers. > > This function is useful when there is a need to guarantee > that a memory is fully freed before destroying memory caches. > For example, during unloading a kernel module. > > Signed-off-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx> > --- > include/linux/rcutiny.h | 5 ++ > include/linux/rcutree.h | 1 + > kernel/rcu/tree.c | 103 ++++++++++++++++++++++++++++++++++++---- > 3 files changed, 101 insertions(+), 8 deletions(-) > > diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h > index d9ac7b136aea..522123050ff8 100644 > --- a/include/linux/rcutiny.h > +++ b/include/linux/rcutiny.h > @@ -111,6 +111,11 @@ static inline void __kvfree_call_rcu(struct rcu_head *head, void *ptr) > kvfree(ptr); > } > > +static inline void kvfree_rcu_barrier(void) > +{ > + rcu_barrier(); > +} > + > #ifdef CONFIG_KASAN_GENERIC > void kvfree_call_rcu(struct rcu_head *head, void *ptr); > #else > diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h > index 254244202ea9..58e7db80f3a8 100644 > --- a/include/linux/rcutree.h > +++ b/include/linux/rcutree.h > @@ -35,6 +35,7 @@ static inline void rcu_virt_note_context_switch(void) > > void synchronize_rcu_expedited(void); > void kvfree_call_rcu(struct rcu_head *head, void *ptr); > +void kvfree_rcu_barrier(void); > > void rcu_barrier(void); > void rcu_momentary_dyntick_idle(void); > diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c > index 28c7031711a3..1423013f9fe6 100644 > --- a/kernel/rcu/tree.c > +++ b/kernel/rcu/tree.c > @@ -3550,18 +3550,15 @@ kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp) > } > > /* > - * This function is invoked after the KFREE_DRAIN_JIFFIES timeout. > + * Return: %true if a work is queued, %false otherwise. > */ > -static void kfree_rcu_monitor(struct work_struct *work) > +static bool > +kvfree_rcu_queue_batch(struct kfree_rcu_cpu *krcp) > { > - struct kfree_rcu_cpu *krcp = container_of(work, > - struct kfree_rcu_cpu, monitor_work.work); > unsigned long flags; > + bool queued = false; > int i, j; > > - // Drain ready for reclaim. > - kvfree_rcu_drain_ready(krcp); > - > raw_spin_lock_irqsave(&krcp->lock, flags); > > // Attempt to start a new batch. > @@ -3600,11 +3597,27 @@ static void kfree_rcu_monitor(struct work_struct *work) > // be that the work is in the pending state when > // channels have been detached following by each > // other. > - queue_rcu_work(system_wq, &krwp->rcu_work); > + queued = queue_rcu_work(system_wq, &krwp->rcu_work); > } > } > > raw_spin_unlock_irqrestore(&krcp->lock, flags); > + return queued; > +} > + > +/* > + * This function is invoked after the KFREE_DRAIN_JIFFIES timeout. > + */ > +static void kfree_rcu_monitor(struct work_struct *work) > +{ > + struct kfree_rcu_cpu *krcp = container_of(work, > + struct kfree_rcu_cpu, monitor_work.work); > + > + // Drain ready for reclaim. > + kvfree_rcu_drain_ready(krcp); > + > + // Queue a batch for a rest. > + kvfree_rcu_queue_batch(krcp); > > // If there is nothing to detach, it means that our job is > // successfully done here. In case of having at least one > @@ -3825,6 +3838,80 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr) > } > EXPORT_SYMBOL_GPL(kvfree_call_rcu); > > +/** > + * kvfree_rcu_barrier - Wait until all in-flight kvfree_rcu() complete. > + * > + * Note that a single argument of kvfree_rcu() call has a slow path that > + * triggers synchronize_rcu() following by freeing a pointer. It is done > + * before the return from the function. Therefore for any single-argument > + * call that will result in a kfree() to a cache that is to be destroyed > + * during module exit, it is developer's responsibility to ensure that all > + * such calls have returned before the call to kmem_cache_destroy(). > + */ > +void kvfree_rcu_barrier(void) > +{ > + struct kfree_rcu_cpu_work *krwp; > + struct kfree_rcu_cpu *krcp; > + bool queued; > + int i, cpu; > + > + /* > + * Firstly we detach objects and queue them over an RCU-batch > + * for all CPUs. Finally queued works are flushed for each CPU. > + * > + * Please note. If there are outstanding batches for a particular > + * CPU, those have to be finished first following by queuing a new. > + */ > + for_each_possible_cpu(cpu) { > + krcp = per_cpu_ptr(&krc, cpu); > + > + /* > + * Check if this CPU has any objects which have been queued for a > + * new GP completion. If not(means nothing to detach), we are done > + * with it. If any batch is pending/running for this "krcp", below > + * per-cpu flush_rcu_work() waits its completion(see last step). > + */ > + if (!need_offload_krc(krcp)) Still trying to figure out the locking inside kfree_rcu(), but don't you need holding krcp->lock to performance these checks? Regards, Boqun > + continue; > + > + while (1) { > + /* > + * If we are not able to queue a new RCU work it means: > + * - batches for this CPU are still in flight which should > + * be flushed first and then repeat; > + * - no objects to detach, because of concurrency. > + */ > + queued = kvfree_rcu_queue_batch(krcp); > + > + /* > + * Bail out, if there is no need to offload this "krcp" > + * anymore. As noted earlier it can run concurrently. > + */ > + if (queued || !need_offload_krc(krcp)) > + break; > + > + /* There are ongoing batches. */ > + for (i = 0; i < KFREE_N_BATCHES; i++) { > + krwp = &(krcp->krw_arr[i]); > + flush_rcu_work(&krwp->rcu_work); > + } > + } > + } > + > + /* > + * Now we guarantee that all objects are flushed. > + */ > + for_each_possible_cpu(cpu) { > + krcp = per_cpu_ptr(&krc, cpu); > + > + for (i = 0; i < KFREE_N_BATCHES; i++) { > + krwp = &(krcp->krw_arr[i]); > + flush_rcu_work(&krwp->rcu_work); > + } > + } > +} > +EXPORT_SYMBOL_GPL(kvfree_rcu_barrier); > + > static unsigned long > kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc) > { > -- > 2.39.2 >