Re: [PATCH 2/2 v2] sched/wait: Introduce lock breaker in wake_up_page_bit

On Fri, Aug 25, 2017 at 3:51 PM, Linus Torvalds
<torvalds@xxxxxxxxxxxxxxxxxxxx> wrote:
>
> So take it as that: example pseudo-code that happens to pass a
> compiler, but is meant as an RFD rather than actually working.

Oh, and after I sent it out, I took another look and realized that
the "remove_myself_from()" function is entirely broken.

The caller has already removed the page entry; we don't want to remove
it again, we want to add a *new* one with us removed from it.

So here's an updated 2/2 patch with that fixed.

Let this be a lesson in just *how* little tested, and *how* crap, that
patch probably still is.

                 Linus
From 3f3355eab709e6fa418466e5487a30eb6ec80423 Mon Sep 17 00:00:00 2001
From: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
Date: Fri, 25 Aug 2017 16:01:36 -0700
Subject: [PATCH 2/2] Re-implement the page bit-wait code

The page wait-queues have some horrible scaling issues, and they seem to
be hard to fix.  And the way they use the regular wait-queues makes it
really bad, with interrupts disabled for long periods of time, etc.

This tries to re-implement them with a totally different model.  It's
known to be broken, and the add_page_wait_queue() thing that the
cachefiles code wants to use is not implemented at all (it will probably
just need a parallel set of wait-queues that are *only* used for that).

The code is untested and probably horribly buggy, but I'm hoping others
will take a look at this.

Not-signed-off-yet-by: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx>
---
 mm/page_wait_bit.c | 335 +++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 235 insertions(+), 100 deletions(-)

diff --git a/mm/page_wait_bit.c b/mm/page_wait_bit.c
index 7550b6d2715a..968bc9b1cf21 100644
--- a/mm/page_wait_bit.c
+++ b/mm/page_wait_bit.c
@@ -9,9 +9,38 @@
 #include <linux/wait.h>
 #include <linux/export.h>
 #include <linux/sched/signal.h>
+#include <linux/list_bl.h>
 
 #include "internal.h"
 
+/*
+ * Each waiter on a page will register this
+ * 'page_wait_struct' as part of waiting.
+ *
+ * Note that for any particular page, only one of the
+ * structs will actually be visible at the head of the
+ * wait queue hash table at any particular time, but
+ * everybody has one, because as one waiter is woken
+ * up we will need to pick a new head entry for the
+ * remaining waiters.
+ *
+ * NOTE! All the list operations are protected by the
+ * hlist_bl_lock on the hash table.
+ */
+struct page_wait_struct {
+	// This is the hash queue head entry
+	// only used once per { page, bit }
+	struct hlist_bl_node list;
+	struct page *page;
+	int bit;
+
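+	// Chains of non-exclusive ('all') and exclusive waiters;
+	// only meaningful in the entry that is currently at the
+	// head of the hash list for this { page, bit }.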
+	struct page_wait_struct *all;
+	struct page_wait_struct *exclusive;
+
+	// This is the waiter list
+	struct page_wait_struct *next;
+	struct task_struct *wake;
+};
+
 /*
  * In order to wait for pages to become available there must be
  * waitqueues associated with pages. By using a hash table of
@@ -22,11 +51,11 @@
  * at a cost of "thundering herd" phenomena during rare hash
  * collisions.
  */
-#define PAGE_WAIT_TABLE_BITS 8
+#define PAGE_WAIT_TABLE_BITS 12
 #define PAGE_WAIT_TABLE_SIZE (1 << PAGE_WAIT_TABLE_BITS)
-static wait_queue_head_t page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
+static struct hlist_bl_head page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
 
-static wait_queue_head_t *page_waitqueue(struct page *page)
+static struct hlist_bl_head *page_waitqueue(struct page *page)
 {
 	return &page_wait_table[hash_ptr(page, PAGE_WAIT_TABLE_BITS)];
 }
@@ -36,73 +65,155 @@ void __init pagecache_init(void)
 	int i;
 
 	for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++)
-		init_waitqueue_head(&page_wait_table[i]);
+		INIT_HLIST_BL_HEAD(&page_wait_table[i]);
 
 	page_writeback_init();
 }
 
-struct wait_page_key {
-	struct page *page;
-	int bit_nr;
-	int page_match;
-};
+/*
+ * We found a wait entry for the requested page and bit.
+ *
+ * We now need to create a wakeup list, which includes the
+ * first exclusive waiter (if any), and all the non-exclusive
+ * ones.
+ *
+ * If there is more than one exclusive waiter, we need to
+ * turn the next exclusive waiter into a new wait entry and
+ * add it back to the page wait list.
+ */
+static struct page_wait_struct *create_wake_up_list(struct page_wait_struct *entry, struct hlist_bl_head *head)
+{
+	struct page_wait_struct *all = entry->all;
+	struct page_wait_struct *exclusive = entry->exclusive;
 
-struct wait_page_queue {
-	struct page *page;
-	int bit_nr;
-	wait_queue_entry_t wait;
-};
+	if (exclusive) {
+		struct page_wait_struct *remain = exclusive->next;
+
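+		/*
+		 * Any exclusive waiters beyond the first one stay
+		 * asleep: they become a fresh head entry on the
+		 * hash list, so a later wakeup can find them.
+		 */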
+		if (remain) {
+			remain->all = NULL;
+			remain->exclusive = remain;
+			hlist_bl_add_head(&remain->list, head);
+		}
+		exclusive->next = all;
+		all = exclusive;
+	}
+	return all;
+}
 
-static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
+static inline int remove_myself_from_one_list(struct page_wait_struct **p, struct page_wait_struct *entry)
 {
-	struct wait_page_key *key = arg;
-	struct wait_page_queue *wait_page
-		= container_of(wait, struct wait_page_queue, wait);
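+	/*
+	 * Walk the chain through an indirect next pointer, so that
+	 * unlinking an entry is a single store to *p.
+	 */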
+	while (*p) {
+		struct page_wait_struct *n = *p;
+		if (n == entry) {
+			*p = n->next;
+			return 1;
+		}
+		p = &n->next;
+	}
+	return 0;
+}
 
-	if (wait_page->page != key->page)
-	       return 0;
-	key->page_match = 1;
+/*
+ * We got woken up, and we need to make sure that nothing
+ * on the list can reference us any more.
+ */
+static void remove_myself_from(struct page_wait_struct *old, struct page_wait_struct *entry, struct hlist_bl_head *head)
+{
+	struct page_wait_struct *new;
 
-	if (wait_page->bit_nr != key->bit_nr)
-		return 0;
-	if (test_bit(key->bit_nr, &key->page->flags))
-		return 0;
+	/* We can be on only one list */
+	if (!remove_myself_from_one_list(&old->all, entry))
+		remove_myself_from_one_list(&old->exclusive, entry);
 
-	return autoremove_wake_function(wait, mode, sync, key);
+	/*
+	 * If we were the old entry for the page/bit on the hash list,
+	 * we need to create a new one from one of the existing other
+	 * ones.
+	 *
+	 * If the head entry was somebody else, we need to put it
+	 * back on the hash list (find_del_entry unlinked it), and
+	 * then we're done.  If there are no other wait entries for
+	 * this page, we're also done.
+	 */
+	if (old != entry) {
+		hlist_bl_add_head(&old->list, head);
+		return;
+	}
+
+	new = entry->exclusive;
+	if (!new) {
+		new = entry->all;
+		if (!new)
+			return;
+	}
+
+	/*
+	 * We can just use our old lists - we already removed our own
+	 * entry from them above.
+	 */
+	new->exclusive = entry->exclusive;
+	new->all = entry->all;
+	hlist_bl_add_head(&new->list, head);
+}
+
+
+/*
+ * Find and remove the matching page/bit entry from the (locked) bl list.
+ *
+ * Return ERR_PTR(-ESRCH) if there is no entry for the page at all, and
+ * NULL if the page was found but without a matching bit.
+ */
+static struct page_wait_struct *find_del_entry(struct page *page, int bit_nr, struct hlist_bl_head *head)
+{
+	struct page_wait_struct *entry;
+	struct page_wait_struct *ret = ERR_PTR(-ESRCH);
+	struct hlist_bl_node *node;
+
+	hlist_bl_for_each_entry(entry, node, head, list) {
+		if (entry->page != page)
+			continue;
+		ret = NULL;
+		if (entry->bit != bit_nr)
+			continue;
+		__hlist_bl_del(node);
+		INIT_HLIST_BL_NODE(node);
+		ret = entry;
+		break;
+	}
+	return ret;
 }
 
 static void wake_up_page_bit(struct page *page, int bit_nr)
 {
-	wait_queue_head_t *q = page_waitqueue(page);
-	struct wait_page_key key;
+	struct hlist_bl_head *head = page_waitqueue(page);
+	struct page_wait_struct *wake;
 	unsigned long flags;
 
-	key.page = page;
-	key.bit_nr = bit_nr;
-	key.page_match = 0;
+	local_irq_save(flags);
+	hlist_bl_lock(head);
+
+	wake = find_del_entry(page, bit_nr, head);
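+	/*
+	 * ERR_PTR(-ESRCH) means there are no waiters at all for
+	 * this page, so the PageWaiters hint can be cleared.
+	 */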
+	if (IS_ERR(wake)) {
+		ClearPageWaiters(page);
+		wake = NULL;
+	} else if (wake) {
+		wake = create_wake_up_list(wake, head);
+	}
+
+	hlist_bl_unlock(head);
+	local_irq_restore(flags);
 
-	spin_lock_irqsave(&q->lock, flags);
-	__wake_up_locked_key(q, TASK_NORMAL, &key);
 	/*
-	 * It is possible for other pages to have collided on the waitqueue
-	 * hash, so in that case check for a page match. That prevents a long-
-	 * term waiter
+	 * Actually wake everybody up. Note that once we
+	 * have woken a task up, we can't touch its wait
+	 * entry any more: it lives on that task's stack.
 	 *
-	 * It is still possible to miss a case here, when we woke page waiters
-	 * and removed them from the waitqueue, but there are still other
-	 * page waiters.
+	 * We also clear the 'wake' field so that the
+	 * target process can tell whether it was woken
+	 * up by a page bit event.
 	 */
-	if (!waitqueue_active(q) || !key.page_match) {
-		ClearPageWaiters(page);
-		/*
-		 * It's possible to miss clearing Waiters here, when we woke
-		 * our page waiters, but the hashed waitqueue has waiters for
-		 * other pages on it.
-		 *
-		 * That's okay, it's a rare case. The next waker will clear it.
-		 */
-	}
-	spin_unlock_irqrestore(&q->lock, flags);
+	while (wake) {
+		struct page_wait_struct *next = wake->next;
+		struct task_struct *p = wake->wake;
+
+		smp_store_release(&wake->wake, NULL);
+		wake_up_process(p);
+		wake = next;
+	}
 }
 
 static void wake_up_page(struct page *page, int bit)
@@ -112,76 +223,101 @@ static void wake_up_page(struct page *page, int bit)
 	wake_up_page_bit(page, bit);
 }
 
-static inline int wait_on_page_bit_common(wait_queue_head_t *q,
-		struct page *page, int bit_nr, int state, bool lock)
+/*
+ * Wait for the specific page bit to clear.
+ */
+static void wait_once_on_page_bit(struct page *page, int bit_nr, int state, bool lock)
 {
-	struct wait_page_queue wait_page;
-	wait_queue_entry_t *wait = &wait_page.wait;
-	int ret = 0;
+	struct page_wait_struct entry, *old;
+	struct hlist_bl_head *head;
+	unsigned long flags;
 
-	init_wait(wait);
-	wait->func = wake_page_function;
-	wait_page.page = page;
-	wait_page.bit_nr = bit_nr;
+	INIT_HLIST_BL_NODE(&entry.list);
+	entry.page = page;
+	entry.bit = bit_nr;
+	entry.all = entry.exclusive = NULL;
+	entry.next = NULL;
+	entry.wake = current;
+
+	head = page_waitqueue(page);
+	local_irq_save(flags);
+	hlist_bl_lock(head);
+
+	old = find_del_entry(page, bit_nr, head);
+	if (IS_ERR(old))
+		old = NULL;
+	if (old) {
+		entry.all = old->all;
+		entry.exclusive = old->exclusive;
+	}
 
-	for (;;) {
-		spin_lock_irq(&q->lock);
-
-		if (likely(list_empty(&wait->entry))) {
-			if (lock)
-				__add_wait_queue_entry_tail_exclusive(q, wait);
-			else
-				__add_wait_queue(q, wait);
-			SetPageWaiters(page);
-		}
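+	/*
+	 * Lock waiters are exclusive: only the first one gets
+	 * woken per wakeup.  Everybody else goes on the 'all'
+	 * chain and is woken in bulk.
+	 */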
+	if (lock) {
+		entry.next = entry.exclusive;
+		entry.exclusive = &entry;
+	} else {
+		entry.next = entry.all;
+		entry.all = &entry;
+	}
 
-		set_current_state(state);
+	hlist_bl_add_head(&entry.list, head);
+	SetPageWaiters(page);
+	set_current_state(state);
 
-		spin_unlock_irq(&q->lock);
+	hlist_bl_unlock(head);
+	local_irq_restore(flags);
 
-		if (likely(test_bit(bit_nr, &page->flags))) {
-			io_schedule();
-			if (unlikely(signal_pending_state(state, current))) {
-				ret = -EINTR;
-				break;
-			}
-		}
+	if (likely(test_bit(bit_nr, &page->flags)))
+		io_schedule();
 
+	/*
+	 * NOTE! If we were woken up by something else,
+	 * we have to remove ourselves from the hash list.
+	 *
+	 * But in order to avoid extra locking overhead in
+	 * the common case, we only do this if we can't
+	 * already tell that we were woken up (and thus
+	 * no longer on the lists).
+	 */
+	if (smp_load_acquire(&entry.wake) != NULL) {
+		local_irq_save(flags);
+		hlist_bl_lock(head);
+
+		old = find_del_entry(page, bit_nr, head);
+		if (old && !IS_ERR(old))
+			remove_myself_from(old, &entry, head);
+
+		hlist_bl_unlock(head);
+		local_irq_restore(flags);
+	}
+}
+
+static inline int wait_on_page_bit_common(struct page *page, int bit_nr, int state, bool lock)
+{
+	for (;;) {
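+		/*
+		 * A wakeup does not transfer the bit to us: we have
+		 * to re-test it (or take it, for the lock case) and
+		 * go back to sleep if we lost the race.
+		 */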
+		wait_once_on_page_bit(page, bit_nr, state, lock);
 		if (lock) {
 			if (!test_and_set_bit_lock(bit_nr, &page->flags))
-				break;
+				return 0;
 		} else {
 			if (!test_bit(bit_nr, &page->flags))
-				break;
+				return 0;
 		}
+		if (unlikely(signal_pending_state(state, current)))
+			return -EINTR;
 	}
-
-	finish_wait(q, wait);
-
-	/*
-	 * A signal could leave PageWaiters set. Clearing it here if
-	 * !waitqueue_active would be possible (by open-coding finish_wait),
-	 * but still fail to catch it in the case of wait hash collision. We
-	 * already can fail to clear wait hash collision cases, so don't
-	 * bother with signals either.
-	 */
-
-	return ret;
 }
 
 void wait_on_page_bit(struct page *page, int bit_nr)
 {
-	wait_queue_head_t *q = page_waitqueue(page);
-	wait_on_page_bit_common(q, page, bit_nr, TASK_UNINTERRUPTIBLE, false);
+	wait_on_page_bit_common(page, bit_nr, TASK_UNINTERRUPTIBLE, false);
 }
 EXPORT_SYMBOL(wait_on_page_bit);
 
 int wait_on_page_bit_killable(struct page *page, int bit_nr)
 {
-	wait_queue_head_t *q = page_waitqueue(page);
-	return wait_on_page_bit_common(q, page, bit_nr, TASK_KILLABLE, false);
+	return wait_on_page_bit_common(page, bit_nr, TASK_KILLABLE, false);
 }
 
+#if 0
 /**
  * add_page_wait_queue - Add an arbitrary waiter to a page's wait queue
  * @page: Page defining the wait queue of interest
@@ -200,6 +336,7 @@ void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter)
 	spin_unlock_irqrestore(&q->lock, flags);
 }
 EXPORT_SYMBOL_GPL(add_page_wait_queue);
+#endif
 
 
 /**
@@ -209,16 +346,14 @@ EXPORT_SYMBOL_GPL(add_page_wait_queue);
 void __lock_page(struct page *__page)
 {
 	struct page *page = compound_head(__page);
-	wait_queue_head_t *q = page_waitqueue(page);
-	wait_on_page_bit_common(q, page, PG_locked, TASK_UNINTERRUPTIBLE, true);
+	wait_on_page_bit_common(page, PG_locked, TASK_UNINTERRUPTIBLE, true);
 }
 EXPORT_SYMBOL(__lock_page);
 
 int __lock_page_killable(struct page *__page)
 {
 	struct page *page = compound_head(__page);
-	wait_queue_head_t *q = page_waitqueue(page);
-	return wait_on_page_bit_common(q, page, PG_locked, TASK_KILLABLE, true);
+	return wait_on_page_bit_common(page, PG_locked, TASK_KILLABLE, true);
 }
 EXPORT_SYMBOL_GPL(__lock_page_killable);
 
-- 
2.14.0.rc1.2.g4c8247ec3

