Third cut of "big hammer" expedited RCU grace periods, this time
including rcu rather than just rcu_bh.  This uses resched IPIs to force
quiescent states on other CPUs.  This particular variant uses
smp_call_function() to invoke set_need_resched() on all CPUs in order
to cause this to happen.  Track the CPUs that have passed through a
quiescent state (or gone offline) with a cpumask.

Does nothing to expedite callbacks already registered with call_rcu()
or call_rcu_bh(), but there is no need to.  Just maps to
synchronize_rcu() and a new synchronize_rcu_bh() on preemptable RCU,
which has more complex grace-period detection -- this can be fixed
later.

Passes light rcutorture testing.  Grace periods take many milliseconds
on a variety of machines with a number of different config option
combinations -- in other words, this implementation just does not cut
it.  Not even close.

I am posting it on the off-chance that I made some stupid mistake that
someone might spot.  Absent that, I am taking a different approach,
namely adapting the synchronize_sched() implementation from preemptable
RCU.  Evgeniy might have been suggesting something similar, and Mathieu
seemed to be thinking along these lines as well.

Shortcomings:

o	Waaaaay too slow!!!  Again, thinking in terms of using
	preemptable RCU's synchronize_sched() implementation.

o	Does not address preemptable RCU.

Changes since v2:

o	Use reschedule IPIs rather than a softirq.

Changes since v1:

o	Added rcutorture support, and added exports required by
	rcutorture.

o	Added comment stating that smp_call_function() implies a memory
	barrier, suggested by Mathieu.

o	Added #include for delay.h.

Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
---

 include/linux/rcupdate.h |    4
 kernel/rcuclassic.c      |    1
 kernel/rcupdate.c        |  179 +++++++++++++++++++++++++++++++++++++++++
 kernel/rcutorture.c      |  205 +++++++++++++++++++++++++----------------------
 kernel/rcutree.c         |    1
 5 files changed, 298 insertions(+), 92 deletions(-)

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 15fbb3c..b630f14 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -264,10 +264,14 @@ extern void synchronize_rcu(void);
 extern void rcu_barrier(void);
 extern void rcu_barrier_bh(void);
 extern void rcu_barrier_sched(void);
+extern void synchronize_rcu_expedited(void);
+extern void synchronize_rcu_bh_expedited(void);
+extern long rcu_batches_completed_bh_expedited(void);
 
 /* Internal to kernel */
 extern void rcu_init(void);
 extern void rcu_scheduler_starting(void);
 extern int rcu_needs_cpu(int cpu);
+extern void synchronize_rcu_expedited_qs(int cpu);
 
 #endif /* __LINUX_RCUPDATE_H */
diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
index 0f2b0b3..d15bd62 100644
--- a/kernel/rcuclassic.c
+++ b/kernel/rcuclassic.c
@@ -87,6 +87,7 @@ void rcu_qsctr_inc(int cpu)
 {
         struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
         rdp->passed_quiesc = 1;
+        synchronize_rcu_expedited_qs(cpu);
 }
 
 void rcu_bh_qsctr_inc(int cpu)
diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index a967c9f..20fd5da 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -45,6 +45,7 @@
 #include <linux/mutex.h>
 #include <linux/module.h>
 #include <linux/kernel_stat.h>
+#include <linux/delay.h>
 
 enum rcu_barrier {
         RCU_BARRIER_STD,
@@ -98,6 +99,30 @@ void synchronize_rcu(void)
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu);
 
+/**
+ * synchronize_rcu_bh - wait until an rcu_bh grace period has elapsed.
+ *
+ * Control will return to the caller some time after a full rcu_bh grace
+ * period has elapsed, in other words after all currently executing rcu_bh
+ * read-side critical sections have completed.  RCU read-side critical
+ * sections are delimited by rcu_read_lock_bh() and rcu_read_unlock_bh(),
+ * and may be nested.
+ */
+void synchronize_rcu_bh(void)
+{
+        struct rcu_synchronize rcu;
+
+        if (rcu_blocking_is_gp())
+                return;
+
+        init_completion(&rcu.completion);
+        /* Will wake me after RCU finished. */
+        call_rcu_bh(&rcu.head, wakeme_after_rcu);
+        /* Wait for it. */
+        wait_for_completion(&rcu.completion);
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
+
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
         if (atomic_dec_and_test(&rcu_barrier_cpu_count))
@@ -217,10 +242,164 @@ static int __cpuinit rcu_barrier_cpu_hotplug(struct notifier_block *self,
         return NOTIFY_OK;
 }
 
+static DEFINE_MUTEX(synchronize_rcu_bh_mutex);
+static long synchronize_rcu_bh_completed; /* Expedited-grace-period count. */
+
+long rcu_batches_completed_bh_expedited(void)
+{
+        return synchronize_rcu_bh_completed;
+}
+EXPORT_SYMBOL_GPL(rcu_batches_completed_bh_expedited);
+
+#if !defined(CONFIG_SMP)
+
+void synchronize_rcu_expedited_qs(int cpu)
+{
+}
+
+static void __init synchronize_rcu_expedited_init(void)
+{
+}
+
+void synchronize_rcu_expedited(void)
+{
+        mutex_lock(&synchronize_rcu_bh_mutex);
+        synchronize_rcu_bh_completed++;
+        mutex_unlock(&synchronize_rcu_bh_mutex);
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
+
+void synchronize_rcu_bh_expedited(void)
+{
+        synchronize_rcu_expedited();
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_bh_expedited);
+
+#elif defined(CONFIG_PREEMPT_RCU)
+
+static void __init synchronize_rcu_expedited_init(void)
+{
+}
+
+void synchronize_rcu_expedited(void)
+{
+        synchronize_rcu();
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
+
+void synchronize_rcu_bh_expedited(void)
+{
+        synchronize_rcu_bh();
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_bh_expedited);
+
+#else
+
+static DEFINE_PER_CPU(int, rcu_expedited_need_qs);
+static cpumask_var_t rcu_bh_waiting_map;
+
+void synchronize_rcu_expedited_qs(int cpu)
+{
+        smp_mb();
+        per_cpu(rcu_expedited_need_qs, cpu) = 0;
+        smp_mb();
+}
+
+static void __init synchronize_rcu_expedited_init(void)
+{
+        alloc_bootmem_cpumask_var(&rcu_bh_waiting_map);
+}
+
+static void rcu_set_need_resched(void *unused)
+{
+        set_need_resched();
+}
+
+void synchronize_rcu_expedited(void)
+{
+        int cpu;
+        int done;
+        int times = 0;
+
+        mutex_lock(&synchronize_rcu_bh_mutex);
+
+        /* Take snapshot of online CPUs, blocking CPU hotplug. */
+        preempt_disable();
+        cpumask_copy(rcu_bh_waiting_map, &cpu_online_map);
+        cpumask_clear_cpu(smp_processor_id(), rcu_bh_waiting_map);
+        preempt_enable();
+
+        /*
+         * Mark each online CPU as needing a quiescent state and make
+         * each do a set_need_resched().
+         */
+        smp_mb();  /* Ensure prior changes seen before setting flag below. */
+        for_each_cpu(cpu, rcu_bh_waiting_map) {
+                preempt_disable();
+                per_cpu(rcu_expedited_need_qs, cpu) = 1;
+                preempt_enable();
+        }
+        smp_call_function(rcu_set_need_resched, NULL, 1);
+        udelay(10);  /* let IPIs actually get to their destinations. */
+
+        /*
+         * Loop waiting for each CPU to either pass through a quiescent
+         * state or to go offline.  We don't care which.
+         */
+        for (;;) {
+
+                /* Ignore CPUs that are now offline, w/CPU hotplug blocked. */
+                preempt_disable();
+                cpumask_and(rcu_bh_waiting_map, rcu_bh_waiting_map,
+                            &cpu_online_map);
+                cpumask_clear_cpu(smp_processor_id(), rcu_bh_waiting_map);
+                preempt_enable();
+
+                /* Check if any CPUs still need a quiescent state. */
+                done = 1;
+                for_each_cpu(cpu, rcu_bh_waiting_map) {
+                        preempt_disable();
+                        if (!cpumask_test_cpu(cpu, &cpu_online_map) ||
+                            !per_cpu(rcu_expedited_need_qs, cpu))
+                                cpumask_clear_cpu(cpu, rcu_bh_waiting_map);
+                        else {
+                                done = 0;
+                                smp_send_reschedule(cpu);
+                        }
+                        preempt_enable();
+                }
+                if (done)
+                        break;
+
+                /*
+                 * Wait a bit.  If we have already waited a fair
+                 * amount of time, sleep.
+                 */
+                if (++times < 10)
+                        udelay(10 * times);
+                else
+                        schedule_timeout_uninterruptible(1);
+                /* FIXME: need to complain about holdout CPUs if too long. */
+        }
+
+        synchronize_rcu_bh_completed++;
+        mutex_unlock(&synchronize_rcu_bh_mutex);
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
+
+void synchronize_rcu_bh_expedited(void)
+{
+        synchronize_rcu_expedited();
+}
+EXPORT_SYMBOL_GPL(synchronize_rcu_bh_expedited);
+
+#endif /* #else #ifndef CONFIG_SMP */
+
 void __init rcu_init(void)
 {
         __rcu_init();
         hotcpu_notifier(rcu_barrier_cpu_hotplug, 0);
+        synchronize_rcu_expedited_init();
 }
 
 void rcu_scheduler_starting(void)
diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c
index 9b4a975..8845936 100644
--- a/kernel/rcutorture.c
+++ b/kernel/rcutorture.c
@@ -257,14 +257,14 @@ struct rcu_torture_ops {
         void (*init)(void);
         void (*cleanup)(void);
         int (*readlock)(void);
-        void (*readdelay)(struct rcu_random_state *rrsp);
+        void (*read_delay)(struct rcu_random_state *rrsp);
         void (*readunlock)(int idx);
         int (*completed)(void);
-        void (*deferredfree)(struct rcu_torture *p);
+        void (*deferred_free)(struct rcu_torture *p);
         void (*sync)(void);
         void (*cb_barrier)(void);
         int (*stats)(char *page);
-        int irqcapable;
+        int irq_capable;
         char *name;
 };
 static struct rcu_torture_ops *cur_ops = NULL;
@@ -320,7 +320,7 @@ rcu_torture_cb(struct rcu_head *p)
                 rp->rtort_mbtest = 0;
                 rcu_torture_free(rp);
         } else
-                cur_ops->deferredfree(rp);
+                cur_ops->deferred_free(rp);
 }
 
 static void rcu_torture_deferred_free(struct rcu_torture *p)
@@ -329,18 +329,18 @@ static void rcu_torture_deferred_free(struct rcu_torture *p)
 }
 
 static struct rcu_torture_ops rcu_ops = {
-        .init = NULL,
-        .cleanup = NULL,
-        .readlock = rcu_torture_read_lock,
-        .readdelay = rcu_read_delay,
-        .readunlock = rcu_torture_read_unlock,
-        .completed = rcu_torture_completed,
-        .deferredfree = rcu_torture_deferred_free,
-        .sync = synchronize_rcu,
-        .cb_barrier = rcu_barrier,
-        .stats = NULL,
-        .irqcapable = 1,
-        .name = "rcu"
+        .init           = NULL,
+        .cleanup        = NULL,
+        .readlock       = rcu_torture_read_lock,
+        .read_delay     = rcu_read_delay,
+        .readunlock     = rcu_torture_read_unlock,
+        .completed      = rcu_torture_completed,
+        .deferred_free  = rcu_torture_deferred_free,
+        .sync           = synchronize_rcu,
+        .cb_barrier     = rcu_barrier,
+        .stats          = NULL,
+        .irq_capable    = 1,
+        .name           = "rcu"
 };
 
 static void rcu_sync_torture_deferred_free(struct rcu_torture *p)
@@ -370,18 +370,18 @@ static void rcu_sync_torture_init(void)
 }
 
 static struct rcu_torture_ops rcu_sync_ops = {
-        .init = rcu_sync_torture_init,
-        .cleanup = NULL,
-        .readlock = rcu_torture_read_lock,
-        .readdelay = rcu_read_delay,
-        .readunlock = rcu_torture_read_unlock,
-        .completed = rcu_torture_completed,
-        .deferredfree = rcu_sync_torture_deferred_free,
-        .sync = synchronize_rcu,
-        .cb_barrier = NULL,
-        .stats = NULL,
-        .irqcapable = 1,
-        .name = "rcu_sync"
+        .init           = rcu_sync_torture_init,
+        .cleanup        = NULL,
+        .readlock       = rcu_torture_read_lock,
+        .read_delay     = rcu_read_delay,
+        .readunlock     = rcu_torture_read_unlock,
+        .completed      = rcu_torture_completed,
+        .deferred_free  = rcu_sync_torture_deferred_free,
+        .sync           = synchronize_rcu,
+        .cb_barrier     = NULL,
+        .stats          = NULL,
+        .irq_capable    = 1,
+        .name           = "rcu_sync"
 };
 
 /*
@@ -432,33 +432,53 @@ static void rcu_bh_torture_synchronize(void)
 }
 
 static struct rcu_torture_ops rcu_bh_ops = {
-        .init = NULL,
-        .cleanup = NULL,
-        .readlock = rcu_bh_torture_read_lock,
-        .readdelay = rcu_read_delay,  /* just reuse rcu's version. */
-        .readunlock = rcu_bh_torture_read_unlock,
-        .completed = rcu_bh_torture_completed,
-        .deferredfree = rcu_bh_torture_deferred_free,
-        .sync = rcu_bh_torture_synchronize,
-        .cb_barrier = rcu_barrier_bh,
-        .stats = NULL,
-        .irqcapable = 1,
-        .name = "rcu_bh"
+        .init           = NULL,
+        .cleanup        = NULL,
+        .readlock       = rcu_bh_torture_read_lock,
+        .read_delay     = rcu_read_delay,  /* just reuse rcu's version. */
+        .readunlock     = rcu_bh_torture_read_unlock,
+        .completed      = rcu_bh_torture_completed,
+        .deferred_free  = rcu_bh_torture_deferred_free,
+        .sync           = rcu_bh_torture_synchronize,
+        .cb_barrier     = rcu_barrier_bh,
+        .stats          = NULL,
+        .irq_capable    = 1,
+        .name           = "rcu_bh"
 };
 
 static struct rcu_torture_ops rcu_bh_sync_ops = {
-        .init = rcu_sync_torture_init,
-        .cleanup = NULL,
-        .readlock = rcu_bh_torture_read_lock,
-        .readdelay = rcu_read_delay,  /* just reuse rcu's version. */
-        .readunlock = rcu_bh_torture_read_unlock,
-        .completed = rcu_bh_torture_completed,
-        .deferredfree = rcu_sync_torture_deferred_free,
-        .sync = rcu_bh_torture_synchronize,
-        .cb_barrier = NULL,
-        .stats = NULL,
-        .irqcapable = 1,
-        .name = "rcu_bh_sync"
+        .init           = rcu_sync_torture_init,
+        .cleanup        = NULL,
+        .readlock       = rcu_bh_torture_read_lock,
+        .read_delay     = rcu_read_delay,  /* just reuse rcu's version. */
+        .readunlock     = rcu_bh_torture_read_unlock,
+        .completed      = rcu_bh_torture_completed,
+        .deferred_free  = rcu_sync_torture_deferred_free,
+        .sync           = rcu_bh_torture_synchronize,
+        .cb_barrier     = NULL,
+        .stats          = NULL,
+        .irq_capable    = 1,
+        .name           = "rcu_bh_sync"
+};
+
+static int rcu_bh_expedited_torture_completed(void)
+{
+        return rcu_batches_completed_bh_expedited();
+}
+
+static struct rcu_torture_ops rcu_bh_expedited_ops = {
+        .init           = rcu_sync_torture_init,
+        .cleanup        = NULL,
+        .readlock       = rcu_bh_torture_read_lock,
+        .read_delay     = rcu_read_delay,  /* just reuse rcu's version. */
+        .readunlock     = rcu_bh_torture_read_unlock,
+        .completed      = rcu_bh_expedited_torture_completed,
+        .deferred_free  = rcu_sync_torture_deferred_free,
+        .sync           = synchronize_rcu_bh_expedited,
+        .cb_barrier     = NULL,
+        .stats          = NULL,
+        .irq_capable    = 1,
+        .name           = "rcu_bh_expedited"
 };
 
 /*
@@ -530,17 +550,17 @@ static int srcu_torture_stats(char *page)
 }
 
 static struct rcu_torture_ops srcu_ops = {
-        .init = srcu_torture_init,
-        .cleanup = srcu_torture_cleanup,
-        .readlock = srcu_torture_read_lock,
-        .readdelay = srcu_read_delay,
-        .readunlock = srcu_torture_read_unlock,
-        .completed = srcu_torture_completed,
-        .deferredfree = rcu_sync_torture_deferred_free,
-        .sync = srcu_torture_synchronize,
-        .cb_barrier = NULL,
-        .stats = srcu_torture_stats,
-        .name = "srcu"
+        .init           = srcu_torture_init,
+        .cleanup        = srcu_torture_cleanup,
+        .readlock       = srcu_torture_read_lock,
+        .read_delay     = srcu_read_delay,
+        .readunlock     = srcu_torture_read_unlock,
+        .completed      = srcu_torture_completed,
+        .deferred_free  = rcu_sync_torture_deferred_free,
+        .sync           = srcu_torture_synchronize,
+        .cb_barrier     = NULL,
+        .stats          = srcu_torture_stats,
+        .name           = "srcu"
 };
 
 /*
@@ -574,32 +594,32 @@ static void sched_torture_synchronize(void)
 }
 
 static struct rcu_torture_ops sched_ops = {
-        .init = rcu_sync_torture_init,
-        .cleanup = NULL,
-        .readlock = sched_torture_read_lock,
-        .readdelay = rcu_read_delay,  /* just reuse rcu's version. */
-        .readunlock = sched_torture_read_unlock,
-        .completed = sched_torture_completed,
-        .deferredfree = rcu_sched_torture_deferred_free,
-        .sync = sched_torture_synchronize,
-        .cb_barrier = rcu_barrier_sched,
-        .stats = NULL,
-        .irqcapable = 1,
-        .name = "sched"
+        .init           = rcu_sync_torture_init,
+        .cleanup        = NULL,
+        .readlock       = sched_torture_read_lock,
+        .read_delay     = rcu_read_delay,  /* just reuse rcu's version. */
+        .readunlock     = sched_torture_read_unlock,
+        .completed      = sched_torture_completed,
+        .deferred_free  = rcu_sched_torture_deferred_free,
+        .sync           = sched_torture_synchronize,
+        .cb_barrier     = rcu_barrier_sched,
+        .stats          = NULL,
+        .irq_capable    = 1,
+        .name           = "sched"
 };
 
 static struct rcu_torture_ops sched_ops_sync = {
-        .init = rcu_sync_torture_init,
-        .cleanup = NULL,
-        .readlock = sched_torture_read_lock,
-        .readdelay = rcu_read_delay,  /* just reuse rcu's version. */
-        .readunlock = sched_torture_read_unlock,
-        .completed = sched_torture_completed,
-        .deferredfree = rcu_sync_torture_deferred_free,
-        .sync = sched_torture_synchronize,
-        .cb_barrier = NULL,
-        .stats = NULL,
-        .name = "sched_sync"
+        .init           = rcu_sync_torture_init,
+        .cleanup        = NULL,
+        .readlock       = sched_torture_read_lock,
+        .read_delay     = rcu_read_delay,  /* just reuse rcu's version. */
+        .readunlock     = sched_torture_read_unlock,
+        .completed      = sched_torture_completed,
+        .deferred_free  = rcu_sync_torture_deferred_free,
+        .sync           = sched_torture_synchronize,
+        .cb_barrier     = NULL,
+        .stats          = NULL,
+        .name           = "sched_sync"
 };
 
 /*
@@ -635,7 +655,7 @@ rcu_torture_writer(void *arg)
                         i = RCU_TORTURE_PIPE_LEN;
                 atomic_inc(&rcu_torture_wcount[i]);
                 old_rp->rtort_pipe_count++;
-                cur_ops->deferredfree(old_rp);
+                cur_ops->deferred_free(old_rp);
         }
         rcu_torture_current_version++;
         oldbatch = cur_ops->completed();
@@ -700,7 +720,7 @@ static void rcu_torture_timer(unsigned long unused)
         if (p->rtort_mbtest == 0)
                 atomic_inc(&n_rcu_torture_mberror);
         spin_lock(&rand_lock);
-        cur_ops->readdelay(&rand);
+        cur_ops->read_delay(&rand);
         n_rcu_torture_timers++;
         spin_unlock(&rand_lock);
         preempt_disable();
@@ -738,11 +758,11 @@ rcu_torture_reader(void *arg)
         VERBOSE_PRINTK_STRING("rcu_torture_reader task started");
         set_user_nice(current, 19);
 
-        if (irqreader && cur_ops->irqcapable)
+        if (irqreader && cur_ops->irq_capable)
                 setup_timer_on_stack(&t, rcu_torture_timer, 0);
 
         do {
-                if (irqreader && cur_ops->irqcapable) {
+                if (irqreader && cur_ops->irq_capable) {
                         if (!timer_pending(&t))
                                 mod_timer(&t, 1);
                 }
@@ -757,7 +777,7 @@
                 }
                 if (p->rtort_mbtest == 0)
                         atomic_inc(&n_rcu_torture_mberror);
-                cur_ops->readdelay(&rand);
+                cur_ops->read_delay(&rand);
                 preempt_disable();
                 pipe_count = p->rtort_pipe_count;
                 if (pipe_count > RCU_TORTURE_PIPE_LEN) {
@@ -778,7 +798,7 @@
         } while (!kthread_should_stop() && fullstop == FULLSTOP_DONTSTOP);
         VERBOSE_PRINTK_STRING("rcu_torture_reader task stopping");
         rcutorture_shutdown_absorb("rcu_torture_reader");
-        if (irqreader && cur_ops->irqcapable)
+        if (irqreader && cur_ops->irq_capable)
                 del_timer_sync(&t);
         while (!kthread_should_stop())
                 schedule_timeout_uninterruptible(1);
@@ -1078,6 +1098,7 @@ rcu_torture_init(void)
         int firsterr = 0;
         static struct rcu_torture_ops *torture_ops[] =
                 { &rcu_ops, &rcu_sync_ops, &rcu_bh_ops, &rcu_bh_sync_ops,
+                  &rcu_bh_expedited_ops,
                   &srcu_ops, &sched_ops, &sched_ops_sync, };
 
         mutex_lock(&fullstop_mutex);
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index d2a372f..bf2c21d 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -89,6 +89,7 @@ void rcu_qsctr_inc(int cpu)
         struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
         rdp->passed_quiesc = 1;
         rdp->passed_quiesc_completed = rdp->completed;
+        synchronize_rcu_expedited_qs(cpu);
 }
 
 void rcu_bh_qsctr_inc(int cpu)
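
For reviewers wanting to see where the new primitive is meant to slot in,
here is a minimal, hypothetical caller sketch (not part of the patch; struct
foo, foo_list, foo_lock, and foo_del_sync() are invented for illustration).
It simply substitutes synchronize_rcu_bh_expedited() for synchronize_rcu_bh()
on an update-side path whose readers run under rcu_read_lock_bh():

#include <linux/rcupdate.h>
#include <linux/rculist.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

/* Hypothetical element type; readers traverse foo_list under rcu_read_lock_bh(). */
struct foo {
        struct list_head list;
        int key;
};

static LIST_HEAD(foo_list);
static DEFINE_SPINLOCK(foo_lock);       /* serializes updaters. */

/* Unlink an element and free it once all rcu_bh readers have finished. */
static void foo_del_sync(struct foo *fp)
{
        spin_lock_bh(&foo_lock);
        list_del_rcu(&fp->list);
        spin_unlock_bh(&foo_lock);

        /* Expedited drop-in for synchronize_rcu_bh(). */
        synchronize_rcu_bh_expedited();
        kfree(fp);
}

Because the expedited variant pokes every online CPU, it is intended for
rare, latency-sensitive update paths rather than as a general replacement
for synchronize_rcu_bh().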