* Paul E. McKenney (paulmck@xxxxxxxxxxxxxxxxxx) wrote:
> Third cut of "big hammer" expedited RCU grace periods, this time including
> rcu rather than just rcu_bh.  This uses resched IPIs to force quiescent
> states on other CPUs.  This particular variant uses smp_call_function()
> to invoke set_need_resched() on all CPUs in order to cause this to happen.
> Track the CPUs that have passed through a quiescent state (or gone
> offline) with a cpumask.
> 
> Does nothing to expedite callbacks already registered with call_rcu() or
> call_rcu_bh(), but there is no need to.  Just maps to synchronize_rcu()
> and a new synchronize_rcu_bh() on preemptable RCU, which has more complex
> grace-period detection -- this can be fixed later.
> 
> Passes light rcutorture testing.  Grace periods take many milliseconds
> on a variety of machines with a number of different config option
> combinations -- in other words, this implementation just does not cut it.
> Not even close.
> 
> I am posting it on the off-chance that I made some stupid mistake that
> someone might spot.  Absent that, I am taking a different approach, namely
> adapting the synchronize_sched() implementation from preemptable RCU.
> Evgeniy might have been suggesting something similar, and Mathieu seemed
> to be thinking along these lines as well.
> 
> Shortcomings:
> 
> o	Waaaaay too slow!!!  Again, thinking in terms of using
> 	preemptable RCU's synchronize_sched() implementation.
> 
> o	Does not address preemptable RCU.
> 
> Changes since v2:
> 
> o	Use reschedule IPIs rather than a softirq.
> 
> Changes since v1:
> 
> o	Added rcutorture support, and added exports required by
> 	rcutorture.
> 
> o	Added comment stating that smp_call_function() implies a
> 	memory barrier, suggested by Mathieu.
> 
> o	Added #include for delay.h.
> 
> Signed-off-by: Paul E. McKenney <paulmck@xxxxxxxxxxxxxxxxxx>
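A quick usage note before the diff (my illustration, not part of the patch): the new primitives are intended as drop-in replacements for their non-expedited counterparts when an updater cannot tolerate a multi-millisecond grace period. A hypothetical caller, where example_lock, struct example_entry, and example_remove() are made up for illustration and only synchronize_rcu_bh_expedited() comes from the patch, might look like this:

	/* Hypothetical updater; readers are assumed to use rcu_read_lock_bh(). */
	static void example_remove(struct example_entry *p)
	{
		spin_lock(&example_lock);
		list_del_rcu(&p->list);		/* unpublish the element */
		spin_unlock(&example_lock);

		/* Expedited wait for all pre-existing rcu_bh readers. */
		synchronize_rcu_bh_expedited();

		kfree(p);			/* now safe to free */
	}

The semantics are meant to match synchronize_rcu_bh(); only the expected latency differs.
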
> ---
> 
>  include/linux/rcupdate.h |    4 
>  kernel/rcuclassic.c      |    1 
>  kernel/rcupdate.c        |  179 +++++++++++++++++++++++++++++++++++++++++
>  kernel/rcutorture.c      |  205 +++++++++++++++++++++++++----------------------
>  kernel/rcutree.c         |    1 
>  5 files changed, 298 insertions(+), 92 deletions(-)
> 
> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> index 15fbb3c..b630f14 100644
> --- a/include/linux/rcupdate.h
> +++ b/include/linux/rcupdate.h
> @@ -264,10 +264,14 @@ extern void synchronize_rcu(void);
>  extern void rcu_barrier(void);
>  extern void rcu_barrier_bh(void);
>  extern void rcu_barrier_sched(void);
> +extern void synchronize_rcu_expedited(void);
> +extern void synchronize_rcu_bh_expedited(void);
> +extern long rcu_batches_completed_bh_expedited(void);
> 
>  /* Internal to kernel */
>  extern void rcu_init(void);
>  extern void rcu_scheduler_starting(void);
>  extern int rcu_needs_cpu(int cpu);
> +extern void synchronize_rcu_expedited_qs(int cpu);
> 
>  #endif /* __LINUX_RCUPDATE_H */
> diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
> index 0f2b0b3..d15bd62 100644
> --- a/kernel/rcuclassic.c
> +++ b/kernel/rcuclassic.c
> @@ -87,6 +87,7 @@ void rcu_qsctr_inc(int cpu)
>  {
>  	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
>  	rdp->passed_quiesc = 1;
> +	synchronize_rcu_expedited_qs(cpu);
>  }
> 
>  void rcu_bh_qsctr_inc(int cpu)
> diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
> index a967c9f..20fd5da 100644
> --- a/kernel/rcupdate.c
> +++ b/kernel/rcupdate.c
> @@ -45,6 +45,7 @@
>  #include <linux/mutex.h>
>  #include <linux/module.h>
>  #include <linux/kernel_stat.h>
> +#include <linux/delay.h>
> 
>  enum rcu_barrier {
>  	RCU_BARRIER_STD,
> @@ -98,6 +99,30 @@ void synchronize_rcu(void)
>  }
>  EXPORT_SYMBOL_GPL(synchronize_rcu);
> 
> +/**
> + * synchronize_rcu_bh - wait until an rcu_bh grace period has elapsed.
> + *
> + * Control will return to the caller some time after a full rcu_bh grace
> + * period has elapsed, in other words after all currently executing rcu_bh
> + * read-side critical sections have completed.  RCU read-side critical
> + * sections are delimited by rcu_read_lock_bh() and rcu_read_unlock_bh(),
> + * and may be nested.
> + */
> +void synchronize_rcu_bh(void)
> +{
> +	struct rcu_synchronize rcu;
> +
> +	if (rcu_blocking_is_gp())
> +		return;
> +
> +	init_completion(&rcu.completion);
> +	/* Will wake me after RCU finished. */
> +	call_rcu_bh(&rcu.head, wakeme_after_rcu);
> +	/* Wait for it. */
> +	wait_for_completion(&rcu.completion);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
> +
>  static void rcu_barrier_callback(struct rcu_head *notused)
>  {
>  	if (atomic_dec_and_test(&rcu_barrier_cpu_count))
> @@ -217,10 +242,164 @@ static int __cpuinit rcu_barrier_cpu_hotplug(struct notifier_block *self,
>  	return NOTIFY_OK;
>  }
> 
> +static DEFINE_MUTEX(synchronize_rcu_bh_mutex);
> +static long synchronize_rcu_bh_completed; /* Expedited-grace-period count. */
> +
> +long rcu_batches_completed_bh_expedited(void)
> +{
> +	return synchronize_rcu_bh_completed;
> +}
> +EXPORT_SYMBOL_GPL(rcu_batches_completed_bh_expedited);
> +
> +#if !defined(CONFIG_SMP)
> +
> +void synchronize_rcu_expedited_qs(int cpu)
> +{
> +}
> +
> +static void __init synchronize_rcu_expedited_init(void)
> +{
> +}
> +
> +void synchronize_rcu_expedited(void)
> +{
> +	mutex_lock(&synchronize_rcu_bh_mutex);
> +	synchronize_rcu_bh_completed++;
> +	mutex_unlock(&synchronize_rcu_bh_mutex);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
> +
> +void synchronize_rcu_bh_expedited(void)
> +{
> +	synchronize_rcu_expedited();
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_bh_expedited);
> +
> +#elif defined(CONFIG_PREEMPT_RCU)
> +
> +static void __init synchronize_rcu_expedited_init(void)
> +{
> +}
> +
> +void synchronize_rcu_expedited(void)
> +{
> +	synchronize_rcu();
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
> +
> +void synchronize_rcu_bh_expedited(void)
> +{
> +	synchronize_rcu_bh();
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_bh_expedited);
> +
> +#else
> +
> +static DEFINE_PER_CPU(int, rcu_expedited_need_qs);
> +static cpumask_var_t rcu_bh_waiting_map;
> +
> +void synchronize_rcu_expedited_qs(int cpu)
> +{
> +	smp_mb();
> +	per_cpu(rcu_expedited_need_qs, cpu) = 0;
> +	smp_mb();
> +}
> +
> +static void __init synchronize_rcu_expedited_init(void)
> +{
> +	alloc_bootmem_cpumask_var(&rcu_bh_waiting_map);
> +}
> +
> +static void rcu_set_need_resched(void *unused)
> +{
> +	set_need_resched();
> +}
> +
> +void synchronize_rcu_expedited(void)
> +{
> +	int cpu;
> +	int done;
> +	int times = 0;
> +
> +	mutex_lock(&synchronize_rcu_bh_mutex);
> +
> +	/* Take snapshot of online CPUs, blocking CPU hotplug. */
> +	preempt_disable();
> +	cpumask_copy(rcu_bh_waiting_map, &cpu_online_map);
> +	cpumask_clear_cpu(smp_processor_id(), rcu_bh_waiting_map);
> +	preempt_enable();
> +
> +	/*
> +	 * Mark each online CPU as needing a quiescent state and make
> +	 * each do a set_need_resched().
> +	 */
> +	smp_mb(); /* Ensure prior changes seen before setting flag below. */
> +	for_each_cpu(cpu, rcu_bh_waiting_map) {
> +		preempt_disable();
> +		per_cpu(rcu_expedited_need_qs, cpu) = 1;
> +		preempt_enable();
> +	}
> +	smp_call_function(rcu_set_need_resched, NULL, 1);
> +	udelay(10); /* let IPIs actually get to their destinations. */
> +
> +	/*
> +	 * Loop waiting for each CPU to either pass through a quiescent
> +	 * state or to go offline.  We don't care which.
> +	 */
> +	for (;;) {
> +
> +		/* Ignore CPUs that are now offline, w/CPU hotplug blocked. */
> +		preempt_disable();
> +		cpumask_and(rcu_bh_waiting_map, rcu_bh_waiting_map,
> +			    &cpu_online_map);
> +		cpumask_clear_cpu(smp_processor_id(), rcu_bh_waiting_map);
> +		preempt_enable();
> +
> +		/* Check if any CPUs still need a quiescent state. */
> +		done = 1;
> +		for_each_cpu(cpu, rcu_bh_waiting_map) {
> +			preempt_disable();
> +			if (!cpumask_test_cpu(cpu, &cpu_online_map) ||
> +			    !per_cpu(rcu_expedited_need_qs, cpu))
> +				cpumask_clear_cpu(cpu, rcu_bh_waiting_map);
> +			else {
> +				done = 0;
> +				smp_send_reschedule(cpu);
> +			}
> +			preempt_enable();
> +		}
> +		if (done)
> +			break;
> +
> +		/*
> +		 * Wait a bit.  If we have already waited a fair
> +		 * amount of time, sleep.
> +		 */
> +		if (++times < 10)
> +			udelay(10 * times);
> +		else
> +			schedule_timeout_uninterruptible(1);

Waiting a whole jiffy (e.g. 1 ms, 4 ms, or 10 ms, depending on HZ) here
seems like a big hammer to nail a delicate pin. I would not be surprised
if your long delay came from here.

Is it possible that your IPI+scheduling delay is actually longer than the
11 udelays you are doing, and that you end up calling
schedule_timeout_uninterruptible(1) each time?
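Some back-of-envelope numbers (mine, based only on the backoff as posted and common HZ values): the cumulative udelay() budget before the first sleep is about half a millisecond, while a single jiffy of sleep is 1 ms to 10 ms, so once the remote CPUs need more than roughly 0.5 ms to pass through the scheduler, the wait is quantized in whole jiffies. A trivial user-space sketch of the arithmetic:

	/* Illustrative arithmetic only -- not kernel code. */
	#include <stdio.h>

	int main(void)
	{
		int times;
		int busy_us = 10;		/* initial udelay(10) after the IPIs */

		for (times = 1; times < 10; times++)
			busy_us += 10 * times;	/* udelay(10 * times) on each early pass */

		printf("total busy-wait budget: %d us\n", busy_us);	/* 460 us */
		printf("one jiffy: %d us (HZ=100), %d us (HZ=250), %d us (HZ=1000)\n",
		       1000000 / 100, 1000000 / 250, 1000000 / 1000);
		return 0;
	}

That alone would explain grace periods measured in many milliseconds rather than microseconds.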

Mathieu

> +		/* FIXME: need to complain about holdout CPUs if too long. */
> +	}
> +
> +	synchronize_rcu_bh_completed++;
> +	mutex_unlock(&synchronize_rcu_bh_mutex);
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
> +
> +void synchronize_rcu_bh_expedited(void)
> +{
> +	synchronize_rcu_expedited();
> +}
> +EXPORT_SYMBOL_GPL(synchronize_rcu_bh_expedited);
> +
> +#endif /* #else #ifndef CONFIG_SMP */
> +
>  void __init rcu_init(void)
>  {
>  	__rcu_init();
>  	hotcpu_notifier(rcu_barrier_cpu_hotplug, 0);
> +	synchronize_rcu_expedited_init();
>  }
> 
>  void rcu_scheduler_starting(void)
> diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c
> index 9b4a975..8845936 100644
> --- a/kernel/rcutorture.c
> +++ b/kernel/rcutorture.c
> @@ -257,14 +257,14 @@ struct rcu_torture_ops {
>  	void (*init)(void);
>  	void (*cleanup)(void);
>  	int (*readlock)(void);
> -	void (*readdelay)(struct rcu_random_state *rrsp);
> +	void (*read_delay)(struct rcu_random_state *rrsp);
>  	void (*readunlock)(int idx);
>  	int (*completed)(void);
> -	void (*deferredfree)(struct rcu_torture *p);
> +	void (*deferred_free)(struct rcu_torture *p);
>  	void (*sync)(void);
>  	void (*cb_barrier)(void);
>  	int (*stats)(char *page);
> -	int irqcapable;
> +	int irq_capable;
>  	char *name;
>  };
>  static struct rcu_torture_ops *cur_ops = NULL;
> @@ -320,7 +320,7 @@ rcu_torture_cb(struct rcu_head *p)
>  		rp->rtort_mbtest = 0;
>  		rcu_torture_free(rp);
>  	} else
> -		cur_ops->deferredfree(rp);
> +		cur_ops->deferred_free(rp);
>  }
> 
>  static void rcu_torture_deferred_free(struct rcu_torture *p)
> @@ -329,18 +329,18 @@ static void rcu_torture_deferred_free(struct rcu_torture *p)
>  }
> 
>  static struct rcu_torture_ops rcu_ops = {
> -	.init = NULL,
> -	.cleanup = NULL,
> -	.readlock = rcu_torture_read_lock,
> -	.readdelay = rcu_read_delay,
> -	.readunlock = rcu_torture_read_unlock,
> -	.completed = rcu_torture_completed,
> -	.deferredfree = rcu_torture_deferred_free,
> -	.sync = synchronize_rcu,
> -	.cb_barrier = rcu_barrier,
> -	.stats = NULL,
> -	.irqcapable = 1,
> -	.name = "rcu"
> +	.init		= NULL,
> +	.cleanup	= NULL,
> +	.readlock	= rcu_torture_read_lock,
> +	.read_delay	= rcu_read_delay,
> +	.readunlock	= rcu_torture_read_unlock,
> +	.completed	= rcu_torture_completed,
> +	.deferred_free	= rcu_torture_deferred_free,
> +	.sync		= synchronize_rcu,
> +	.cb_barrier	= rcu_barrier,
> +	.stats		= NULL,
> +	.irq_capable	= 1,
> +	.name		= "rcu"
>  };
> 
>  static void rcu_sync_torture_deferred_free(struct rcu_torture *p)
> @@ -370,18 +370,18 @@ static void rcu_sync_torture_init(void)
>  }
> 
>  static struct rcu_torture_ops rcu_sync_ops = {
> -	.init = rcu_sync_torture_init,
> -	.cleanup = NULL,
> -	.readlock = rcu_torture_read_lock,
> -	.readdelay = rcu_read_delay,
> -	.readunlock = rcu_torture_read_unlock,
> -	.completed = rcu_torture_completed,
> -	.deferredfree = rcu_sync_torture_deferred_free,
> -	.sync = synchronize_rcu,
> -	.cb_barrier = NULL,
> -	.stats = NULL,
> -	.irqcapable = 1,
> -	.name = "rcu_sync"
> +	.init		= rcu_sync_torture_init,
> +	.cleanup	= NULL,
> +	.readlock	= rcu_torture_read_lock,
> +	.read_delay	= rcu_read_delay,
> +	.readunlock	= rcu_torture_read_unlock,
> +	.completed	= rcu_torture_completed,
> +	.deferred_free	= rcu_sync_torture_deferred_free,
> +	.sync		= synchronize_rcu,
> +	.cb_barrier	= NULL,
> +	.stats		= NULL,
> +	.irq_capable	= 1,
> +	.name		= "rcu_sync"
>  };
> 
>  /*
> @@ -432,33 +432,53 @@ static void rcu_bh_torture_synchronize(void)
>  }
> 
>  static struct rcu_torture_ops rcu_bh_ops = {
> -	.init = NULL,
> -	.cleanup = NULL,
> -	.readlock = rcu_bh_torture_read_lock,
> -	.readdelay = rcu_read_delay,	/* just reuse rcu's version. */
> -	.readunlock = rcu_bh_torture_read_unlock,
> -	.completed = rcu_bh_torture_completed,
> -	.deferredfree = rcu_bh_torture_deferred_free,
> -	.sync = rcu_bh_torture_synchronize,
> -	.cb_barrier = rcu_barrier_bh,
> -	.stats = NULL,
> -	.irqcapable = 1,
> -	.name = "rcu_bh"
> +	.init		= NULL,
> +	.cleanup	= NULL,
> +	.readlock	= rcu_bh_torture_read_lock,
> +	.read_delay	= rcu_read_delay,	/* just reuse rcu's version. */
> +	.readunlock	= rcu_bh_torture_read_unlock,
> +	.completed	= rcu_bh_torture_completed,
> +	.deferred_free	= rcu_bh_torture_deferred_free,
> +	.sync		= rcu_bh_torture_synchronize,
> +	.cb_barrier	= rcu_barrier_bh,
> +	.stats		= NULL,
> +	.irq_capable	= 1,
> +	.name		= "rcu_bh"
>  };
> 
>  static struct rcu_torture_ops rcu_bh_sync_ops = {
> -	.init = rcu_sync_torture_init,
> -	.cleanup = NULL,
> -	.readlock = rcu_bh_torture_read_lock,
> -	.readdelay = rcu_read_delay,	/* just reuse rcu's version. */
> -	.readunlock = rcu_bh_torture_read_unlock,
> -	.completed = rcu_bh_torture_completed,
> -	.deferredfree = rcu_sync_torture_deferred_free,
> -	.sync = rcu_bh_torture_synchronize,
> -	.cb_barrier = NULL,
> -	.stats = NULL,
> -	.irqcapable = 1,
> -	.name = "rcu_bh_sync"
> +	.init		= rcu_sync_torture_init,
> +	.cleanup	= NULL,
> +	.readlock	= rcu_bh_torture_read_lock,
> +	.read_delay	= rcu_read_delay,	/* just reuse rcu's version. */
> +	.readunlock	= rcu_bh_torture_read_unlock,
> +	.completed	= rcu_bh_torture_completed,
> +	.deferred_free	= rcu_sync_torture_deferred_free,
> +	.sync		= rcu_bh_torture_synchronize,
> +	.cb_barrier	= NULL,
> +	.stats		= NULL,
> +	.irq_capable	= 1,
> +	.name		= "rcu_bh_sync"
> +};
> +
> +static int rcu_bh_expedited_torture_completed(void)
> +{
> +	return rcu_batches_completed_bh_expedited();
> +}
> +
> +static struct rcu_torture_ops rcu_bh_expedited_ops = {
> +	.init		= rcu_sync_torture_init,
> +	.cleanup	= NULL,
> +	.readlock	= rcu_bh_torture_read_lock,
> +	.read_delay	= rcu_read_delay,	/* just reuse rcu's version. */
> +	.readunlock	= rcu_bh_torture_read_unlock,
> +	.completed	= rcu_bh_expedited_torture_completed,
> +	.deferred_free	= rcu_sync_torture_deferred_free,
> +	.sync		= synchronize_rcu_bh_expedited,
> +	.cb_barrier	= NULL,
> +	.stats		= NULL,
> +	.irq_capable	= 1,
> +	.name		= "rcu_bh_expedited"
>  };
> 
>  /*
> @@ -530,17 +550,17 @@ static int srcu_torture_stats(char *page)
>  }
> 
>  static struct rcu_torture_ops srcu_ops = {
> -	.init = srcu_torture_init,
> -	.cleanup = srcu_torture_cleanup,
> -	.readlock = srcu_torture_read_lock,
> -	.readdelay = srcu_read_delay,
> -	.readunlock = srcu_torture_read_unlock,
> -	.completed = srcu_torture_completed,
> -	.deferredfree = rcu_sync_torture_deferred_free,
> -	.sync = srcu_torture_synchronize,
> -	.cb_barrier = NULL,
> -	.stats = srcu_torture_stats,
> -	.name = "srcu"
> +	.init		= srcu_torture_init,
> +	.cleanup	= srcu_torture_cleanup,
> +	.readlock	= srcu_torture_read_lock,
> +	.read_delay	= srcu_read_delay,
> +	.readunlock	= srcu_torture_read_unlock,
> +	.completed	= srcu_torture_completed,
> +	.deferred_free	= rcu_sync_torture_deferred_free,
> +	.sync		= srcu_torture_synchronize,
> +	.cb_barrier	= NULL,
> +	.stats		= srcu_torture_stats,
> +	.name		= "srcu"
>  };
> 
>  /*
> @@ -574,32 +594,32 @@ static void sched_torture_synchronize(void)
>  }
> 
>  static struct rcu_torture_ops sched_ops = {
> -	.init = rcu_sync_torture_init,
> -	.cleanup = NULL,
> -	.readlock = sched_torture_read_lock,
> -	.readdelay = rcu_read_delay,	/* just reuse rcu's version. */
> -	.readunlock = sched_torture_read_unlock,
> -	.completed = sched_torture_completed,
> -	.deferredfree = rcu_sched_torture_deferred_free,
> -	.sync = sched_torture_synchronize,
> -	.cb_barrier = rcu_barrier_sched,
> -	.stats = NULL,
> -	.irqcapable = 1,
> -	.name = "sched"
> +	.init		= rcu_sync_torture_init,
> +	.cleanup	= NULL,
> +	.readlock	= sched_torture_read_lock,
> +	.read_delay	= rcu_read_delay,	/* just reuse rcu's version. */
> +	.readunlock	= sched_torture_read_unlock,
> +	.completed	= sched_torture_completed,
> +	.deferred_free	= rcu_sched_torture_deferred_free,
> +	.sync		= sched_torture_synchronize,
> +	.cb_barrier	= rcu_barrier_sched,
> +	.stats		= NULL,
> +	.irq_capable	= 1,
> +	.name		= "sched"
>  };
> 
>  static struct rcu_torture_ops sched_ops_sync = {
> -	.init = rcu_sync_torture_init,
> -	.cleanup = NULL,
> -	.readlock = sched_torture_read_lock,
> -	.readdelay = rcu_read_delay,	/* just reuse rcu's version. */
> -	.readunlock = sched_torture_read_unlock,
> -	.completed = sched_torture_completed,
> -	.deferredfree = rcu_sync_torture_deferred_free,
> -	.sync = sched_torture_synchronize,
> -	.cb_barrier = NULL,
> -	.stats = NULL,
> -	.name = "sched_sync"
> +	.init		= rcu_sync_torture_init,
> +	.cleanup	= NULL,
> +	.readlock	= sched_torture_read_lock,
> +	.read_delay	= rcu_read_delay,	/* just reuse rcu's version. */
> +	.readunlock	= sched_torture_read_unlock,
> +	.completed	= sched_torture_completed,
> +	.deferred_free	= rcu_sync_torture_deferred_free,
> +	.sync		= sched_torture_synchronize,
> +	.cb_barrier	= NULL,
> +	.stats		= NULL,
> +	.name		= "sched_sync"
>  };
> 
>  /*
> @@ -635,7 +655,7 @@ rcu_torture_writer(void *arg)
>  			i = RCU_TORTURE_PIPE_LEN;
>  		atomic_inc(&rcu_torture_wcount[i]);
>  		old_rp->rtort_pipe_count++;
> -		cur_ops->deferredfree(old_rp);
> +		cur_ops->deferred_free(old_rp);
>  	}
>  	rcu_torture_current_version++;
>  	oldbatch = cur_ops->completed();
> @@ -700,7 +720,7 @@ static void rcu_torture_timer(unsigned long unused)
>  	if (p->rtort_mbtest == 0)
>  		atomic_inc(&n_rcu_torture_mberror);
>  	spin_lock(&rand_lock);
> -	cur_ops->readdelay(&rand);
> +	cur_ops->read_delay(&rand);
>  	n_rcu_torture_timers++;
>  	spin_unlock(&rand_lock);
>  	preempt_disable();
> @@ -738,11 +758,11 @@ rcu_torture_reader(void *arg)
> 
>  	VERBOSE_PRINTK_STRING("rcu_torture_reader task started");
>  	set_user_nice(current, 19);
> -	if (irqreader && cur_ops->irqcapable)
> +	if (irqreader && cur_ops->irq_capable)
>  		setup_timer_on_stack(&t, rcu_torture_timer, 0);
> 
>  	do {
> -		if (irqreader && cur_ops->irqcapable) {
> +		if (irqreader && cur_ops->irq_capable) {
>  			if (!timer_pending(&t))
>  				mod_timer(&t, 1);
>  		}
> @@ -757,7 +777,7 @@ rcu_torture_reader(void *arg)
>  		}
>  		if (p->rtort_mbtest == 0)
>  			atomic_inc(&n_rcu_torture_mberror);
> -		cur_ops->readdelay(&rand);
> +		cur_ops->read_delay(&rand);
>  		preempt_disable();
>  		pipe_count = p->rtort_pipe_count;
>  		if (pipe_count > RCU_TORTURE_PIPE_LEN) {
> @@ -778,7 +798,7 @@ rcu_torture_reader(void *arg)
>  	} while (!kthread_should_stop() && fullstop == FULLSTOP_DONTSTOP);
>  	VERBOSE_PRINTK_STRING("rcu_torture_reader task stopping");
>  	rcutorture_shutdown_absorb("rcu_torture_reader");
> -	if (irqreader && cur_ops->irqcapable)
> +	if (irqreader && cur_ops->irq_capable)
>  		del_timer_sync(&t);
>  	while (!kthread_should_stop())
>  		schedule_timeout_uninterruptible(1);
> @@ -1078,6 +1098,7 @@ rcu_torture_init(void)
>  	int firsterr = 0;
>  	static struct rcu_torture_ops *torture_ops[] =
>  		{ &rcu_ops, &rcu_sync_ops, &rcu_bh_ops, &rcu_bh_sync_ops,
> +		  &rcu_bh_expedited_ops,
>  		  &srcu_ops, &sched_ops, &sched_ops_sync, };
> 
>  	mutex_lock(&fullstop_mutex);
> diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> index d2a372f..bf2c21d 100644
> --- a/kernel/rcutree.c
> +++ b/kernel/rcutree.c
> @@ -89,6 +89,7 @@ void rcu_qsctr_inc(int cpu)
>  	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
>  	rdp->passed_quiesc = 1;
>  	rdp->passed_quiesc_completed = rdp->completed;
> +	synchronize_rcu_expedited_qs(cpu);
>  }
> 
>  void rcu_bh_qsctr_inc(int cpu)

-- 
Mathieu Desnoyers
OpenPGP key fingerprint: 8CD5 52C3 8E3C 4140 715F BA06 3F25 A8FE 3BAE 9A68