On Thu, May 15, 2014 at 11:48:09AM +0100, Mel Gorman wrote: > +static inline wait_queue_head_t *clear_page_waiters(struct page *page) > { > + wait_queue_head_t *wqh = NULL; > + > + if (!PageWaiters(page)) > + return NULL; > + > + /* > + * Prepare to clear PG_waiters if the waitqueue is no longer > + * active. Note that there is no guarantee that a page with no > + * waiters will get cleared as there may be unrelated pages > + * sleeping on the same page wait queue. Accurate detection > + * would require a counter. In the event of a collision, the > + * waiter bit will dangle and lookups will be required until > + * the page is unlocked without collisions. The bit will need to > + * be cleared before freeing to avoid triggering debug checks. > + * > + * Furthermore, this can race with processes about to sleep on > + * the same page if it adds itself to the waitqueue just after > + * this check. The timeout in sleep_on_page prevents the race > + * being a terminal one. In effect, the uncontended and non-race > + * cases are faster in exchange for occasional worst case of the > + * timeout saving us. > + */ > + wqh = page_waitqueue(page); > + if (!waitqueue_active(wqh)) > + ClearPageWaiters(page); > + > + return wqh; > +} So clear_page_waiters() is I think a bad name for this function, for one it doesn't relate to returning a wait_queue_head. Secondly, I think the clear condition is wrong, if I understand the rest of the code correctly we'll keep PageWaiters set until the above condition, which is not a single waiter on the waitqueue. Would it not make much more sense to clear the page when there are no more waiters of this page? For the case where there are no waiters at all, this is the same condition, but in case there's a hash collision and there's other pages waiting, we'll iterate the lot anyway, so we might as well clear it there. > +/* Returns true if the page is locked */ > +static inline bool prepare_wait_bit(struct page *page, wait_queue_head_t *wqh, > + wait_queue_t *wq, int state, int bit_nr, bool exclusive) > +{ > + > + /* Set PG_waiters so a racing unlock_page will check the waitiqueue */ > + if (!PageWaiters(page)) > + SetPageWaiters(page); > + > + if (exclusive) > + prepare_to_wait_exclusive(wqh, wq, state); > + else > + prepare_to_wait(wqh, wq, state); > + return test_bit(bit_nr, &page->flags); > } > > void wait_on_page_bit(struct page *page, int bit_nr) > { > + wait_queue_head_t *wqh; > DEFINE_WAIT_BIT(wait, &page->flags, bit_nr); > > + if (!test_bit(bit_nr, &page->flags)) > + return; > + wqh = page_waitqueue(page); > + > + do { > + if (prepare_wait_bit(page, wqh, &wait.wait, TASK_KILLABLE, bit_nr, false)) > + sleep_on_page_killable(page); > + } while (test_bit(bit_nr, &page->flags)); > + finish_wait(wqh, &wait.wait); > } > EXPORT_SYMBOL(wait_on_page_bit); Afaict, after this patch, wait_on_page_bit() is only used by wait_on_page_writeback(), and might I ask why that needs the PageWaiter set? > int wait_on_page_bit_killable(struct page *page, int bit_nr) > { > + wait_queue_head_t *wqh; > DEFINE_WAIT_BIT(wait, &page->flags, bit_nr); > + int ret = 0; > > if (!test_bit(bit_nr, &page->flags)) > return 0; > + wqh = page_waitqueue(page); > + > + do { > + if (prepare_wait_bit(page, wqh, &wait.wait, TASK_KILLABLE, bit_nr, false)) > + ret = sleep_on_page_killable(page); > + } while (!ret && test_bit(bit_nr, &page->flags)); > + finish_wait(wqh, &wait.wait); > > + return ret; > } The only user of wait_on_page_bit_killable() _was_ wait_on_page_locked_killable(), but you've just converted that to use __wait_on_page_bit_killable(). So we can scrap this function. > /** > @@ -721,6 +785,8 @@ void add_page_wait_queue(struct page *page, wait_queue_t *waiter) > unsigned long flags; > > spin_lock_irqsave(&q->lock, flags); > + if (!PageWaiters(page)) > + SetPageWaiters(page); > __add_wait_queue(q, waiter); > spin_unlock_irqrestore(&q->lock, flags); > } What does add_page_wait_queue() do and why does it need PageWaiters? > @@ -740,10 +806,29 @@ EXPORT_SYMBOL_GPL(add_page_wait_queue); > */ > void unlock_page(struct page *page) > { > + wait_queue_head_t *wqh = clear_page_waiters(page); > + > VM_BUG_ON_PAGE(!PageLocked(page), page); > + > + /* > + * clear_bit_unlock is not necessary in this case as there is no > + * need to strongly order the clearing of PG_waiters and PG_locked. > + * The smp_mb__after_atomic() barrier is still required for RELEASE > + * semantics as there is no guarantee that a wakeup will take place > + */ > + clear_bit(PG_locked, &page->flags); > smp_mb__after_atomic(); If you need RELEASE, use _unlock() because that's exactly what it does. > + > + /* > + * Wake the queue if waiters were detected. Ordinarily this wakeup > + * would be unconditional to catch races between the lock bit being > + * set and a new process joining the queue. However, that would > + * require the waitqueue to be looked up every time. Instead we > + * optimse for the uncontended and non-race case and recover using > + * a timeout in sleep_on_page. > + */ > + if (wqh) > + __wake_up_bit(wqh, &page->flags, PG_locked); And the only reason we're not clearing PageWaiters under q->lock is to skimp on the last contended unlock_page() ? > } > EXPORT_SYMBOL(unlock_page); > > @@ -795,22 +884,69 @@ EXPORT_SYMBOL_GPL(page_endio); > */ > void __lock_page(struct page *page) > { > + wait_queue_head_t *wqh = page_waitqueue(page); > DEFINE_WAIT_BIT(wait, &page->flags, PG_locked); > > + do { > + if (prepare_wait_bit(page, wqh, &wait.wait, TASK_UNINTERRUPTIBLE, PG_locked, true)) > + sleep_on_page(page); > + } while (!trylock_page(page)); > + > + finish_wait(wqh, &wait.wait); > } So I suppose I'm failing to see the problem with something like: extern void __lock_page(struct page *); extern void __unlock_page(struct page *); static inline void lock_page(struct page *page) { if (!trylock_page(page)) __lock_page(page); } static inline void unlock_page(struct page *page) { clear_bit_unlock(&page->flags, PG_locked); if (PageWaiters(page)) __unlock_page(); } void __lock_page(struct page *page) { struct wait_queue_head_t *wqh = page_waitqueue(page); DEFINE_WAIT_BIT(wait, &page->flags, PG_locked); spin_lock_irq(&wqh->lock); if (!PageWaiters(page)) SetPageWaiters(page); wait.flags |= WQ_FLAG_EXCLUSIVE; preempt_disable(); do { if (list_empty(&wait->task_list)) __add_wait_queue_tail(wqh, &wait); set_current_state(TASK_UNINTERRUPTIBLE); if (test_bit(wait.key.bit_nr, wait.key.flags)) { spin_unlock_irq(&wqh->lock); schedule_preempt_disabled(); spin_lock_irq(&wqh->lock); } } while (!trylock_page(page)); __remove_wait_queue(wqh, &wait); __set_current_state(TASK_RUNNING); preempt_enable(); spin_unlock_irq(&wqh->lock); } void __unlock_page(struct page *page) { struct wait_bit_key key = __WAIT_BIT_KEY_INITIALIZER(&page->flags, PG_locked); struct wait_queue_head_t *wqh = page_waitqueue(page); wait_queue_t *curr; spin_lock_irq(&wqh->lock); list_for_each_entry(curr, &wqh->task_list, task_list) { unsigned int flags = curr->flags; if (curr->func(curr, TASK_NORMAL, 0, &key)) goto unlock; } ClearPageWaiters(page); unlock: spin_unlock_irq(&wqh->lock); } Yes, the __unlock_page() will have the unconditional wqh->lock, but it should also call __unlock_page() a lot less, and it doesn't have that horrid timeout. Now, the above is clearly sub-optimal when !extended_page_flags, but I suppose we could have two versions of __unlock_page() for that.
Attachment:
pgpMB3rp0rCDt.pgp
Description: PGP signature