Hi Vlad,

Thanks for the comments. I replied inline:

On 5/17/22 05:07, Uladzislau Rezki wrote:
>> diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
>> index 0cfb009a99b9..8968b330d6e0 100644
>> --- a/kernel/rcu/Makefile
>> +++ b/kernel/rcu/Makefile
>> @@ -16,3 +16,4 @@ obj-$(CONFIG_RCU_REF_SCALE_TEST) += refscale.o
>>  obj-$(CONFIG_TREE_RCU) += tree.o
>>  obj-$(CONFIG_TINY_RCU) += tiny.o
>>  obj-$(CONFIG_RCU_NEED_SEGCBLIST) += rcu_segcblist.o
>> +obj-$(CONFIG_RCU_LAZY) += lazy.o
>> diff --git a/kernel/rcu/lazy.c b/kernel/rcu/lazy.c
>> new file mode 100644
>> index 000000000000..55e406cfc528
>> --- /dev/null
>> +++ b/kernel/rcu/lazy.c
>> @@ -0,0 +1,145 @@
>> +/*
>> + * Lockless lazy-RCU implementation.
>> + */
>> +#include <linux/rcupdate.h>
>> +#include <linux/shrinker.h>
>> +#include <linux/workqueue.h>
>> +#include "rcu.h"
>> +
>> +// How much to batch before flushing?
>> +#define MAX_LAZY_BATCH 2048
>> +
>> +// How much to wait before flushing?
>> +#define MAX_LAZY_JIFFIES 10000
>> +
>> +// We cast lazy_rcu_head to rcu_head and back. This keeps the API simple while
>> +// allowing us to use lockless list node in the head. Also, we use BUILD_BUG_ON
>> +// later to ensure that rcu_head and lazy_rcu_head are of the same size.
>> +struct lazy_rcu_head {
>> +        struct llist_node llist_node;
>> +        void (*func)(struct callback_head *head);
>> +} __attribute__((aligned(sizeof(void *))));
>> +
>> +struct rcu_lazy_pcp {
>> +        struct llist_head head;
>> +        struct delayed_work work;
>> +        atomic_t count;
>> +};
>> +DEFINE_PER_CPU(struct rcu_lazy_pcp, rcu_lazy_pcp_ins);
>> +
>> +// Lockless flush of CPU, can be called concurrently.
>> +static void lazy_rcu_flush_cpu(struct rcu_lazy_pcp *rlp)
>> +{
>> +        struct llist_node *node = llist_del_all(&rlp->head);
>> +        struct lazy_rcu_head *cursor, *temp;
>> +
>> +        if (!node)
>> +                return;
>> +
>> +        llist_for_each_entry_safe(cursor, temp, node, llist_node) {
>> +                struct rcu_head *rh = (struct rcu_head *)cursor;
>> +                debug_rcu_head_unqueue(rh);
>> +                call_rcu(rh, rh->func);
>> +                atomic_dec(&rlp->count);
>> +        }
>> +}
>> +
>> +void call_rcu_lazy(struct rcu_head *head_rcu, rcu_callback_t func)
>> +{
>> +        struct lazy_rcu_head *head = (struct lazy_rcu_head *)head_rcu;
>> +        struct rcu_lazy_pcp *rlp;
>> +
>> +        preempt_disable();
>> +        rlp = this_cpu_ptr(&rcu_lazy_pcp_ins);
>> +        preempt_enable();
>>
> Can we get rid of such explicit disabling/enabling preemption?

OK, I'll try. Last I checked, something needs to disable preemption there;
otherwise sampling the current CPU ID via this_cpu_ptr() triggers a debug
warning (with CONFIG_DEBUG_PREEMPT) when called from preemptible context.

>> +
>> +        if (debug_rcu_head_queue((void *)head)) {
>> +                // Probable double call_rcu(), just leak.
>> +                WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
>> +                                __func__, head);
>> +
>> +                // Mark as success and leave.
>> +                return;
>> +        }
>> +
>> +        // Queue to per-cpu llist
>> +        head->func = func;
>> +        llist_add(&head->llist_node, &rlp->head);
>> +
>> +        // Flush queue if too big
>> +        if (atomic_inc_return(&rlp->count) >= MAX_LAZY_BATCH) {
>> +                lazy_rcu_flush_cpu(rlp);
>>
> Can we just schedule the work instead of drawn from the caller context?
> For example it can be a hard-irq context.

You raise a good point. OK, I'll do that (a rough sketch of what I have in
mind is in the P.S. below). Though if the callback list is small, I would
still prefer to flush it inline. I will look into it more.

>
>> +        } else {
>> +                if (!delayed_work_pending(&rlp->work)) {
>> +                        schedule_delayed_work(&rlp->work, MAX_LAZY_JIFFIES);
>> +                }
>> +        }
>> +}
> EXPORT_SYMBOL_GPL()? to be able to use in kernel modules.
>
Sure, will fix.

Thanks,

- Joel
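
P.S. For the "schedule the work instead of flushing from the caller context"
change, here is a rough, untested sketch of the direction I am thinking of.
It assumes the delayed-work handler (not quoted above) already drains the
per-CPU list via lazy_rcu_flush_cpu(), and the use of raw_cpu_ptr() and
mod_delayed_work() here is just my first idea for addressing your two
comments, not a final patch. It also folds in the EXPORT_SYMBOL_GPL() you
asked for:

void call_rcu_lazy(struct rcu_head *head_rcu, rcu_callback_t func)
{
        struct lazy_rcu_head *head = (struct lazy_rcu_head *)head_rcu;
        struct rcu_lazy_pcp *rlp;

        // raw_cpu_ptr() avoids the CONFIG_DEBUG_PREEMPT warning. The llist
        // is lockless, so if we migrate right after this, we merely enqueue
        // on the previous CPU's list, which is harmless.
        rlp = raw_cpu_ptr(&rcu_lazy_pcp_ins);

        if (debug_rcu_head_queue((void *)head)) {
                // Probable double call_rcu(), just leak.
                WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
                          __func__, head);
                return;
        }

        // Queue to the per-CPU llist.
        head->func = func;
        llist_add(&head->llist_node, &rlp->head);

        // If the batch got too big, kick the delayed work to run immediately
        // instead of flushing from here, since this may be hard-irq context.
        if (atomic_inc_return(&rlp->count) >= MAX_LAZY_BATCH)
                mod_delayed_work(system_wq, &rlp->work, 0);
        else if (!delayed_work_pending(&rlp->work))
                schedule_delayed_work(&rlp->work, MAX_LAZY_JIFFIES);
}
EXPORT_SYMBOL_GPL(call_rcu_lazy);

The idea is that the caller never walks a potentially long list itself; in the
overflow case it only shortens the delay of the already-initialized delayed
work to zero so the flush happens in workqueue context.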