Hi,
On 2022/09/07 5:27, Keith Busch wrote:
On Wed, Aug 03, 2022 at 08:15:04PM +0800, Yu Kuai wrote:
wait_cnt = atomic_dec_return(&ws->wait_cnt);
- if (wait_cnt <= 0) {
- int ret;
+ /*
+ * For concurrent callers of this, callers should call this function
+ * again to wakeup a new batch on a different 'ws'.
+ */
+ if (wait_cnt < 0 || !waitqueue_active(&ws->wait))
+ return true;
If wait_cnt is 0 but waitqueue_active() happens to be false due to racing
with add_wait_queue(), this returns true, so the caller will retry. The next
atomic_dec will then set the current wait state's wait_cnt below 0, which also
forces an early return of true. When does the wakeup happen, and when do
wait_cnt and wake_index get updated in that case?
If the waitqueue becomes empty, concurrent callers can move on, because
sbq_wake_ptr() only chooses an active waitqueue:
__sbq_wake_up
 sbq_wake_ptr
  for (i = 0; i < SBQ_WAIT_QUEUES; i++)
   if (waitqueue_active(&ws->wait)) -> only choose the active waitqueue
If the waitqueue is not empty, the behavior is the same with or without this
patch: concurrent callers have to wait for the one that wins the race:
Before:
 __sbq_wake_up
  atomic_cmpxchg -> win the race
  sbq_index_atomic_inc -> concurrent callers can go on
After:
 __sbq_wake_up
  wake_up_nr -> concurrent callers can go on if the waitqueue becomes empty
  atomic_dec_return -> return 0
  sbq_index_atomic_inc
Thanks,
Kuai
- wake_batch = READ_ONCE(sbq->wake_batch);
+ if (wait_cnt > 0)
+ return false;
- /*
- * Pairs with the memory barrier in sbitmap_queue_resize() to
- * ensure that we see the batch size update before the wait
- * count is reset.
- */
- smp_mb__before_atomic();
+ wake_batch = READ_ONCE(sbq->wake_batch);
- /*
- * For concurrent callers of this, the one that failed the
- * atomic_cmpxhcg() race should call this function again
- * to wakeup a new batch on a different 'ws'.
- */
- ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
- if (ret == wait_cnt) {
- sbq_index_atomic_inc(&sbq->wake_index);
- wake_up_nr(&ws->wait, wake_batch);
- return false;
- }
+ /*
+ * Wake up first in case that concurrent callers decrease wait_cnt
+ * while waitqueue is empty.
+ */
+ wake_up_nr(&ws->wait, wake_batch);
- return true;
- }
+ /*
+ * Pairs with the memory barrier in sbitmap_queue_resize() to
+ * ensure that we see the batch size update before the wait
+ * count is reset.
+ *
+ * Also pairs with the implicit barrier between decrementing wait_cnt
+ * and checking for waitqueue_active() to make sure waitqueue_active()
+ * sees result of the wakeup if atomic_dec_return() has seen the result
+ * of atomic_set().
+ */
+ smp_mb__before_atomic();
+
+ /*
+ * Increase wake_index before updating wait_cnt, otherwise concurrent
+ * callers can see valid wait_cnt in old waitqueue, which can cause
+ * invalid wakeup on the old waitqueue.
+ */
+ sbq_index_atomic_inc(&sbq->wake_index);
+ atomic_set(&ws->wait_cnt, wake_batch);
return false;
}
--
2.31.1
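For reference, the "waitqueue becomes empty" case from the reply can be replayed with a small userspace model. This is only a sketch under loud assumptions, not the kernel code: it is single-threaded, it does not model memory barriers, a plain 'sleepers' counter stands in for the wait queue, MODEL_WAIT_QUEUES is set to 8 here, and every model_* name is hypothetical. It replays one interleaving by hand: caller B picks ws[0], caller A drains ws[0] and resets its wait_cnt, then B resumes on the stale 'ws', gets true, and retries.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical userspace model; all model_* names are illustrative only. */
#define MODEL_WAIT_QUEUES 8

struct model_ws {
	atomic_int wait_cnt;
	int sleepers;	/* stand-in for the wait queue: > 0 means "active" */
};

struct model_sbq {
	struct model_ws ws[MODEL_WAIT_QUEUES];
	atomic_int wake_index;
	int wake_batch;
};

/* Like sbq_wake_ptr(): starting at wake_index, pick the first active queue. */
static struct model_ws *model_wake_ptr(struct model_sbq *sbq)
{
	int idx = atomic_load(&sbq->wake_index);

	for (int i = 0; i < MODEL_WAIT_QUEUES; i++) {
		if (sbq->ws[idx].sleepers > 0) {
			atomic_store(&sbq->wake_index, idx);
			return &sbq->ws[idx];
		}
		idx = (idx + 1) % MODEL_WAIT_QUEUES;
	}
	return NULL;
}

/* The patched body of __sbq_wake_up(), run on an already chosen 'ws'. */
static bool model_wake_on(struct model_sbq *sbq, struct model_ws *ws)
{
	int wait_cnt = atomic_fetch_sub(&ws->wait_cnt, 1) - 1;
	int nr;

	if (wait_cnt < 0 || ws->sleepers == 0)
		return true;	/* caller retries, possibly on another 'ws' */
	if (wait_cnt > 0)
		return false;

	/* wake_up_nr(): wake at most wake_batch sleepers */
	nr = ws->sleepers < sbq->wake_batch ? ws->sleepers : sbq->wake_batch;
	ws->sleepers -= nr;

	/* advance wake_index first, then reset wait_cnt, as in the patch */
	atomic_store(&sbq->wake_index,
		     (atomic_load(&sbq->wake_index) + 1) % MODEL_WAIT_QUEUES);
	atomic_store(&ws->wait_cnt, sbq->wake_batch);
	return false;
}

/*
 * Replay one interleaving: B picks ws[0], A drains it, B resumes.
 * Returns the index of the queue B ends up on after its retry.
 */
int model_replay(void)
{
	struct model_sbq sbq;
	struct model_ws *a_ws, *b_ws;
	bool a_retry, b_retry;

	sbq.wake_batch = 2;
	atomic_init(&sbq.wake_index, 0);
	for (int i = 0; i < MODEL_WAIT_QUEUES; i++) {
		atomic_init(&sbq.ws[i].wait_cnt, sbq.wake_batch);
		sbq.ws[i].sleepers = 0;
	}
	sbq.ws[0].sleepers = 2;			/* two waiters on ws[0] */
	atomic_store(&sbq.ws[0].wait_cnt, 1);	/* one decrement left */
	sbq.ws[1].sleepers = 1;			/* one waiter on ws[1] */

	b_ws = model_wake_ptr(&sbq);		/* B picks ws[0], then stalls */
	a_ws = model_wake_ptr(&sbq);		/* A picks ws[0] as well */
	a_retry = model_wake_on(&sbq, a_ws);	/* A hits 0 and drains ws[0] */
	b_retry = model_wake_on(&sbq, b_ws);	/* B sees an empty waitqueue */
	assert(!a_retry && b_retry);

	b_ws = model_wake_ptr(&sbq);		/* B retries from wake_index */
	assert(b_ws != NULL);
	b_retry = model_wake_on(&sbq, b_ws);
	assert(!b_retry);
	return (int)(b_ws - sbq.ws);
}
```

In this model, model_replay() returns 1: B's retry lands on ws[1], since the scan skips the now-empty ws[0]. The model also shows that B's stale decrement leaves ws[0].wait_cnt at 1 rather than 2, which a real concurrent test would need to account for.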