[PATCH RFC -next 3/3] sbitmap: improve the fairness of waitqueues' wake up

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Currently, same waitqueue might be woken up continuously:

__sbq_wake_up			__sbq_wake_up
 sbq_wake_ptr -> assume	0	 sbq_wake_ptr -> 0
 atomic_dec_return
				 atomic_dec_return
 atomic_cmpxchg -> succeed
				 atomic_cmpxchg -> failed
				  return true
				__sbq_wake_up
				 sbq_wake_ptr
				  atomic_read(&sbq->wake_index) -> 0
 sbq_index_atomic_inc -> inc to 1
				  if (waitqueue_active(&ws->wait))
				   if (wake_index != atomic_read(&sbq->wake_index))
				    atomic_set(&sbq->wake_index, wake_index); -> reset from 1 to 0
 wake_up_nr -> wake up first waitqueue
				    // continue to wake up in first waitqueue

To fix the problem, add a detection in sbq_wake_ptr() to avoid choose
the same waitqueue; and refactor __sbq_wake_up() to increase
'wake_index' before updating 'wait_cnt'.

Signed-off-by: Yu Kuai <yukuai3@xxxxxxxxxx>
---
 lib/sbitmap.c | 50 ++++++++++++++++++++++++++------------------------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/lib/sbitmap.c b/lib/sbitmap.c
index bde0783e4ace..86b18eed83aa 100644
--- a/lib/sbitmap.c
+++ b/lib/sbitmap.c
@@ -583,6 +583,10 @@ static struct sbq_wait_state *sbq_wake_ptr(struct sbitmap_queue *sbq)
 		return NULL;
 
 	wake_index = atomic_read(&sbq->wake_index);
+
+	/* If this waitqueue is about to wake up, switch to the next */
+	if (atomic_read(&sbq->ws[wake_index].wait_cnt) <= 0)
+		wake_index = sbq_index_inc(wake_index);
 	for (i = 0; i < SBQ_WAIT_QUEUES; i++) {
 		struct sbq_wait_state *ws = &sbq->ws[wake_index];
 
@@ -609,33 +613,31 @@ static bool __sbq_wake_up(struct sbitmap_queue *sbq)
 		return false;
 
 	wait_cnt = atomic_dec_return(&ws->wait_cnt);
-	if (wait_cnt <= 0) {
-		int ret;
-
-		wake_batch = READ_ONCE(sbq->wake_batch);
-
-		/*
-		 * Pairs with the memory barrier in sbitmap_queue_resize() to
-		 * ensure that we see the batch size update before the wait
-		 * count is reset.
-		 */
-		smp_mb__before_atomic();
+	if (wait_cnt > 0)
+		return false;
+	/*
+	 * Concurrent callers should call this function again
+	 * to wakeup a new batch on a different 'ws'.
+	 */
+	else if (wait_cnt < 0)
+		return true;
 
-		/*
-		 * For concurrent callers of this, the one that failed the
-		 * atomic_cmpxhcg() race should call this function again
-		 * to wakeup a new batch on a different 'ws'.
-		 */
-		ret = atomic_cmpxchg(&ws->wait_cnt, wait_cnt, wake_batch);
-		if (ret == wait_cnt) {
-			sbq_index_atomic_inc(&sbq->wake_index);
-			wake_up_nr(&ws->wait, wake_batch);
-			return false;
-		}
+	/*
+	 * Increase 'wake_index' before updating 'wake_batch', in case that
+	 * concurrent callers wake up the same 'ws' again.
+	 */
+	sbq_index_atomic_inc(&sbq->wake_index);
+	wake_batch = READ_ONCE(sbq->wake_batch);
 
-		return true;
-	}
+	/*
+	 * Pairs with the memory barrier in sbitmap_queue_resize() to
+	 * ensure that we see the batch size update before the wait
+	 * count is reset.
+	 */
+	smp_mb__before_atomic();
 
+	atomic_set(&ws->wait_cnt, wake_batch);
+	wake_up_nr(&ws->wait, wake_batch);
 	return false;
 }
 
-- 
2.31.1




[Index of Archives]     [Linux RAID]     [Linux SCSI]     [Linux ATA RAID]     [IDE]     [Linux Wireless]     [Linux Kernel]     [ATH6KL]     [Linux Bluetooth]     [Linux Netdev]     [Kernel Newbies]     [Security]     [Git]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Device Mapper]

  Powered by Linux