During testing, it was observed that the amount of memory consumed due to kfree_rcu() batching is 300-400MB. Previously we had only a single head_free pointer pointing to the list of rcu_head(s) that are to be freed after a grace period. Until this list is drained, we cannot queue any more objects on it since such objects may not be ready to be reclaimed when the worker thread eventually gets to draining the head_free list. We can do better by maintaining multiple lists as done by this patch. Testing shows that memory consumption came down by around 100-150MB with just adding another list. Adding more than 1 additional list did not show any improvement. Suggested-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxx> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx> --- kernel/rcu/tree.c | 64 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index 4f7c3096d786..9b9ae4db1c2d 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -2688,28 +2688,38 @@ EXPORT_SYMBOL_GPL(call_rcu); /* Maximum number of jiffies to wait before draining a batch. */ #define KFREE_DRAIN_JIFFIES (HZ / 50) +#define KFREE_N_BATCHES 2 + +struct kfree_rcu_work { + /* The rcu_work node for queuing work with queue_rcu_work(). The work + * is done after a grace period. + */ + struct rcu_work rcu_work; + + /* The list of objects that have now left ->head and are queued for + * freeing after a grace period. + */ + struct rcu_head *head_free; + + struct kfree_rcu_cpu *krcp; +}; +static DEFINE_PER_CPU(__typeof__(struct kfree_rcu_work)[KFREE_N_BATCHES], krw); /* * Maximum number of kfree(s) to batch, if this limit is hit then the batch of * kfree(s) is queued for freeing after a grace period, right away. */ struct kfree_rcu_cpu { - /* The rcu_work node for queuing work with queue_rcu_work(). The work - * is done after a grace period. 
- */ - struct rcu_work rcu_work; /* The list of objects being queued in a batch but are not yet * scheduled to be freed. */ struct rcu_head *head; - /* The list of objects that have now left ->head and are queued for - * freeing after a grace period. - */ - struct rcu_head *head_free; + /* Pointer to the per-cpu array of kfree_rcu_work structures */ + struct kfree_rcu_work *krwp; - /* Protect concurrent access to this structure. */ + /* Protect concurrent access to this structure and kfree_rcu_work. */ spinlock_t lock; /* The delayed work that flushes ->head to ->head_free incase ->head @@ -2730,12 +2740,14 @@ static void kfree_rcu_work(struct work_struct *work) { unsigned long flags; struct rcu_head *head, *next; - struct kfree_rcu_cpu *krcp = container_of(to_rcu_work(work), - struct kfree_rcu_cpu, rcu_work); + struct kfree_rcu_work *krwp = container_of(to_rcu_work(work), + struct kfree_rcu_work, rcu_work); + struct kfree_rcu_cpu *krcp; + + krcp = krwp->krcp; spin_lock_irqsave(&krcp->lock, flags); - head = krcp->head_free; - krcp->head_free = NULL; + head = xchg(&krwp->head_free, NULL); spin_unlock_irqrestore(&krcp->lock, flags); /* @@ -2758,19 +2770,28 @@ static void kfree_rcu_work(struct work_struct *work) */ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp) { + int i = 0; + struct kfree_rcu_work *krwp = NULL; + lockdep_assert_held(&krcp->lock); + while (i < KFREE_N_BATCHES) { + if (!krcp->krwp[i].head_free) { + krwp = &(krcp->krwp[i]); + break; + } + i++; + } - /* If a previous RCU batch work is already in progress, we cannot queue + /* If both RCU batches are already in progress, we cannot queue * another one, just refuse the optimization and it will be retried * again in KFREE_DRAIN_JIFFIES time. 
*/ - if (krcp->head_free) + if (!krwp) return false; - krcp->head_free = krcp->head; - krcp->head = NULL; - INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work); - queue_rcu_work(system_wq, &krcp->rcu_work); + krwp->head_free = xchg(&krcp->head, NULL); + INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work); + queue_rcu_work(system_wq, &krwp->rcu_work); return true; } @@ -3736,8 +3757,13 @@ static void __init kfree_rcu_batch_init(void) for_each_possible_cpu(cpu) { struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu); + struct kfree_rcu_work *krwp = &(per_cpu(krw, cpu)[0]); + int i = KFREE_N_BATCHES; spin_lock_init(&krcp->lock); + krcp->krwp = krwp; + while (i--) + krwp[i].krcp = krcp; INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor); } } -- 2.23.0.187.g17f5b7556c-goog