On 3/6/2024 5:31 PM, Joel Fernandes wrote: > > > On 3/5/2024 2:57 PM, Uladzislau Rezki (Sony) wrote: >> Fix a below race by not releasing a wait-head from the >> GP-kthread as it can lead for reusing it whereas a worker >> can still access it thus execute newly added callbacks too >> early. >> >> CPU 0 CPU 1 >> ----- ----- >> >> // wait_tail == HEAD1 >> rcu_sr_normal_gp_cleanup() { >> // has passed SR_MAX_USERS_WAKE_FROM_GP >> wait_tail->next = next; >> // done_tail = HEAD1 >> smp_store_release(&rcu_state.srs_done_tail, wait_tail); >> queue_work() { >> test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work) >> __queue_work() >> } >> } >> >> set_work_pool_and_clear_pending() >> rcu_sr_normal_gp_cleanup_work() { >> // new GP, wait_tail == HEAD2 >> rcu_sr_normal_gp_cleanup() { >> // executes all completion, but stop at HEAD1 >> wait_tail->next = HEAD1; >> // done_tail = HEAD2 >> smp_store_release(&rcu_state.srs_done_tail, wait_tail); >> queue_work() { >> test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work) >> __queue_work() >> } >> } >> // done = HEAD2 >> done = smp_load_acquire(&rcu_state.srs_done_tail); >> // head = HEAD1 >> head = done->next; >> done->next = NULL; >> llist_for_each_safe() { >> // completes all callbacks, release HEAD1 >> } >> } >> // Process second queue >> set_work_pool_and_clear_pending() >> rcu_sr_normal_gp_cleanup_work() { >> // done = HEAD2 >> done = smp_load_acquire(&rcu_state.srs_done_tail); >> >> // new GP, wait_tail == HEAD3 >> rcu_sr_normal_gp_cleanup() { >> // Finds HEAD2 with ->next == NULL at the end >> rcu_sr_put_wait_head(HEAD2) >> ... >> >> // A few more GPs later >> rcu_sr_normal_gp_init() { >> HEAD2 = rcu_sr_get_wait_head(); >> llist_add(HEAD2, &rcu_state.srs_next); >> // head == rcu_state.srs_next >> head = done->next; >> done->next = NULL; >> llist_for_each_safe() { >> // EXECUTE CALLBACKS TOO EARLY!!! 
>> } >> } >> >> Reported-by: Frederic Weisbecker <frederic@xxxxxxxxxx> >> Fixes: 05a10b921000 ("rcu: Support direct wake-up of synchronize_rcu() users") >> Signed-off-by: Uladzislau Rezki (Sony) <urezki@xxxxxxxxx> >> --- >> kernel/rcu/tree.c | 22 ++++++++-------------- >> 1 file changed, 8 insertions(+), 14 deletions(-) >> >> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c >> index 31f3a61f9c38..475647620b12 100644 >> --- a/kernel/rcu/tree.c >> +++ b/kernel/rcu/tree.c >> @@ -1656,21 +1656,11 @@ static void rcu_sr_normal_gp_cleanup(void) >> WARN_ON_ONCE(!rcu_sr_is_wait_head(wait_tail)); >> >> /* >> - * Process (a) and (d) cases. See an illustration. Apart of >> - * that it handles the scenario when all clients are done, >> - * wait-head is released if last. The worker is not kicked. >> + * Process (a) and (d) cases. See an illustration. >> */ >> llist_for_each_safe(rcu, next, wait_tail->next) { >> - if (rcu_sr_is_wait_head(rcu)) { >> - if (!rcu->next) { >> - rcu_sr_put_wait_head(rcu); >> - wait_tail->next = NULL; >> - } else { >> - wait_tail->next = rcu; >> - } >> - >> + if (rcu_sr_is_wait_head(rcu)) >> break; >> - } >> >> rcu_sr_normal_complete(rcu); >> // It can be last, update a next on this step. >> @@ -1684,8 +1674,12 @@ static void rcu_sr_normal_gp_cleanup(void) >> smp_store_release(&rcu_state.srs_done_tail, wait_tail); >> ASSERT_EXCLUSIVE_WRITER(rcu_state.srs_done_tail); >> >> - if (wait_tail->next) >> - queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work); >> + /* >> + * We schedule a work in order to perform a final processing >> + * of outstanding users(if still left) and releasing wait-heads >> + * added by rcu_sr_normal_gp_init() call. >> + */ >> + queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work); >> } One question, why do you need to queue_work() if wait_tail->next == NULL? AFAICS, at this stage if wait_tail->next == NULL, you are in CASE f. so the last remaining HEAD stays? 
(And llist_for_each_safe() in rcu_sr_normal_gp_cleanup_work() becomes a no-op).

Could be something like this, but maybe I missed something:

@@ -1672,7 +1674,7 @@ static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
  */
 static void rcu_sr_normal_gp_cleanup(void)
 {
-	struct llist_node *wait_tail, *next, *rcu;
+	struct llist_node *wait_tail, *next = NULL, *rcu = NULL;
 	int done = 0;

 	wait_tail = rcu_state.srs_wait_tail;
@@ -1707,7 +1709,8 @@ static void rcu_sr_normal_gp_cleanup(void)
 	 * of outstanding users(if still left) and releasing wait-heads
 	 * added by rcu_sr_normal_gp_init() call.
 	 */
-	queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
+	if (rcu)
+		queue_work(system_highpri_wq, &rcu_state.srs_cleanup_work);
 }

 /*