In percpu_ref_call_confirm_rcu(), wake_up_all() is called before percpu_ref_put(), which causes the value of the percpu_ref to be unstable when percpu_ref_switch_to_atomic_sync() returns.

	CPU0					CPU1

	percpu_ref_switch_to_atomic_sync(&ref)
	--> percpu_ref_switch_to_atomic(&ref)
	    --> percpu_ref_get(ref);	/* put after confirmation */
		call_rcu(&ref->data->rcu, percpu_ref_switch_to_atomic_rcu);

						percpu_ref_switch_to_atomic_rcu
						--> percpu_ref_call_confirm_rcu
						    --> data->confirm_switch = NULL;
							wake_up_all(&percpu_ref_switch_waitq);

	    /* here waiting to wake up */
	    wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch);
							(A)percpu_ref_put(ref);
	/* The value of &ref is unstable! */
	percpu_ref_is_zero(&ref)
							(B)percpu_ref_put(ref);

As shown above, assuming that the per-cpu counts add up to 0 before percpu_ref_switch_to_atomic_sync() is called, we expect percpu_ref_is_zero() to return true once the ref has been switched to atomic mode. In practice it returns different values depending on whether percpu_ref_put() runs at point (A) or at point (B), which is not what we expect.

There are currently two users of percpu_ref_switch_to_atomic_sync() in the kernel:

	i.  mddev->writes_pending in drivers/md/md.c
	ii. q->q_usage_counter in block/blk-pm.c

Both of them use it as shown above. In the worst case, percpu_ref_is_zero() may never return true because case (B) occurs every time. While this is unlikely to happen in a production environment, it is still a problem.

