Some extra comments that do not overlap with those already mentioned in this email thread:

> Implement timer-based RCU callback batching. The batch is flushed
> whenever a certain amount of time has passed, or the batch on a
> particular CPU grows too big. Also memory pressure can flush it.
>
> Locking is avoided to reduce lock contention when queuing and dequeuing
> happens on different CPUs of a per-cpu list, such as when shrinker
> context is running on different CPU. Also not having to use locks keeps
> the per-CPU structure size small.
>
> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> ---
>  include/linux/rcupdate.h |   6 ++
>  kernel/rcu/Kconfig       |   8 +++
>  kernel/rcu/Makefile      |   1 +
>  kernel/rcu/lazy.c        | 145 +++++++++++++++++++++++++++++++++++++++
>  kernel/rcu/rcu.h         |   5 ++
>  kernel/rcu/tree.c        |   2 +
>  6 files changed, 167 insertions(+)
>  create mode 100644 kernel/rcu/lazy.c
>
> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> index 88b42eb46406..d0a6c4f5172c 100644
> --- a/include/linux/rcupdate.h
> +++ b/include/linux/rcupdate.h
> @@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void)
>
>  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
>
> +#ifdef CONFIG_RCU_LAZY
> +void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
> +#else
> +#define call_rcu_lazy(head, func) call_rcu(head, func)
> +#endif
> +
>  /* Internal to kernel */
>  void rcu_init(void);
>  extern int rcu_scheduler_active __read_mostly;
> diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
> index bf8e341e75b4..c09715079829 100644
> --- a/kernel/rcu/Kconfig
> +++ b/kernel/rcu/Kconfig
> @@ -241,4 +241,12 @@ config TASKS_TRACE_RCU_READ_MB
>  	  Say N here if you hate read-side memory barriers.
>  	  Take the default if you are unsure.
>
> +config RCU_LAZY
> +	bool "RCU callback lazy invocation functionality"
> +	depends on RCU_NOCB_CPU
> +	default y
> +	help
> +	  To save power, batch RCU callbacks and flush after delay, memory
> +	  pressure or callback list growing too big.
> +
>  endmenu # "RCU Subsystem"
> diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
> index 0cfb009a99b9..8968b330d6e0 100644
> --- a/kernel/rcu/Makefile
> +++ b/kernel/rcu/Makefile
> @@ -16,3 +16,4 @@ obj-$(CONFIG_RCU_REF_SCALE_TEST) += refscale.o
>  obj-$(CONFIG_TREE_RCU) += tree.o
>  obj-$(CONFIG_TINY_RCU) += tiny.o
>  obj-$(CONFIG_RCU_NEED_SEGCBLIST) += rcu_segcblist.o
> +obj-$(CONFIG_RCU_LAZY) += lazy.o
> diff --git a/kernel/rcu/lazy.c b/kernel/rcu/lazy.c
> new file mode 100644
> index 000000000000..55e406cfc528
> --- /dev/null
> +++ b/kernel/rcu/lazy.c
> @@ -0,0 +1,145 @@
> +/*
> + * Lockless lazy-RCU implementation.
> + */
> +#include <linux/rcupdate.h>
> +#include <linux/shrinker.h>
> +#include <linux/workqueue.h>
> +#include "rcu.h"
> +
> +// How much to batch before flushing?
> +#define MAX_LAZY_BATCH		2048
> +
> +// How much to wait before flushing?
> +#define MAX_LAZY_JIFFIES	10000
> +
> +// We cast lazy_rcu_head to rcu_head and back. This keeps the API simple while
> +// allowing us to use lockless list node in the head. Also, we use BUILD_BUG_ON
> +// later to ensure that rcu_head and lazy_rcu_head are of the same size.
> +struct lazy_rcu_head {
> +	struct llist_node llist_node;
> +	void (*func)(struct callback_head *head);
> +} __attribute__((aligned(sizeof(void *))));
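A side note for other readers of this thread: the cast also relies on the layout matching, not only the size. As far as I can see it does, since struct rcu_head is just struct callback_head from include/linux/types.h:

struct callback_head {
        struct callback_head *next;
        void (*func)(struct callback_head *head);
} __attribute__((aligned(sizeof(void *))));

and struct llist_node is a single "next" pointer, so llist_node overlays ->next and the two func fields line up. Maybe worth spelling that out in the comment, since the BUILD_BUG_ON() below only checks the size.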
> +
> +struct rcu_lazy_pcp {
> +	struct llist_head head;
> +	struct delayed_work work;
> +	atomic_t count;
> +};
> +DEFINE_PER_CPU(struct rcu_lazy_pcp, rcu_lazy_pcp_ins);
> +
> +// Lockless flush of CPU, can be called concurrently.
> +static void lazy_rcu_flush_cpu(struct rcu_lazy_pcp *rlp)
> +{
> +	struct llist_node *node = llist_del_all(&rlp->head);
> +	struct lazy_rcu_head *cursor, *temp;
> +
> +	if (!node)
> +		return;
> +
> +	llist_for_each_entry_safe(cursor, temp, node, llist_node) {
> +		struct rcu_head *rh = (struct rcu_head *)cursor;
> +		debug_rcu_head_unqueue(rh);
> +		call_rcu(rh, rh->func);
> +		atomic_dec(&rlp->count);
> +	}
> +}
> +
> +void call_rcu_lazy(struct rcu_head *head_rcu, rcu_callback_t func)
> +{
> +	struct lazy_rcu_head *head = (struct lazy_rcu_head *)head_rcu;
> +	struct rcu_lazy_pcp *rlp;
> +
> +	preempt_disable();
> +	rlp = this_cpu_ptr(&rcu_lazy_pcp_ins);
> +	preempt_enable();

Can we get rid of such explicit disabling/enabling of preemption?

> +
> +	if (debug_rcu_head_queue((void *)head)) {
> +		// Probable double call_rcu(), just leak.
> +		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
> +			  __func__, head);
> +
> +		// Mark as success and leave.
> +		return;
> +	}
> +
> +	// Queue to per-cpu llist
> +	head->func = func;
> +	llist_add(&head->llist_node, &rlp->head);
> +
> +	// Flush queue if too big
> +	if (atomic_inc_return(&rlp->count) >= MAX_LAZY_BATCH) {
> +		lazy_rcu_flush_cpu(rlp);

Can we just schedule the work here instead of draining from the caller context? For example, the caller can be in hard-irq context.

> +	} else {
> +		if (!delayed_work_pending(&rlp->work)) {
> +			schedule_delayed_work(&rlp->work, MAX_LAZY_JIFFIES);
> +		}
> +	}
> +}

EXPORT_SYMBOL_GPL()? To be able to use it from kernel modules.
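To make the above three comments more concrete, something like the below is what I had in mind. Completely untested, and raw_cpu_ptr() plus mod_delayed_work() are just one possible way of doing it:

void call_rcu_lazy(struct rcu_head *head_rcu, rcu_callback_t func)
{
        struct lazy_rcu_head *head = (struct lazy_rcu_head *)head_rcu;
        struct rcu_lazy_pcp *rlp;

        if (debug_rcu_head_queue((void *)head)) {
                // Probable double call_rcu(), just leak.
                WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
                          __func__, head);
                return;
        }

        // Queue to the per-cpu llist. raw_cpu_ptr() is enough here: even if
        // we migrate right after taking the pointer, llist_add() is safe
        // against concurrent users of the remote CPU's list.
        head->func = func;
        rlp = raw_cpu_ptr(&rcu_lazy_pcp_ins);
        llist_add(&head->llist_node, &rlp->head);

        // Flush if the batch grew too big, but do it from the workqueue
        // rather than from the (possibly hard-irq) caller context.
        if (atomic_inc_return(&rlp->count) >= MAX_LAZY_BATCH)
                mod_delayed_work(system_wq, &rlp->work, 0);
        else if (!delayed_work_pending(&rlp->work))
                schedule_delayed_work(&rlp->work, MAX_LAZY_JIFFIES);
}
EXPORT_SYMBOL_GPL(call_rcu_lazy);

With that, lazy_work() and the shrinker would be the only callers of lazy_rcu_flush_cpu(), and both run in process context.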
> +
> +static unsigned long
> +lazy_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
> +{
> +	unsigned long count = 0;
> +	int cpu;
> +
> +	/* Snapshot count of all CPUs */
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_lazy_pcp *rlp = per_cpu_ptr(&rcu_lazy_pcp_ins, cpu);
> +
> +		count += atomic_read(&rlp->count);
> +	}
> +
> +	return count;
> +}
> +
> +static unsigned long
> +lazy_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
> +{
> +	int cpu, freed = 0;
> +
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_lazy_pcp *rlp = per_cpu_ptr(&rcu_lazy_pcp_ins, cpu);
> +		unsigned long count;
> +
> +		count = atomic_read(&rlp->count);
> +		lazy_rcu_flush_cpu(rlp);
> +		sc->nr_to_scan -= count;
> +		freed += count;
> +		if (sc->nr_to_scan <= 0)
> +			break;
> +	}
> +
> +	return freed == 0 ? SHRINK_STOP : freed;
> +}
> +
> +/*
> + * This function is invoked after MAX_LAZY_JIFFIES timeout.
> + */
> +static void lazy_work(struct work_struct *work)
> +{
> +	struct rcu_lazy_pcp *rlp = container_of(work, struct rcu_lazy_pcp, work.work);
> +
> +	lazy_rcu_flush_cpu(rlp);
> +}
> +
> +static struct shrinker lazy_rcu_shrinker = {
> +	.count_objects = lazy_rcu_shrink_count,
> +	.scan_objects = lazy_rcu_shrink_scan,
> +	.batch = 0,
> +	.seeks = DEFAULT_SEEKS,
> +};
> +
> +void __init rcu_lazy_init(void)
> +{
> +	int cpu;
> +
> +	BUILD_BUG_ON(sizeof(struct lazy_rcu_head) != sizeof(struct rcu_head));
> +
> +	for_each_possible_cpu(cpu) {
> +		struct rcu_lazy_pcp *rlp = per_cpu_ptr(&rcu_lazy_pcp_ins, cpu);
> +		INIT_DELAYED_WORK(&rlp->work, lazy_work);
> +	}
> +
> +	if (register_shrinker(&lazy_rcu_shrinker))
> +		pr_err("Failed to register lazy_rcu shrinker!\n");
> +}
> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
> index 24b5f2c2de87..a5f4b44f395f 100644
> --- a/kernel/rcu/rcu.h
> +++ b/kernel/rcu/rcu.h
> @@ -561,4 +561,9 @@ void show_rcu_tasks_trace_gp_kthread(void);
>  static inline void show_rcu_tasks_trace_gp_kthread(void) {}
>  #endif
>
> +#ifdef CONFIG_RCU_LAZY
> +void rcu_lazy_init(void);
> +#else
> +static inline void rcu_lazy_init(void) {}
> +#endif
>  #endif /* __LINUX_RCU_H */
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index a4c25a6283b0..ebdf6f7c9023 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -4775,6 +4775,8 @@ void __init rcu_init(void)
>  		qovld_calc = DEFAULT_RCU_QOVLD_MULT * qhimark;
>  	else
>  		qovld_calc = qovld;
> +
> +	rcu_lazy_init();
>  }
>
>  #include "tree_stall.h"
> --
> 2.36.0.550.gb090851708-goog
>
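One more note on the EXPORT_SYMBOL_GPL() question above: the intent is that module code could use the batched API the same way it uses call_rcu() today, along the lines of this purely hypothetical user:

struct foo {
        struct rcu_head rhp;
        /* ... payload ... */
};

static void foo_free_cb(struct rcu_head *rhp)
{
        kfree(container_of(rhp, struct foo, rhp));
}

static void foo_release(struct foo *fp)
{
        /* Batched, power-friendly deferred free. */
        call_rcu_lazy(&fp->rhp, foo_free_cb);
}

Without the export, only built-in code can take advantage of the lazy variant.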