Implement timer-based RCU callback batching. The batch is flushed
whenever a certain amount of time has passed, or the batch on a
particular CPU grows too big. Memory pressure can also flush it.

Locking is avoided to reduce lock contention when queuing and dequeuing
happen on different CPUs of a per-CPU list, such as when the shrinker
context is running on a different CPU. Not having to use locks also
keeps the per-CPU structure size small.

Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
 include/linux/rcupdate.h |   6 ++
 kernel/rcu/Kconfig       |   8 +++
 kernel/rcu/Makefile      |   1 +
 kernel/rcu/lazy.c        | 145 +++++++++++++++++++++++++++++++++++++++
 kernel/rcu/rcu.h         |   5 ++
 kernel/rcu/tree.c        |   2 +
 6 files changed, 167 insertions(+)
 create mode 100644 kernel/rcu/lazy.c

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 88b42eb46406..d0a6c4f5172c 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -82,6 +82,12 @@ static inline int rcu_preempt_depth(void)
 
 #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
 
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_lazy(struct rcu_head *head, rcu_callback_t func);
+#else
+#define call_rcu_lazy(head, func) call_rcu(head, func)
+#endif
+
 /* Internal to kernel */
 void rcu_init(void);
 extern int rcu_scheduler_active __read_mostly;
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
index bf8e341e75b4..c09715079829 100644
--- a/kernel/rcu/Kconfig
+++ b/kernel/rcu/Kconfig
@@ -241,4 +241,12 @@ config TASKS_TRACE_RCU_READ_MB
	  Say N here if you hate read-side memory barriers.
	  Take the default if you are unsure.
 
+config RCU_LAZY
+	bool "RCU callback lazy invocation functionality"
+	depends on RCU_NOCB_CPU
+	default y
+	help
+	  To save power, batch RCU callbacks and flush them after a delay,
+	  under memory pressure, or when the callback list grows too big.
+
 endmenu # "RCU Subsystem"
diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
index 0cfb009a99b9..8968b330d6e0 100644
--- a/kernel/rcu/Makefile
+++ b/kernel/rcu/Makefile
@@ -16,3 +16,4 @@ obj-$(CONFIG_RCU_REF_SCALE_TEST) += refscale.o
 obj-$(CONFIG_TREE_RCU) += tree.o
 obj-$(CONFIG_TINY_RCU) += tiny.o
 obj-$(CONFIG_RCU_NEED_SEGCBLIST) += rcu_segcblist.o
+obj-$(CONFIG_RCU_LAZY) += lazy.o
diff --git a/kernel/rcu/lazy.c b/kernel/rcu/lazy.c
new file mode 100644
index 000000000000..55e406cfc528
--- /dev/null
+++ b/kernel/rcu/lazy.c
@@ -0,0 +1,145 @@
+/*
+ * Lockless lazy-RCU implementation.
+ */
+#include <linux/rcupdate.h>
+#include <linux/shrinker.h>
+#include <linux/workqueue.h>
+#include "rcu.h"
+
+// How many callbacks to batch before flushing?
+#define MAX_LAZY_BATCH		2048
+
+// How long (in jiffies) to wait before flushing?
+#define MAX_LAZY_JIFFIES	10000
+
+// We cast lazy_rcu_head to rcu_head and back. This keeps the API simple while
+// allowing us to use a lockless list node in the head. Also, we use BUILD_BUG_ON
+// later to ensure that rcu_head and lazy_rcu_head are of the same size.
+struct lazy_rcu_head {
+	struct llist_node llist_node;
+	void (*func)(struct callback_head *head);
+} __attribute__((aligned(sizeof(void *))));
+
+struct rcu_lazy_pcp {
+	struct llist_head head;
+	struct delayed_work work;
+	atomic_t count;
+};
+DEFINE_PER_CPU(struct rcu_lazy_pcp, rcu_lazy_pcp_ins);
+
+// Lockless flush of one CPU's queue; can be called concurrently.
+static void lazy_rcu_flush_cpu(struct rcu_lazy_pcp *rlp)
+{
+	struct llist_node *node = llist_del_all(&rlp->head);
+	struct lazy_rcu_head *cursor, *temp;
+
+	if (!node)
+		return;
+
+	llist_for_each_entry_safe(cursor, temp, node, llist_node) {
+		struct rcu_head *rh = (struct rcu_head *)cursor;
+		debug_rcu_head_unqueue(rh);
+		call_rcu(rh, rh->func);
+		atomic_dec(&rlp->count);
+	}
+}
+
+void call_rcu_lazy(struct rcu_head *head_rcu, rcu_callback_t func)
+{
+	struct lazy_rcu_head *head = (struct lazy_rcu_head *)head_rcu;
+	struct rcu_lazy_pcp *rlp;
+
+	preempt_disable();
+	rlp = this_cpu_ptr(&rcu_lazy_pcp_ins);
+	preempt_enable();
+
+	if (debug_rcu_head_queue((void *)head)) {
+		// Probable double call_rcu(), just leak.
+		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
+				__func__, head);
+
+		// Mark as success and leave.
+		return;
+	}
+
+	// Queue onto the per-CPU llist.
+	head->func = func;
+	llist_add(&head->llist_node, &rlp->head);
+
+	// Flush the queue if it grows too big.
+	if (atomic_inc_return(&rlp->count) >= MAX_LAZY_BATCH) {
+		lazy_rcu_flush_cpu(rlp);
+	} else {
+		if (!delayed_work_pending(&rlp->work)) {
+			schedule_delayed_work(&rlp->work, MAX_LAZY_JIFFIES);
+		}
+	}
+}
+
+static unsigned long
+lazy_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
+{
+	unsigned long count = 0;
+	int cpu;
+
+	/* Snapshot count of all CPUs */
+	for_each_possible_cpu(cpu) {
+		struct rcu_lazy_pcp *rlp = per_cpu_ptr(&rcu_lazy_pcp_ins, cpu);
+
+		count += atomic_read(&rlp->count);
+	}
+
+	return count;
+}
+
+static unsigned long
+lazy_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
+{
+	int cpu, freed = 0;
+
+	for_each_possible_cpu(cpu) {
+		struct rcu_lazy_pcp *rlp = per_cpu_ptr(&rcu_lazy_pcp_ins, cpu);
+		unsigned long count;
+
+		count = atomic_read(&rlp->count);
+		lazy_rcu_flush_cpu(rlp);
+		sc->nr_to_scan -= count;
+		freed += count;
+		if (sc->nr_to_scan <= 0)
+			break;
+	}
+
+	return freed == 0 ? SHRINK_STOP : freed;
+}
+
+/*
+ * This function is invoked after the MAX_LAZY_JIFFIES timeout.
+ */
+static void lazy_work(struct work_struct *work)
+{
+	struct rcu_lazy_pcp *rlp = container_of(work, struct rcu_lazy_pcp, work.work);
+
+	lazy_rcu_flush_cpu(rlp);
+}
+
+static struct shrinker lazy_rcu_shrinker = {
+	.count_objects = lazy_rcu_shrink_count,
+	.scan_objects = lazy_rcu_shrink_scan,
+	.batch = 0,
+	.seeks = DEFAULT_SEEKS,
+};
+
+void __init rcu_lazy_init(void)
+{
+	int cpu;
+
+	BUILD_BUG_ON(sizeof(struct lazy_rcu_head) != sizeof(struct rcu_head));
+
+	for_each_possible_cpu(cpu) {
+		struct rcu_lazy_pcp *rlp = per_cpu_ptr(&rcu_lazy_pcp_ins, cpu);
+		INIT_DELAYED_WORK(&rlp->work, lazy_work);
+	}
+
+	if (register_shrinker(&lazy_rcu_shrinker))
+		pr_err("Failed to register lazy_rcu shrinker!\n");
+}
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 24b5f2c2de87..a5f4b44f395f 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -561,4 +561,9 @@ void show_rcu_tasks_trace_gp_kthread(void);
 static inline void show_rcu_tasks_trace_gp_kthread(void) {}
 #endif
 
+#ifdef CONFIG_RCU_LAZY
+void rcu_lazy_init(void);
+#else
+static inline void rcu_lazy_init(void) {}
+#endif
 #endif /* __LINUX_RCU_H */
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a4c25a6283b0..ebdf6f7c9023 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -4775,6 +4775,8 @@ void __init rcu_init(void)
		qovld_calc = DEFAULT_RCU_QOVLD_MULT * qhimark;
	else
		qovld_calc = qovld;
+
+	rcu_lazy_init();
 }
 
 #include "tree_stall.h"
-- 
2.36.0.550.gb090851708-goog
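
Editor's note: for context, converting a caller from call_rcu() to call_rcu_lazy()
only changes the function name; the embedded rcu_head and the callback signature
stay the same. A minimal sketch of such a caller follows. Only call_rcu_lazy()
comes from this patch; struct foo, foo_free_rcu() and foo_release() are
hypothetical names used for illustration.

	#include <linux/rcupdate.h>
	#include <linux/slab.h>

	struct foo {
		int data;
		struct rcu_head rcu;	/* queued on the per-CPU lazy llist */
	};

	/*
	 * RCU callback: runs after a grace period, possibly much later now,
	 * since the head may sit in the lazy batch until the timer, the
	 * MAX_LAZY_BATCH limit, or the shrinker flushes it.
	 */
	static void foo_free_rcu(struct rcu_head *head)
	{
		struct foo *fp = container_of(head, struct foo, rcu);

		kfree(fp);
	}

	static void foo_release(struct foo *fp)
	{
		/* Previously: call_rcu(&fp->rcu, foo_free_rcu); */
		call_rcu_lazy(&fp->rcu, foo_free_rcu);
	}

Because invocation can now be deferred, only callbacks whose latency does not
matter (typically kfree()-style frees) look like good candidates for
conversion; callers that need the callback to run promptly, or that wait on
callbacks already handed to call_rcu() (for example via rcu_barrier()), would
presumably keep using call_rcu().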