Split ready-to-free pointers from the ones that still require an extra
grace period. This is done before submitting a new batch: once the split
is complete, the ready-to-free objects are released right away, and after
that a new batch is queued from the monitor work via queue_rcu_work().

Signed-off-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx>
---
 kernel/rcu/tree.c | 87 +++++++++++++++++++++++++++++------------------
 1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 312cb0dee117..50a585caf698 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2764,15 +2764,13 @@ struct kvfree_rcu_bulk_data {
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
  * @head_free: List of kfree_rcu() objects waiting for a grace period
- * @head_free_gp_snap: Snapshot of RCU state for objects placed to "@head_free"
  * @bulk_head_free: Bulk-List of kvfree_rcu() objects waiting for a grace period
  * @krcp: Pointer to @kfree_rcu_cpu structure
  */
 struct kfree_rcu_cpu_work {
-	struct work_struct rcu_work;
+	struct rcu_work rcu_work;
 	struct rcu_head *head_free;
-	unsigned long head_free_gp_snap;
 	struct list_head bulk_head_free[FREE_N_CHANNELS];
 	struct kfree_rcu_cpu *krcp;
 };
@@ -2780,6 +2778,7 @@ struct kfree_rcu_cpu_work {
 /**
  * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
  * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_gp_snap: Snapshot of RCU state for objects placed to "@head"
  * @bulk_head: Bulk-List of kvfree_rcu() objects not yet waiting for a grace period
  * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
  * @lock: Synchronize access to this structure
@@ -2807,6 +2806,7 @@ struct kfree_rcu_cpu {
 	// Objects queued on a linked list
 	// through their rcu_head structures.
 	struct rcu_head *head;
+	unsigned long head_gp_snap;
 	atomic_t head_count;
 
 	// Objects queued on a bulk-list.
@@ -2975,10 +2975,9 @@ static void kfree_rcu_work(struct work_struct *work)
 	struct rcu_head *head;
 	struct kfree_rcu_cpu *krcp;
 	struct kfree_rcu_cpu_work *krwp;
-	unsigned long head_free_gp_snap;
 	int i;
 
-	krwp = container_of(work,
+	krwp = container_of(to_rcu_work(work),
 			    struct kfree_rcu_cpu_work, rcu_work);
 	krcp = krwp->krcp;
 
@@ -2990,26 +2989,11 @@ static void kfree_rcu_work(struct work_struct *work)
 	// Channel 3.
 	head = krwp->head_free;
 	krwp->head_free = NULL;
-	head_free_gp_snap = krwp->head_free_gp_snap;
 	raw_spin_unlock_irqrestore(&krcp->lock, flags);
 
 	// Handle the first two channels.
 	for (i = 0; i < FREE_N_CHANNELS; i++) {
 		// Start from the tail page, so a GP is likely passed for it.
-		list_for_each_entry_safe_reverse(bnode, n, &bulk_head[i], list) {
-			// Not yet ready? Bail out since we need one more GP.
-			if (!poll_state_synchronize_rcu(bnode->gp_snap))
-				break;
-
-			list_del_init(&bnode->list);
-			kvfree_rcu_bulk(krcp, bnode, i);
-		}
-
-		// Please note a request for one more extra GP can
-		// occur only once for all objects in this batch.
-		if (!list_empty(&bulk_head[i]))
-			synchronize_rcu();
-
 		list_for_each_entry_safe(bnode, n, &bulk_head[i], list)
 			kvfree_rcu_bulk(krcp, bnode, i);
 	}
@@ -3021,10 +3005,7 @@ static void kfree_rcu_work(struct work_struct *work)
 	 * queued on a linked list through their rcu_head structures.
 	 * This list is named "Channel 3".
 	 */
-	if (head) {
-		cond_synchronize_rcu(head_free_gp_snap);
-		kvfree_rcu_list(head);
-	}
+	kvfree_rcu_list(head);
 }
 
 static bool
@@ -3065,6 +3046,44 @@ schedule_delayed_monitor_work(struct kfree_rcu_cpu *krcp)
 	queue_delayed_work(system_wq, &krcp->monitor_work, delay);
 }
 
+static void
+kvfree_rcu_drain_ready(struct kfree_rcu_cpu *krcp)
+{
+	struct list_head bulk_ready[FREE_N_CHANNELS];
+	struct kvfree_rcu_bulk_data *bnode, *n;
+	struct rcu_head *head_ready = NULL;
+	unsigned long flags;
+	int i;
+
+	raw_spin_lock_irqsave(&krcp->lock, flags);
+	for (i = 0; i < FREE_N_CHANNELS; i++) {
+		INIT_LIST_HEAD(&bulk_ready[i]);
+
+		list_for_each_entry_safe_reverse(bnode, n, &krcp->bulk_head[i], list) {
+			if (!poll_state_synchronize_rcu(bnode->gp_snap))
+				break;
+
+			atomic_sub(bnode->nr_records, &krcp->bulk_count[i]);
+			list_move(&bnode->list, &bulk_ready[i]);
+		}
+	}
+
+	if (krcp->head && poll_state_synchronize_rcu(krcp->head_gp_snap)) {
+		head_ready = krcp->head;
+		atomic_set(&krcp->head_count, 0);
+		WRITE_ONCE(krcp->head, NULL);
+	}
+	raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+	for (i = 0; i < FREE_N_CHANNELS; i++) {
+		list_for_each_entry_safe(bnode, n, &bulk_ready[i], list)
+			kvfree_rcu_bulk(krcp, bnode, i);
+	}
+
+	if (head_ready)
+		kvfree_rcu_list(head_ready);
+}
+
 /*
  * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
  */
@@ -3075,6 +3094,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
 	unsigned long flags;
 	int i, j;
 
+	// Drain ready for reclaim.
+	kvfree_rcu_drain_ready(krcp);
+
 	raw_spin_lock_irqsave(&krcp->lock, flags);
 
 	// Attempt to start a new batch.
@@ -3094,8 +3116,9 @@ static void kfree_rcu_monitor(struct work_struct *work)
 			// Channel 2 corresponds to vmalloc-pointer bulk path.
 			for (j = 0; j < FREE_N_CHANNELS; j++) {
 				if (list_empty(&krwp->bulk_head_free[j])) {
-					list_replace_init(&krcp->bulk_head[j], &krwp->bulk_head_free[j]);
 					atomic_set(&krcp->bulk_count[j], 0);
+					list_replace_init(&krcp->bulk_head[j],
+						&krwp->bulk_head_free[j]);
 				}
 			}
 
@@ -3103,13 +3126,8 @@ static void kfree_rcu_monitor(struct work_struct *work)
 			// objects queued on the linked list.
 			if (!krwp->head_free) {
 				krwp->head_free = krcp->head;
-				WRITE_ONCE(krcp->head, NULL);
 				atomic_set(&krcp->head_count, 0);
-
-				// Take a snapshot for this krwp. Please note no more
-				// any objects can be added to attached head_free channel
-				// therefore fixate a GP for it here.
-				krwp->head_free_gp_snap = get_state_synchronize_rcu();
+				WRITE_ONCE(krcp->head, NULL);
 			}
 
 			// One work is per one batch, so there are three
@@ -3117,7 +3135,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
 			// be that the work is in the pending state when
 			// channels have been detached following by each
 			// other.
-			queue_work(system_wq, &krwp->rcu_work);
+			queue_rcu_work(system_wq, &krwp->rcu_work);
 		}
 	}
 
@@ -3304,6 +3322,9 @@ void kvfree_call_rcu(struct rcu_head *head, void *ptr)
 		head->next = krcp->head;
 		WRITE_ONCE(krcp->head, head);
 		atomic_inc(&krcp->head_count);
+
+		// Take a snapshot for this krcp.
+		krcp->head_gp_snap = get_state_synchronize_rcu();
 		success = true;
 	}
 
@@ -4860,7 +4881,7 @@ static void __init kfree_rcu_batch_init(void)
 		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
 		for (i = 0; i < KFREE_N_BATCHES; i++) {
-			INIT_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
+			INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
 			krcp->krw_arr[i].krcp = krcp;
 
 			for (j = 0; j < FREE_N_CHANNELS; j++)
-- 
2.30.2
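
Side note for readers less familiar with the polled grace-period API: the
sketch below is a minimal, standalone user-space model of the "drain ready
before batching" idea, not kernel code. A plain counter stands in for the RCU
grace-period state, and fake_get_state()/fake_poll_state(), struct obj and
drain_ready() are hypothetical stand-ins for get_state_synchronize_rcu(),
poll_state_synchronize_rcu() and the per-CPU lists handled by
kvfree_rcu_drain_ready().

/*
 * Minimal, standalone model of the "drain ready before batching" idea.
 * This is NOT kernel code: a plain counter stands in for the RCU
 * grace-period state, and fake_get_state()/fake_poll_state() mimic
 * get_state_synchronize_rcu()/poll_state_synchronize_rcu(). All names
 * here are hypothetical.
 */
#include <stdio.h>
#include <stdlib.h>

struct obj {
	struct obj *next;
	unsigned long gp_snap;	/* cookie taken when the object was queued */
	int id;
};

static unsigned long completed_gp;	/* number of completed "grace periods" */

static unsigned long fake_get_state(void)
{
	/* Cookie that becomes "ready" once one more grace period completes. */
	return completed_gp + 1;
}

static int fake_poll_state(unsigned long snap)
{
	/* Ready if the required grace period has already elapsed. */
	return completed_gp >= snap;
}

/*
 * Split the pending list: objects whose grace period has already elapsed
 * are freed right away; the rest are returned so they can be queued as a
 * new batch (which the patch does via queue_rcu_work()).
 */
static struct obj *drain_ready(struct obj *head)
{
	struct obj *not_ready = NULL, *next;

	while (head) {
		next = head->next;
		if (fake_poll_state(head->gp_snap)) {
			printf("freeing obj %d right away\n", head->id);
			free(head);
		} else {
			head->next = not_ready;
			not_ready = head;
		}
		head = next;
	}

	return not_ready;	/* still needs a grace period */
}

int main(void)
{
	struct obj *head = NULL;
	int i;

	for (i = 0; i < 4; i++) {
		struct obj *o = malloc(sizeof(*o));

		if (!o)
			return 1;

		o->id = i;
		o->gp_snap = fake_get_state();	/* snapshot at queue time */
		o->next = head;
		head = o;

		if (i == 1)
			completed_gp++;	/* a grace period elapses mid-way */
	}

	head = drain_ready(head);	/* objs 0 and 1 are freed immediately */

	completed_gp++;			/* the batch's grace period completes */
	head = drain_ready(head);	/* the remaining objects are freed */

	return head != NULL;
}

The model mirrors the split kfree_rcu_monitor() now performs: whatever is
already past its grace period is reclaimed on the spot, and only the remainder
waits for another grace period, which the patch delegates to queue_rcu_work()
instead of blocking the worker in synchronize_rcu()/cond_synchronize_rcu().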