On 04/03/2018 02:59 PM, Sebastian Andrzej Siewior wrote: > I noticed commit 2554db916586 ("sched/wait: Break up long wake list > walk") in which it is claimed that > |We saw page wait list that are up to 3700+ entries long in tests of > |large 4 and 8 socket systems. > > Here is another approach: instead of a waitlist a rbtree is used. The > tree is ordered by page and bit_nr that it waits for. A wake up request > for specific page + bit_nr combination does not need to browse through > the whole set of waiters but does only look for the specific waiter(s). > For the actual wake up wake_q mechanism is used. That means we enqueue > all to-be-woken tasks on a queue and perform the actual wakeup without > holding the queue lock. One concern I have is the overhead of maintaining this rbtree. There is also locking overhead when adding, removing, or looking up pages in this rbtree for every page wait/wake operation. Have you run any workloads to measure its effect? Tim > > add_page_wait_queue() is currently not wired up which means it breaks > the one user we have right now. Instead of fixing that I would be > interested in some specific benchmark to see if that is any help or > just making things worse. 
> > Cc: Kan Liang <kan.liang@xxxxxxxxx> > Cc: Tim Chen <tim.c.chen@xxxxxxxxxxxxxxx> > Cc: Linus Torvalds <torvalds@xxxxxxxxxxxxxxxxxxxx> > Cc: Peter Zijlstra <peterz@xxxxxxxxxxxxx> > Cc: Thomas Gleixner <tglx@xxxxxxxxxxxxx> > Signed-off-by: Sebastian Andrzej Siewior <sebastian@xxxxxxxxxxxxx> > --- > mm/filemap.c | 286 +++++++++++++++++++++++++++++++++++++++++++---------------- > 1 file changed, 208 insertions(+), 78 deletions(-) > > diff --git a/mm/filemap.c b/mm/filemap.c > index 693f62212a59..6f44eaac1a53 100644 > --- a/mm/filemap.c > +++ b/mm/filemap.c > @@ -36,6 +36,7 @@ > #include <linux/cleancache.h> > #include <linux/shmem_fs.h> > #include <linux/rmap.h> > +#include <linux/sched/wake_q.h> > #include "internal.h" > > #define CREATE_TRACE_POINTS > @@ -957,11 +958,15 @@ EXPORT_SYMBOL(__page_cache_alloc); > * at a cost of "thundering herd" phenomena during rare hash > * collisions. > */ > +struct page_wait_rb { > + struct rb_root tree; > + spinlock_t lock; > +}; > + > #define PAGE_WAIT_TABLE_BITS 8 > #define PAGE_WAIT_TABLE_SIZE (1 << PAGE_WAIT_TABLE_BITS) > -static wait_queue_head_t page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned; > - > -static wait_queue_head_t *page_waitqueue(struct page *page) > +static struct page_wait_rb page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned; > +static struct page_wait_rb *page_waitqueue(struct page *page) > { > return &page_wait_table[hash_ptr(page, PAGE_WAIT_TABLE_BITS)]; > } > @@ -970,77 +975,134 @@ void __init pagecache_init(void) > { > int i; > > - for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++) > - init_waitqueue_head(&page_wait_table[i]); > + for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++) { > + spin_lock_init(&page_wait_table[i].lock); > + page_wait_table[i].tree = RB_ROOT; > + } > > page_writeback_init(); > } > > /* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */ > -struct wait_page_key { > - struct page *page; > - int bit_nr; > - int page_match; > -}; > - > struct wait_page_queue { > + 
struct rb_node node; > struct page *page; > int bit_nr; > - wait_queue_entry_t wait; > + bool one; > + bool dequeued; > + struct task_struct *task; > + struct list_head queue; > + struct list_head head; > }; > > -static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg) > +static int wake_page_match(struct page *page, int bit_nr, > + struct wait_page_queue *page_q, bool *page_match) > { > - struct wait_page_key *key = arg; > - struct wait_page_queue *wait_page > - = container_of(wait, struct wait_page_queue, wait); > - > - if (wait_page->page != key->page) > - return 0; > - key->page_match = 1; > - > - if (wait_page->bit_nr != key->bit_nr) > - return 0; > - > - /* Stop walking if it's locked */ > - if (test_bit(key->bit_nr, &key->page->flags)) > + if ((unsigned long)page < (unsigned long)page_q->page) > return -1; > > - return autoremove_wake_function(wait, mode, sync, key); > + if ((unsigned long)page > (unsigned long)page_q->page) > + return 1; > + > + /* page hit */ > + *page_match = true; > + > + if (bit_nr < page_q->bit_nr) > + return -1; > + > + if (bit_nr > page_q->bit_nr) > + return 1; > + > + return 0; > +} > + > +static struct rb_node *find_wake_page(struct page_wait_rb *rb, > + struct page *page, int bit_nr, > + bool *page_match) > +{ > + struct rb_node *node; > + > + node = rb->tree.rb_node; > + while (node) { > + struct wait_page_queue *page_q; > + int match; > + > + page_q = rb_entry(node, struct wait_page_queue, node); > + match = wake_page_match(page, bit_nr, page_q, page_match); > + > + if (match < 0) > + node = node->rb_left; > + else if (match > 0) > + node = node->rb_right; > + else > + break; > + } > + return node; > +} > + > +static void wake_up_rb(struct page_wait_rb *rb, struct wake_q_head *wake_q, > + struct wait_page_queue *page_q) > +{ > + struct wait_page_queue *next; > + struct rb_node *node = &page_q->node; > + > + while (1) { > + struct task_struct *t; > + bool one; > + > + if 
(list_empty(&page_q->head)) { > + > + rb_erase(node, &rb->tree); > + RB_CLEAR_NODE(node); > + > + t = READ_ONCE(page_q->task); > + /* full barrier in wake_q_add() */ > + page_q->dequeued = true; > + wake_q_add(wake_q, t); > + break; > + } > + > + next = list_first_entry(&page_q->head, struct wait_page_queue, > + queue); > + > + list_del_init(&next->queue); > + list_splice_init(&page_q->head, &next->head); > + > + rb_replace_node(node, &next->node, &rb->tree); > + RB_CLEAR_NODE(node); > + t = READ_ONCE(page_q->task); > + one = READ_ONCE(page_q->one); > + > + /* full barrier in wake_q_add() */ > + page_q->dequeued = true; > + wake_q_add(wake_q, t); > + if (one == true) > + break; > + page_q = next; > + node = &page_q->node; > + } > } > > static void wake_up_page_bit(struct page *page, int bit_nr) > { > - wait_queue_head_t *q = page_waitqueue(page); > - struct wait_page_key key; > + struct page_wait_rb *rb = page_waitqueue(page); > + struct wait_page_queue *page_q; > + struct rb_node *node; > unsigned long flags; > - wait_queue_entry_t bookmark; > + bool page_match = false; > + DEFINE_WAKE_Q(wake_q); > > - key.page = page; > - key.bit_nr = bit_nr; > - key.page_match = 0; > + spin_lock_irqsave(&rb->lock, flags); > > - bookmark.flags = 0; > - bookmark.private = NULL; > - bookmark.func = NULL; > - INIT_LIST_HEAD(&bookmark.entry); > + node = find_wake_page(rb, page, bit_nr, &page_match); > + if (node) { > > - spin_lock_irqsave(&q->lock, flags); > - __wake_up_locked_key_bookmark(q, TASK_NORMAL, &key, &bookmark); > - > - while (bookmark.flags & WQ_FLAG_BOOKMARK) { > - /* > - * Take a breather from holding the lock, > - * allow pages that finish wake up asynchronously > - * to acquire the lock and remove themselves > - * from wait queue > - */ > - spin_unlock_irqrestore(&q->lock, flags); > - cpu_relax(); > - spin_lock_irqsave(&q->lock, flags); > - __wake_up_locked_key_bookmark(q, TASK_NORMAL, &key, &bookmark); > + page_q = rb_entry(node, struct wait_page_queue, node); > + /* 
Stop walking if it's locked */ > + if (test_bit(bit_nr, &page->flags)) > + goto no_wakeup; > + wake_up_rb(rb, &wake_q, page_q); > } > - > /* > * It is possible for other pages to have collided on the waitqueue > * hash, so in that case check for a page match. That prevents a long- > @@ -1050,7 +1112,7 @@ static void wake_up_page_bit(struct page *page, int bit_nr) > * and removed them from the waitqueue, but there are still other > * page waiters. > */ > - if (!waitqueue_active(q) || !key.page_match) { > + if (!page_match || RB_EMPTY_ROOT(&rb->tree)) { > ClearPageWaiters(page); > /* > * It's possible to miss clearing Waiters here, when we woke > @@ -1060,7 +1122,10 @@ static void wake_up_page_bit(struct page *page, int bit_nr) > * That's okay, it's a rare case. The next waker will clear it. > */ > } > - spin_unlock_irqrestore(&q->lock, flags); > +no_wakeup: > + > + spin_unlock_irqrestore(&rb->lock, flags); > + wake_up_q(&wake_q); > } > > static void wake_up_page(struct page *page, int bit) > @@ -1070,30 +1135,63 @@ static void wake_up_page(struct page *page, int bit) > wake_up_page_bit(page, bit); > } > > -static inline int wait_on_page_bit_common(wait_queue_head_t *q, > - struct page *page, int bit_nr, int state, bool lock) > +static int wait_on_page_bit_common(struct page_wait_rb *rb, struct page *page, > + int bit_nr, int state, bool lock) > { > - struct wait_page_queue wait_page; > - wait_queue_entry_t *wait = &wait_page.wait; > + struct wait_page_queue page_q; > + struct rb_node *node = &page_q.node; > + struct rb_node **p; > + struct rb_node *parent; > int ret = 0; > > - init_wait(wait); > - wait->flags = lock ? 
WQ_FLAG_EXCLUSIVE : 0; > - wait->func = wake_page_function; > - wait_page.page = page; > - wait_page.bit_nr = bit_nr; > + page_q.page = page; > + page_q.bit_nr = bit_nr; > + page_q.task = current; > + page_q.one = lock; > + INIT_LIST_HEAD(&page_q.queue); > + INIT_LIST_HEAD(&page_q.head); > + RB_CLEAR_NODE(&page_q.node); > > for (;;) { > - spin_lock_irq(&q->lock); > + spin_lock_irq(&rb->lock); > > - if (likely(list_empty(&wait->entry))) { > - __add_wait_queue_entry_tail(q, wait); > - SetPageWaiters(page); > + if (likely(RB_EMPTY_NODE(node)) && > + list_empty(&page_q.queue)) { > + > + page_q.dequeued = false; > + > + p = &rb->tree.rb_node; > + parent = NULL; > + while (*p) { > + struct wait_page_queue *tmp; > + int match; > + bool page_match; > + > + parent = *p; > + tmp = rb_entry(parent, struct wait_page_queue, node); > + > + match = wake_page_match(page, bit_nr, tmp, &page_match); > + > + if (match < 0) { > + p = &parent->rb_left; > + > + } else if (match > 0) { > + p = &parent->rb_right; > + } else { > + list_add_tail(&page_q.queue, > + &tmp->head); > + break; > + } > + } > + if (list_empty(&page_q.queue)) { > + rb_link_node(node, parent, p); > + rb_insert_color(node, &rb->tree); > + } > } > - > + SetPageWaiters(page); > set_current_state(state); > > - spin_unlock_irq(&q->lock); > + spin_unlock_irq(&rb->lock); > > if (likely(test_bit(bit_nr, &page->flags))) { > io_schedule(); > @@ -1112,8 +1210,34 @@ static inline int wait_on_page_bit_common(wait_queue_head_t *q, > break; > } > } > + __set_current_state(TASK_RUNNING); > > - finish_wait(q, wait); > + /* paired with the full barrier in wake_q_add() */ > + smp_rmb(); > + if (!page_q.dequeued) { > + spin_lock_irq(&rb->lock); > + > + if (!list_empty(&page_q.queue)) > + list_del_init(&page_q.queue); > + > + if (!list_empty(&page_q.head)) { > + struct wait_page_queue *tmp; > + > + BUG_ON(RB_EMPTY_NODE(node)); > + > + tmp = list_first_entry(&page_q.head, > + struct wait_page_queue, > + queue); > + 
list_del_init(&tmp->queue); > + list_splice(&page_q.head, &tmp->head); > + > + rb_replace_node(node, &tmp->node, &rb->tree); > + > + } else if (!RB_EMPTY_NODE(node)) { > + rb_erase(node, &rb->tree); > + } > + spin_unlock_irq(&rb->lock); > + } > > /* > * A signal could leave PageWaiters set. Clearing it here if > @@ -1128,18 +1252,21 @@ static inline int wait_on_page_bit_common(wait_queue_head_t *q, > > void wait_on_page_bit(struct page *page, int bit_nr) > { > - wait_queue_head_t *q = page_waitqueue(page); > - wait_on_page_bit_common(q, page, bit_nr, TASK_UNINTERRUPTIBLE, false); > + struct page_wait_rb *rb = page_waitqueue(page); > + > + wait_on_page_bit_common(rb, page, bit_nr, TASK_UNINTERRUPTIBLE, false); > } > EXPORT_SYMBOL(wait_on_page_bit); > > int wait_on_page_bit_killable(struct page *page, int bit_nr) > { > - wait_queue_head_t *q = page_waitqueue(page); > - return wait_on_page_bit_common(q, page, bit_nr, TASK_KILLABLE, false); > + struct page_wait_rb *rb = page_waitqueue(page); > + > + return wait_on_page_bit_common(rb, page, bit_nr, TASK_KILLABLE, false); > } > EXPORT_SYMBOL(wait_on_page_bit_killable); > > +#if 0 > /** > * add_page_wait_queue - Add an arbitrary waiter to a page's wait queue > * @page: Page defining the wait queue of interest > @@ -1158,6 +1285,7 @@ void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter) > spin_unlock_irqrestore(&q->lock, flags); > } > EXPORT_SYMBOL_GPL(add_page_wait_queue); > +#endif > > #ifndef clear_bit_unlock_is_negative_byte > > @@ -1268,16 +1396,18 @@ EXPORT_SYMBOL_GPL(page_endio); > void __lock_page(struct page *__page) > { > struct page *page = compound_head(__page); > - wait_queue_head_t *q = page_waitqueue(page); > - wait_on_page_bit_common(q, page, PG_locked, TASK_UNINTERRUPTIBLE, true); > + struct page_wait_rb *rb = page_waitqueue(page); > + > + wait_on_page_bit_common(rb, page, PG_locked, TASK_UNINTERRUPTIBLE, true); > } > EXPORT_SYMBOL(__lock_page); > > int __lock_page_killable(struct page 
*__page) > { > struct page *page = compound_head(__page); > - wait_queue_head_t *q = page_waitqueue(page); > - return wait_on_page_bit_common(q, page, PG_locked, TASK_KILLABLE, true); > + struct page_wait_rb *rb = page_waitqueue(page); > + > + return wait_on_page_bit_common(rb, page, PG_locked, TASK_KILLABLE, true); > } > EXPORT_SYMBOL_GPL(__lock_page_killable); > >