On Wed, Sep 25, 2013 at 08:40:15PM +0200, Peter Zijlstra wrote: > On Wed, Sep 25, 2013 at 07:50:55PM +0200, Oleg Nesterov wrote: > > No. Too tired too ;) damn LSB test failures... > > > ok; I cobbled this together.. I might think better of it tomorrow, but > for now I think I closed the hole before wait_event(readers_active()) > you pointed out -- of course I might have created new holes :/ > > For easy reading the + only version. A couple of nits and some commentary, but if there are races, they are quite subtle. ;-) Thanx, Paul > --- > +++ b/include/linux/cpu.h > @@ -16,6 +16,7 @@ > #include <linux/node.h> > #include <linux/compiler.h> > #include <linux/cpumask.h> > +#include <linux/percpu.h> > > struct device; > > @@ -173,10 +174,50 @@ extern struct bus_type cpu_subsys; > #ifdef CONFIG_HOTPLUG_CPU > /* Stop CPUs going up and down. */ > > +extern void cpu_hotplug_init_task(struct task_struct *p); > + > extern void cpu_hotplug_begin(void); > extern void cpu_hotplug_done(void); > + > +extern int __cpuhp_writer; > +DECLARE_PER_CPU(unsigned int, __cpuhp_refcount); > + > +extern void __get_online_cpus(void); > + > +static inline void get_online_cpus(void) > +{ > + might_sleep(); > + > + /* Support reader-in-reader recursion */ > + if (current->cpuhp_ref++) { > + barrier(); Oleg was right, this barrier() can go. The value was >=1 and remains >=1, so reordering causes no harm. (See below.) > + return; > + } > + > + preempt_disable(); > + if (likely(!__cpuhp_writer)) > + __this_cpu_inc(__cpuhp_refcount); The barrier required here is provided by synchronize_sched(), and all the code is contained by the barrier()s in preempt_disable() and preempt_enable(). > + else > + __get_online_cpus(); And a memory barrier is unconditionally executed by __get_online_cpus(). > + preempt_enable(); The barrier() in preempt_enable() prevents the compiler from bleeding the critical section out. 
> +} > + > +extern void __put_online_cpus(void); > + > +static inline void put_online_cpus(void) > +{ > + barrier(); This barrier() can also be dispensed with. > + if (--current->cpuhp_ref) If we leave here, the value was >=1 and remains >=1, so reordering does no harm. > + return; > + > + preempt_disable(); The barrier() in preempt_disable() prevents the compiler from bleeding the critical section out. > + if (likely(!__cpuhp_writer)) > + __this_cpu_dec(__cpuhp_refcount); The barrier here is supplied by synchronize_sched(). > + else > + __put_online_cpus(); And a memory barrier is unconditionally executed by __put_online_cpus(). > + preempt_enable(); > +} > + > extern void cpu_hotplug_disable(void); > extern void cpu_hotplug_enable(void); > #define hotcpu_notifier(fn, pri) cpu_notifier(fn, pri) > @@ -200,6 +241,8 @@ static inline void cpu_hotplug_driver_un > > #else /* CONFIG_HOTPLUG_CPU */ > > +static inline void cpu_hotplug_init_task(struct task_struct *p) {} > + > static inline void cpu_hotplug_begin(void) {} > static inline void cpu_hotplug_done(void) {} > #define get_online_cpus() do { } while (0) > +++ b/include/linux/sched.h > @@ -1454,6 +1454,9 @@ struct task_struct { > unsigned int sequential_io; > unsigned int sequential_io_avg; > #endif > +#ifdef CONFIG_HOTPLUG_CPU > + int cpuhp_ref; > +#endif > }; > > /* Future-safe accessor for struct task_struct's cpus_allowed. 
*/ > +++ b/kernel/cpu.c > @@ -49,88 +49,140 @@ static int cpu_hotplug_disabled; > > #ifdef CONFIG_HOTPLUG_CPU > > +enum { readers_fast = 0, readers_slow, readers_block }; > + > +int __cpuhp_writer; > +EXPORT_SYMBOL_GPL(__cpuhp_writer); > + > +DEFINE_PER_CPU(unsigned int, __cpuhp_refcount); > +EXPORT_PER_CPU_SYMBOL_GPL(__cpuhp_refcount); > + > +static DEFINE_PER_CPU(unsigned int, cpuhp_seq); > +static atomic_t cpuhp_waitcount; > +static DECLARE_WAIT_QUEUE_HEAD(cpuhp_readers); > +static DECLARE_WAIT_QUEUE_HEAD(cpuhp_writer); > + > +void cpu_hotplug_init_task(struct task_struct *p) > +{ > + p->cpuhp_ref = 0; > +} > > +void __get_online_cpus(void) > { > +again: > + /* See __srcu_read_lock() */ > + __this_cpu_inc(__cpuhp_refcount); > + smp_mb(); /* A matches B, E */ > + __this_cpu_inc(cpuhp_seq); > + > + if (unlikely(__cpuhp_writer == readers_block)) { > + __put_online_cpus(); Suppose we got delayed here for some time. The writer might complete, and be awakened by the blocked readers (we have not incremented our counter yet). We would then drop through, do the atomic_dec_and_test() and deliver a spurious wake_up_all() at some random time in the future. Which should be OK because __wait_event() looks to handle spurious wake_up()s. > + atomic_inc(&cpuhp_waitcount); > + > + /* > + * We either call schedule() in the wait, or we'll fall through > + * and reschedule on the preempt_enable() in get_online_cpus(). > + */ > + preempt_enable_no_resched(); > + __wait_event(cpuhp_readers, __cpuhp_writer != readers_block); > + preempt_disable(); > > + if (atomic_dec_and_test(&cpuhp_waitcount)) > + wake_up_all(&cpuhp_writer); There can be only one writer, so why the wake_up_all()? 
> + > + goto again; > + } > +} > +EXPORT_SYMBOL_GPL(__get_online_cpus); > + > +void __put_online_cpus(void) > +{ > + /* See __srcu_read_unlock() */ > + smp_mb(); /* C matches D */ In other words, if they see our decrement (presumably to aggregate zero, as that is the only time it matters) they will also see our critical section. > + this_cpu_dec(__cpuhp_refcount); > + > + /* Prod writer to recheck readers_active */ > + wake_up_all(&cpuhp_writer); > } > +EXPORT_SYMBOL_GPL(__put_online_cpus); > > +#define per_cpu_sum(var) \ > +({ \ > + typeof(var) __sum = 0; \ > + int cpu; \ > + for_each_possible_cpu(cpu) \ > + __sum += per_cpu(var, cpu); \ > + __sum; \ > +}) > + > +/* > + * See srcu_readers_active_idx_check() > + */ > +static bool cpuhp_readers_active_check(void) > { > + unsigned int seq = per_cpu_sum(cpuhp_seq); > + > + smp_mb(); /* B matches A */ In other words, if we see __get_online_cpus() cpuhp_seq increment, we are guaranteed to also see its __cpuhp_refcount increment. > + if (per_cpu_sum(__cpuhp_refcount) != 0) > + return false; > > + smp_mb(); /* D matches C */ > > + return per_cpu_sum(cpuhp_seq) == seq; On equality, we know that there could not be any "sneak path" pairs where we see a decrement but not the corresponding increment for a given reader. If we saw its decrement, the memory barriers guarantee that we now see its cpuhp_seq increment. > } > > /* > * This ensures that the hotplug operation can begin only when the > * refcount goes to zero. > * > * Since cpu_hotplug_begin() is always called after invoking > * cpu_maps_update_begin(), we can be sure that only one writer is active. 
> */ > void cpu_hotplug_begin(void) > { > + unsigned int count = 0; > + int cpu; > > + lockdep_assert_held(&cpu_add_remove_lock); > + > + /* allow reader-in-writer recursion */ > + current->cpuhp_ref++; > + > + /* make readers take the slow path */ > + __cpuhp_writer = readers_slow; > + > + /* See percpu_down_write() */ > + synchronize_sched(); At this point, we know that all readers take the slow path. > + /* make readers block */ > + __cpuhp_writer = readers_block; > + > + smp_mb(); /* E matches A */ If they don't see our write of readers_block to __cpuhp_writer, then we are guaranteed to see their __cpuhp_refcount increment, and therefore will wait for them. > + /* Wait for all readers to go away */ > + wait_event(cpuhp_writer, cpuhp_readers_active_check()); > } > > void cpu_hotplug_done(void) > { > + /* Signal the writer is done, no fast path yet */ > + __cpuhp_writer = readers_slow; > + wake_up_all(&cpuhp_readers); OK, the wait_event()/wake_up_all() prevents the races where the readers are delayed between fetching __cpuhp_writer and blocking. > + /* See percpu_up_write() */ > + synchronize_sched(); At this point, readers no longer attempt to block. You avoid falling into the usual acquire-release-mismatch trap by using __cpuhp_refcount on both the fastpath and the slowpath, so that it is OK to acquire on the fastpath and release on the slowpath (and vice versa). > + /* Let 'em rip */ > + __cpuhp_writer = readers_fast; > + current->cpuhp_ref--; > + > + /* > + * Wait for any pending readers to be running. This ensures readers > + * after writer and avoids writers starving readers. 
> + */ > + wait_event(cpuhp_writer, !atomic_read(&cpuhp_waitcount)); > } > > /* > +++ b/kernel/sched/core.c > @@ -1736,6 +1736,8 @@ static void __sched_fork(unsigned long c > INIT_LIST_HEAD(&p->numa_entry); > p->numa_group = NULL; > #endif /* CONFIG_NUMA_BALANCING */ > + > + cpu_hotplug_init_task(p); > } > > #ifdef CONFIG_NUMA_BALANCING > -- To unsubscribe, send a message with 'unsubscribe linux-mm' in the body to majordomo@xxxxxxxxx. For more info on Linux MM, see: http://www.linux-mm.org/ . Don't email: <a href=mailto:"dont@xxxxxxxxx"> email@xxxxxxxxx </a>