Recently, a discussion about the performance of systems involving a high
rate of kfree_rcu() calls surfaced on the list [1], which led to another
discussion on how to prepare for this situation.

This patch adds basic batching support for kfree_rcu(). It is "basic"
because we do none of the slab management, dynamic allocation, code
moving or any of the other things that some of the previous attempts
did [2]. These fancier improvements can be follow-up patches, and
several ideas are being experimented with in that regard.

Torture tests follow in the next patch and show an improvement of around
13% with continuous flooding of kfree_rcu() calls on a 16-CPU system.

[1] http://lore.kernel.org/lkml/20190723035725-mutt-send-email-mst@xxxxxxxxxx
[2] https://lkml.org/lkml/2017/12/19/824

This is an effort to start simple and build up from there.

Cc: Rao Shoaib <rao.shoaib@xxxxxxxxxx>
Cc: max.byungchul.park@xxxxxxxxx
Cc: byungchul.park@xxxxxxx
Cc: kernel-team@xxxxxxxxxxx
Cc: kernel-team@xxxxxxx
Co-developed-by: Byungchul Park <byungchul.park@xxxxxxx>
Signed-off-by: Byungchul Park <byungchul.park@xxxxxxx>
Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
---
 kernel/rcu/tree.c | 198 ++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 193 insertions(+), 5 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a14e5fbbea46..bdbd483606ce 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2593,19 +2593,194 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
+
+/* Maximum number of jiffies to wait before draining the batch */
+#define KFREE_DRAIN_JIFFIES 50
+
+/*
+ * Maximum number of kfree(s) to batch; if this limit is hit,
+ * the RCU work is queued right away.
+ */
+#define KFREE_MAX_BATCH 200000ULL
+
+struct kfree_rcu_cpu {
+	/* The work done to free objects after a GP */
+	struct rcu_work rcu_work;
+
+	/* The list of objects being queued */
+	struct rcu_head *head;
+	int kfree_batch_len;
+
+	/* The list of objects pending a free */
+	struct rcu_head *head_free;
+
+	/* Protect concurrent access to this structure */
+	spinlock_t lock;
+
+	/* The work done to monitor whether objects need freeing */
+	struct delayed_work monitor_work;
+	bool monitor_todo;
+};
+
+static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
+
+/* Free all heads after a grace period (worker function) */
+static void kfree_rcu_work(struct work_struct *work)
+{
+	unsigned long flags;
+	struct rcu_head *head, *next;
+	struct kfree_rcu_cpu *krc = container_of(to_rcu_work(work),
+					struct kfree_rcu_cpu, rcu_work);
+
+	spin_lock_irqsave(&krc->lock, flags);
+	head = krc->head_free;
+	krc->head_free = NULL;
+	spin_unlock_irqrestore(&krc->lock, flags);
+
+	/* The head must be detached and not referenced from anywhere */
+	for (; head; head = next) {
+		next = head->next;
+		head->next = NULL;
+		/* Could be possible to optimize with kfree_bulk in future */
+		__rcu_reclaim(rcu_state.name, head);
+	}
+}
+
+/*
+ * Schedule the kfree batch RCU work to run after a GP.
+ *
+ * Either the batch reached its maximum size or the monitor's timer
+ * expired; either way, schedule the batch work.
+ */
+static bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krc)
+{
+	lockdep_assert_held(&krc->lock);
+
+	/*
+	 * Someone already drained, probably before the monitor's worker
+	 * thread ran. Just return to avoid useless work.
+	 */
+	if (!krc->head)
+		return true;
+
+	/*
+	 * If RCU batch work is already in progress, we cannot
+	 * queue another one; just refuse the optimization.
+	 */
+	if (krc->head_free)
+		return false;
+
+	krc->head_free = krc->head;
+	krc->head = NULL;
+	krc->kfree_batch_len = 0;
+	INIT_RCU_WORK(&krc->rcu_work, kfree_rcu_work);
+	queue_rcu_work(system_wq, &krc->rcu_work);
+
+	return true;
+}
+
+static void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krc,
+				   unsigned long flags)
+{
+	struct rcu_head *head, *next;
+
+	/* It is time to do bulk reclaim after a grace period */
+	krc->monitor_todo = false;
+	if (queue_kfree_rcu_work(krc)) {
+		spin_unlock_irqrestore(&krc->lock, flags);
+		return;
+	}
+
+	/*
+	 * Use the non-batch regular call_rcu() for kfree_rcu() in case things
+	 * are too busy and batching of kfree_rcu() could not be used.
+	 */
+	head = krc->head;
+	krc->head = NULL;
+	krc->kfree_batch_len = 0;
+	spin_unlock_irqrestore(&krc->lock, flags);
+
+	for (; head; head = next) {
+		next = head->next;
+		head->next = NULL;
+		__call_rcu(head, head->func, -1, 1);
+	}
+}
+
+/*
+ * If enough time has passed, the kfree batch has to be drained
+ * and the monitor takes care of that.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+	bool todo;
+	unsigned long flags;
+	struct kfree_rcu_cpu *krc = container_of(work, struct kfree_rcu_cpu,
+						 monitor_work.work);
+
+	/* It is time to do bulk reclaim after a grace period */
+	spin_lock_irqsave(&krc->lock, flags);
+	todo = krc->monitor_todo;
+	krc->monitor_todo = false;
+	if (todo)
+		kfree_rcu_drain_unlock(krc, flags);
+	else
+		spin_unlock_irqrestore(&krc->lock, flags);
+}
+
+static void kfree_rcu_batch(struct rcu_head *head, rcu_callback_t func)
+{
+	unsigned long flags;
+	struct kfree_rcu_cpu *this_krc;
+	bool monitor_todo;
+
+	local_irq_save(flags);
+	this_krc = this_cpu_ptr(&krc);
+
+	spin_lock(&this_krc->lock);
+
+	/* Queue the kfree but don't yet schedule the batch */
+	head->func = func;
+	head->next = this_krc->head;
+	this_krc->head = head;
+	this_krc->kfree_batch_len++;
+
+	if (this_krc->kfree_batch_len == KFREE_MAX_BATCH) {
+		kfree_rcu_drain_unlock(this_krc, flags);
+		return;
+	}
+
+	/* Maximum not reached; schedule the monitor for a timely drain */
+	monitor_todo = this_krc->monitor_todo;
+	this_krc->monitor_todo = true;
+	spin_unlock(&this_krc->lock);
+
+	if (!monitor_todo) {
+		schedule_delayed_work_on(smp_processor_id(),
+				&this_krc->monitor_work, KFREE_DRAIN_JIFFIES);
+	}
+	local_irq_restore(flags);
+}
+
 /*
  * Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
  */
 void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
-	__call_rcu(head, func, -1, 1);
+	kfree_rcu_batch(head, func);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
+/*
+ * The version of kfree_call_rcu() that does not do batching of kfree_rcu()
+ * requests. To be used only for performance testing comparisons with
+ * kfree_rcu_batch().
+ */
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
+{
+	__call_rcu(head, func, -1, 1);
+}
+
 /*
  * During early boot, any blocking grace-period wait automatically
  * implies a grace period. Later on, this is never the case for PREEMPT.
@@ -3452,6 +3627,17 @@ static void __init rcu_dump_rcu_node_tree(void)
 	pr_cont("\n");
 }
 
+void kfree_rcu_batch_init(void)
+{
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
+	}
+}
+
 struct workqueue_struct *rcu_gp_wq;
 struct workqueue_struct *rcu_par_gp_wq;
 
@@ -3459,6 +3645,8 @@ void __init rcu_init(void)
 {
 	int cpu;
 
+	kfree_rcu_batch_init();
+
 	rcu_early_boot_tests();
 
 	rcu_bootup_announce();
-- 
2.22.0.770.g0f2c4a37fd-goog
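
For readers who want to see the caller-facing effect, here is a minimal,
hypothetical usage sketch (not part of the patch); "struct foo", free_foo()
and its fields are made-up names for illustration only. With this patch,
each kfree_rcu() invocation ends up in kfree_call_rcu() -> kfree_rcu_batch(),
is queued on the per-CPU "krc" list, and is handed to the RCU workqueue
either when the KFREE_DRAIN_JIFFIES monitor fires or once KFREE_MAX_BATCH
objects have accumulated:

#include <linux/slab.h>
#include <linux/rcupdate.h>

/* Hypothetical object type; the rcu_head member is what kfree_rcu() needs. */
struct foo {
	int data;
	struct rcu_head rcu;
};

/* Hypothetical release path invoked at a high rate by some subsystem. */
static void free_foo(struct foo *fp)
{
	/*
	 * Expands to a kfree_call_rcu(&fp->rcu, ...) call carrying the
	 * offset of the rcu_head, so with this patch the object is batched
	 * in kfree_rcu_batch() and kfree()d from kfree_rcu_work() after a
	 * grace period.
	 */
	kfree_rcu(fp, rcu);
}

Callers do not change at all; only the path underneath kfree_call_rcu() does,
which is why the torture test in the next patch can compare the batched path
against kfree_call_rcu_nobatch() directly.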