This patch moves percpu_ref_put() out of the RCU handler and calls it after wait_event(), which makes the ref stable by the time percpu_ref_switch_to_atomic_sync() returns. In the example above, percpu_ref_is_zero() then sees a steady value of 0, which is what we expect.
Signed-off-by: Qi Zheng <zhengqi.arch@xxxxxxxxxxxxx> --- include/linux/percpu-refcount.h | 4 ++- lib/percpu-refcount.c | 56 +++++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/include/linux/percpu-refcount.h b/include/linux/percpu-refcount.h index d73a1c08c3e3..75844939a965 100644 --- a/include/linux/percpu-refcount.h +++ b/include/linux/percpu-refcount.h @@ -98,6 +98,7 @@ struct percpu_ref_data { percpu_ref_func_t *confirm_switch; bool force_atomic:1; bool allow_reinit:1; + bool sync:1; struct rcu_head rcu; struct percpu_ref *ref; }; @@ -123,7 +124,8 @@ int __must_check percpu_ref_init(struct percpu_ref *ref, gfp_t gfp); void percpu_ref_exit(struct percpu_ref *ref); void percpu_ref_switch_to_atomic(struct percpu_ref *ref, - percpu_ref_func_t *confirm_switch); + percpu_ref_func_t *confirm_switch, + bool sync); void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref); void percpu_ref_switch_to_percpu(struct percpu_ref *ref); void percpu_ref_kill_and_confirm(struct percpu_ref *ref, diff --git a/lib/percpu-refcount.c b/lib/percpu-refcount.c index af9302141bcf..3a8906715e09 100644 --- a/lib/percpu-refcount.c +++ b/lib/percpu-refcount.c @@ -99,6 +99,7 @@ int percpu_ref_init(struct percpu_ref *ref, percpu_ref_func_t *release, data->release = release; data->confirm_switch = NULL; data->ref = ref; + data->sync = false; ref->data = data; return 0; } @@ -146,21 +147,33 @@ void percpu_ref_exit(struct percpu_ref *ref) } EXPORT_SYMBOL_GPL(percpu_ref_exit); +static inline void percpu_ref_switch_to_atomic_post(struct percpu_ref *ref) +{ + struct percpu_ref_data *data = ref->data; + + if (!data->allow_reinit) + __percpu_ref_exit(ref); + + /* drop ref from percpu_ref_switch_to_atomic() */ + percpu_ref_put(ref); +} + static void percpu_ref_call_confirm_rcu(struct rcu_head *rcu) { struct percpu_ref_data *data = container_of(rcu, struct percpu_ref_data, rcu); struct percpu_ref *ref = data->ref; + bool need_put = true; + + if 
(data->sync) + need_put = data->sync = false; data->confirm_switch(ref); data->confirm_switch = NULL; wake_up_all(&percpu_ref_switch_waitq); - if (!data->allow_reinit) - __percpu_ref_exit(ref); - - /* drop ref from percpu_ref_switch_to_atomic() */ - percpu_ref_put(ref); + if (need_put) + percpu_ref_switch_to_atomic_post(ref); } static void percpu_ref_switch_to_atomic_rcu(struct rcu_head *rcu) @@ -210,14 +223,19 @@ static void percpu_ref_noop_confirm_switch(struct percpu_ref *ref) } static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref, - percpu_ref_func_t *confirm_switch) + percpu_ref_func_t *confirm_switch, + bool sync) { if (ref->percpu_count_ptr & __PERCPU_REF_ATOMIC) { if (confirm_switch) confirm_switch(ref); + if (sync) + percpu_ref_get(ref); return; } + ref->data->sync = sync; + /* switching from percpu to atomic */ ref->percpu_count_ptr |= __PERCPU_REF_ATOMIC; @@ -232,13 +250,16 @@ static void __percpu_ref_switch_to_atomic(struct percpu_ref *ref, call_rcu(&ref->data->rcu, percpu_ref_switch_to_atomic_rcu); } -static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref) +static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref, bool sync) { unsigned long __percpu *percpu_count = percpu_count_ptr(ref); int cpu; BUG_ON(!percpu_count); + if (sync) + percpu_ref_get(ref); + if (!(ref->percpu_count_ptr & __PERCPU_REF_ATOMIC)) return; @@ -261,7 +282,8 @@ static void __percpu_ref_switch_to_percpu(struct percpu_ref *ref) } static void __percpu_ref_switch_mode(struct percpu_ref *ref, - percpu_ref_func_t *confirm_switch) + percpu_ref_func_t *confirm_switch, + bool sync) { struct percpu_ref_data *data = ref->data; @@ -276,9 +298,9 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref, percpu_ref_switch_lock); if (data->force_atomic || percpu_ref_is_dying(ref)) - __percpu_ref_switch_to_atomic(ref, confirm_switch); + __percpu_ref_switch_to_atomic(ref, confirm_switch, sync); else - __percpu_ref_switch_to_percpu(ref); + 
__percpu_ref_switch_to_percpu(ref, sync); } /** @@ -302,14 +324,15 @@ static void __percpu_ref_switch_mode(struct percpu_ref *ref, * switching to atomic mode, this function can be called from any context. */ void percpu_ref_switch_to_atomic(struct percpu_ref *ref, - percpu_ref_func_t *confirm_switch) + percpu_ref_func_t *confirm_switch, + bool sync) { unsigned long flags; spin_lock_irqsave(&percpu_ref_switch_lock, flags); ref->data->force_atomic = true; - __percpu_ref_switch_mode(ref, confirm_switch); + __percpu_ref_switch_mode(ref, confirm_switch, sync); spin_unlock_irqrestore(&percpu_ref_switch_lock, flags); } @@ -325,8 +348,9 @@ EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic); */ void percpu_ref_switch_to_atomic_sync(struct percpu_ref *ref) { - percpu_ref_switch_to_atomic(ref, NULL); + percpu_ref_switch_to_atomic(ref, NULL, true); wait_event(percpu_ref_switch_waitq, !ref->data->confirm_switch); + percpu_ref_switch_to_atomic_post(ref); } EXPORT_SYMBOL_GPL(percpu_ref_switch_to_atomic_sync); @@ -355,7 +379,7 @@ void percpu_ref_switch_to_percpu(struct percpu_ref *ref) spin_lock_irqsave(&percpu_ref_switch_lock, flags); ref->data->force_atomic = false; - __percpu_ref_switch_mode(ref, NULL); + __percpu_ref_switch_mode(ref, NULL, false); spin_unlock_irqrestore(&percpu_ref_switch_lock, flags); } @@ -390,7 +414,7 @@ void percpu_ref_kill_and_confirm(struct percpu_ref *ref, ref->data->release); ref->percpu_count_ptr |= __PERCPU_REF_DEAD; - __percpu_ref_switch_mode(ref, confirm_kill); + __percpu_ref_switch_mode(ref, confirm_kill, false); percpu_ref_put(ref); spin_unlock_irqrestore(&percpu_ref_switch_lock, flags); @@ -470,7 +494,7 @@ void percpu_ref_resurrect(struct percpu_ref *ref) ref->percpu_count_ptr &= ~__PERCPU_REF_DEAD; percpu_ref_get(ref); - __percpu_ref_switch_mode(ref, NULL); + __percpu_ref_switch_mode(ref, NULL, false); spin_unlock_irqrestore(&percpu_ref_switch_lock, flags); } -- 2.20.1