Introduce a place holder page for the radix tree.

mm/filemap.c is changed to wait on these before adding a page into the page
cache, and truncates are changed to wait for all of the place holder pages to
disappear.

Place holder pages can only be tested or looked at with the mapping lock
held, and only PagePlaceHolder(page) can be trusted.  They cannot be locked,
and cannot have references increased or decreased on them.

Signed-off-by: Chris Mason <chris.mason@xxxxxxxxxx>

diff -r 18a9e9f5c707 include/linux/mm.h
--- a/include/linux/mm.h	Thu Oct 19 08:30:00 2006 +0700
+++ b/include/linux/mm.h	Tue Oct 24 15:10:48 2006 -0400
@@ -276,6 +276,7 @@ static inline void get_page(struct page 
 	if (unlikely(PageCompound(page)))
 		page = (struct page *)page_private(page);
 	VM_BUG_ON(atomic_read(&page->_count) == 0);
+	VM_BUG_ON(PagePlaceHolder(page));
 	atomic_inc(&page->_count);
 }
 
diff -r 18a9e9f5c707 include/linux/page-flags.h
--- a/include/linux/page-flags.h	Thu Oct 19 08:30:00 2006 +0700
+++ b/include/linux/page-flags.h	Tue Oct 24 15:10:48 2006 -0400
@@ -267,4 +266,12 @@ static inline void set_page_writeback(st
 	test_set_page_writeback(page);
 }
 
+/*
+ * A placeholder is a struct page stub whose ->mapping points at this dummy
+ * address_space.  The test lives in a header because get_page() in mm.h
+ * uses it; only trust the result with the mapping's tree_lock held.
+ */
+extern struct address_space placeholder_address_space;
+#define PagePlaceHolder(page) ((page)->mapping == &placeholder_address_space)
+void set_page_placeholder(struct page *page);
+
 #endif	/* PAGE_FLAGS_H */
diff -r 18a9e9f5c707 include/linux/pagemap.h
--- a/include/linux/pagemap.h	Thu Oct 19 08:30:00 2006 +0700
+++ b/include/linux/pagemap.h	Tue Oct 24 15:10:48 2006 -0400
@@ -72,6 +72,12 @@ extern struct page * find_get_page(struc
 				unsigned long index);
 extern struct page * find_lock_page(struct address_space *mapping,
 				unsigned long index);
+int find_or_insert_placeholders(struct address_space *mapping,
+				struct page **pages, unsigned long start,
+				unsigned long end, unsigned long nr,
+				gfp_t gfp_mask,
+				struct page *insert,
+				int iowait);
 extern __deprecated_for_modules struct page * find_trylock_page(
 			struct address_space *mapping, unsigned long index);
 extern struct page * find_or_create_page(struct address_space *mapping,
@@ -82,6 +88,16 @@ unsigned find_get_pages_contig(struct ad
 			       unsigned int nr_pages, struct page **pages);
 unsigned find_get_pages_tag(struct address_space *mapping, pgoff_t *index,
 			int tag, unsigned int nr_pages, struct page **pages);
+void remove_placeholder_pages(struct address_space *mapping,
+			     struct page **pages,
+			     struct page *placeholder,
+			     unsigned long start,
+			     unsigned long end,
+			     unsigned long nr);
+void wake_up_placeholder_page(struct page *page);
+void wait_on_placeholder_pages_range(struct address_space *mapping, pgoff_t start,
+			       pgoff_t end);
+
 
 /*
  * Returns locked page at given index in given cache, creating it if needed.
diff -r 18a9e9f5c707 include/linux/radix-tree.h
--- a/include/linux/radix-tree.h	Thu Oct 19 08:30:00 2006 +0700
+++ b/include/linux/radix-tree.h	Tue Oct 24 15:10:48 2006 -0400
@@ -56,6 +56,7 @@ radix_tree_gang_lookup(struct radix_tree
 radix_tree_gang_lookup(struct radix_tree_root *root, void **results,
 			unsigned long first_index, unsigned int max_items);
 int radix_tree_preload(gfp_t gfp_mask);
+int radix_tree_needs_preload(void);
 void radix_tree_init(void);
 void *radix_tree_tag_set(struct radix_tree_root *root,
 			unsigned long index, unsigned int tag);
diff -r 18a9e9f5c707 lib/radix-tree.c
--- a/lib/radix-tree.c	Thu Oct 19 08:30:00 2006 +0700
+++ b/lib/radix-tree.c	Tue Oct 24 15:10:48 2006 -0400
@@ -139,6 +139,19 @@ out:
 out:
 	return ret;
 }
+
+/*
+ * Must be called inside the preload pair.  This tells you if another
+ * run of radix_tree_preload is required, allowing the caller to decide
+ * if a repeated insert will succeed
+ */
+int radix_tree_needs_preload(void)
+{
+	struct radix_tree_preload *rtp;
+	rtp = &__get_cpu_var(radix_tree_preloads);
+	return rtp->nr < ARRAY_SIZE(rtp->nodes);
+}
+EXPORT_SYMBOL_GPL(radix_tree_needs_preload);
 
 static inline void tag_set(struct radix_tree_node *node, unsigned int tag,
 		int offset)
diff -r 18a9e9f5c707 mm/filemap.c
--- a/mm/filemap.c	Thu Oct 19 08:30:00 2006 +0700
+++ b/mm/filemap.c	Tue Oct 24 15:10:48 2006 -0400
@@ -44,6 +44,25 @@ generic_file_direct_IO(int rw, struct ki
 generic_file_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
 	loff_t offset, unsigned long nr_segs);
 
+static wait_queue_head_t *page_waitqueue(struct page *page);
+static void wait_on_placeholder_page(struct address_space *mapping,
+				     struct page *page,
+				     int write_lock);
+
+struct address_space placeholder_address_space;
+EXPORT_SYMBOL_GPL(placeholder_address_space);
+
+/*
+ * Turns the page struct into a marker for a placeholder page.
+ * This should not be called with a real page, only a struct page stub.
+ */
+void set_page_placeholder(struct page *page)
+{
+	memset(page, 0, sizeof(*page));
+	page->mapping = &placeholder_address_space;
+}
+EXPORT_SYMBOL_GPL(set_page_placeholder);
+
 /*
  * Shared mappings implemented 30.11.1994. It's not fully working yet,
  * though.
@@ -437,12 +456,24 @@ int add_to_page_cache(struct page *page,
 int add_to_page_cache(struct page *page, struct address_space *mapping,
 		pgoff_t offset, gfp_t gfp_mask)
 {
-	int error = radix_tree_preload(gfp_mask & ~__GFP_HIGHMEM);
+	int error;
+again:
+	error = radix_tree_preload(gfp_mask & ~__GFP_HIGHMEM);
 
 	if (error == 0) {
 		write_lock_irq(&mapping->tree_lock);
 		error = radix_tree_insert(&mapping->page_tree, offset, page);
-		if (!error) {
+		if (error == -EEXIST && (gfp_mask & __GFP_WAIT)) {
+			struct page *tmp;
+			tmp = radix_tree_lookup(&mapping->page_tree, offset);
+			if (tmp && PagePlaceHolder(tmp)) {
+				radix_tree_preload_end();
+				/* drops the lock */
+				wait_on_placeholder_page(mapping, tmp, 1);
+				goto again;
+			}
+		}
+		if (!error && !PagePlaceHolder(page)) {
 			page_cache_get(page);
 			SetPageLocked(page);
 			page->mapping = mapping;
@@ -526,6 +557,79 @@ void fastcall wait_on_page_bit(struct pa
 }
 EXPORT_SYMBOL(wait_on_page_bit);
 
+/*
+ * Call with either a read lock or a write lock on the mapping tree.
+ * The lock is dropped just before waiting for the place holder
+ */
+static void wait_on_placeholder_page(struct address_space *mapping,
+				     struct page *page,
+				     int write_lock)
+{
+	DEFINE_WAIT(wait);
+	wait_queue_head_t *wqh = page_waitqueue(page);
+	prepare_to_wait(wqh, &wait, TASK_UNINTERRUPTIBLE);
+	if (write_lock)
+		write_unlock_irq(&mapping->tree_lock);
+	else
+		read_unlock_irq(&mapping->tree_lock);
+	io_schedule();
+	finish_wait(wqh, &wait);
+}
+
+void wake_up_placeholder_page(struct page *page)
+{
+	__wake_up_bit(page_waitqueue(page), &page->flags, PG_locked);
+}
+EXPORT_SYMBOL_GPL(wake_up_placeholder_page);
+
+/**
+ * wait_on_placeholder_pages_range - gang placeholder page waiter
+ * @mapping:	The address_space to search
+ * @start:	The starting page index
+ * @end:	The max page index
+ *
+ * wait_on_placeholder_pages_range() will search for and wait on a range
+ * of pages in the mapping
+ *
+ * On return, the range has no placeholder pages sitting in it.
+ */
+void wait_on_placeholder_pages_range(struct address_space *mapping,
+			       pgoff_t start, pgoff_t end)
+{
+	unsigned int i;
+	unsigned int ret;
+	struct page *pages[8];
+	pgoff_t cur = start;
+	pgoff_t highest = start;
+
+	/*
+	 * we expect a very small number of place holder pages, so
+	 * this code isn't trying to be very fast.
+	 */
+again:
+	read_lock_irq(&mapping->tree_lock);
+	ret = radix_tree_gang_lookup(&mapping->page_tree,
+			(void **)pages, cur, ARRAY_SIZE(pages));
+	for (i = 0; i < ret; i++) {
+		if (PagePlaceHolder(pages[i])) {
+			/* drops the lock */
+			wait_on_placeholder_page(mapping, pages[i], 0);
+			goto again;
+		} else if (pages[i]->index > highest)
+			highest = pages[i]->index;
+	}
+	if (highest < end && ret == ARRAY_SIZE(pages)) {
+		cur = highest;
+		/* "again" retakes tree_lock, so it must be dropped on */
+		/* every pass, not only when we need to reschedule */
+		read_unlock_irq(&mapping->tree_lock);
+		cond_resched();
+		goto again;
+	}
+	read_unlock_irq(&mapping->tree_lock);
+}
+EXPORT_SYMBOL_GPL(wait_on_placeholder_pages_range);
+
 /**
  * unlock_page - unlock a locked page
  * @page: the page
@@ -542,6 +646,7 @@ EXPORT_SYMBOL(wait_on_page_bit);
  */
 void fastcall unlock_page(struct page *page)
 {
+	BUG_ON(PagePlaceHolder(page));
 	smp_mb__before_clear_bit();
 	if (!TestClearPageLocked(page))
 		BUG();
@@ -578,6 +683,7 @@ void fastcall __lock_page(struct page *p
 {
 	DEFINE_WAIT_BIT(wait, &page->flags, PG_locked);
 
+	BUG_ON(PagePlaceHolder(page));
 	__wait_on_bit_lock(page_waitqueue(page), &wait, sync_page,
 							TASK_UNINTERRUPTIBLE);
 }
@@ -590,6 +696,7 @@ void fastcall __lock_page_nosync(struct 
 void fastcall __lock_page_nosync(struct page *page)
 {
 	DEFINE_WAIT_BIT(wait, &page->flags, PG_locked);
+	BUG_ON(PagePlaceHolder(page));
 	__wait_on_bit_lock(page_waitqueue(page), &wait, __sleep_on_page_lock,
 							TASK_UNINTERRUPTIBLE);
 }
@@ -608,12 +715,262 @@ struct page * find_get_page(struct addre
 
 	read_lock_irq(&mapping->tree_lock);
 	page = radix_tree_lookup(&mapping->page_tree, offset);
-	if (page)
-		page_cache_get(page);
+	if (page) {
+		if (PagePlaceHolder(page))
+			page = NULL;
+		else
+			page_cache_get(page);
+	}
 	read_unlock_irq(&mapping->tree_lock);
 	return page;
 }
 EXPORT_SYMBOL(find_get_page);
+
+/**
+ * remove_placeholder_pages - remove a range of placeholder or locked pages
+ * @mapping: the page's address_space
+ * @pages: an array of page pointers to use for gang lookups
+ * @placeholder: the placeholder page previously inserted (for verification)
+ * @start: the search starting point
+ * @end: the search end point (offsets >= end are not touched)
+ * @nr: the size of the pages array.
+ *
+ * Any placeholder pages in the range specified are removed.  Any real
+ * pages are unlocked and released.
+ */
+void remove_placeholder_pages(struct address_space *mapping,
+			     struct page **pages,
+			     struct page *placeholder,
+			     unsigned long start,
+			     unsigned long end,
+			     unsigned long nr)
+{
+	struct page *page;
+	int ret;
+	int i;
+	unsigned long num;
+
+	write_lock_irq(&mapping->tree_lock);
+	while (start < end) {
+		num = min(nr, end-start);
+		ret = radix_tree_gang_lookup(&mapping->page_tree,
+						(void **)pages, start, num);
+		BUG_ON(ret != num);
+		for (i = 0; i < ret; i++) {
+			page = pages[i];
+			if (PagePlaceHolder(page)) {
+				BUG_ON(page != placeholder);
+				radix_tree_delete(&mapping->page_tree,
+						  start + i);
+			} else {
+				BUG_ON(page->index >= end);
+				unlock_page(page);
+				page_cache_release(page);
+			}
+		}
+		start += ret;
+		if (need_resched()) {
+			write_unlock_irq(&mapping->tree_lock);
+			cond_resched();
+			write_lock_irq(&mapping->tree_lock);
+		}
+	}
+	write_unlock_irq(&mapping->tree_lock);
+	if (placeholder)
+		wake_up_placeholder_page(placeholder);
+}
+EXPORT_SYMBOL_GPL(remove_placeholder_pages);
+
+/*
+ * a helper function to insert a placeholder into multiple slots
+ * in the radix tree.  This could probably use an optimized version
+ * in the radix code.  It may insert fewer than the request number
+ * of placeholders if we need to reschedule or the radix tree needs to
+ * be preloaded again.
+ *
+ * returns zero on error or the number actually inserted.
+ */
+static unsigned long insert_placeholders(struct address_space *mapping,
+					 struct page *insert,
+					 unsigned long start, unsigned long nr)
+{
+	int err;
+	unsigned long i;
+	for (i = 0 ; i < nr; i++) {
+		err = radix_tree_insert(&mapping->page_tree, start + i,
+					  insert);
+		if (err)
+			return i;
+		if (radix_tree_needs_preload() || (i && need_resched()))
+			return i + 1;
+	}
+	return i;
+}
+
+
+/**
+ * find_or_insert_placeholders - locate a group of pagecache pages or insert one
+ * @mapping: the page's address_space
+ * @pages: an array of page pointers to use for gang lookups
+ * @start: the search starting point
+ * @end: the search end point (offsets >= end are not touched)
+ * @nr: the size of the pages array.
+ * @gfp_mask: page allocation mode
+ * @insert: the page to insert if none is found
+ * @iowait: 1 if you want to wait for dirty or writeback pages.
+ *
+ * This locks down a range of offsets in the address space.  Any pages
+ * already present are locked and a placeholder page is inserted into
+ * the radix tree for any offsets without pages.
+ */
+int find_or_insert_placeholders(struct address_space *mapping,
+				 struct page **pages, unsigned long start,
+				 unsigned long end, unsigned long nr,
+				 gfp_t gfp_mask,
+				 struct page *insert,
+				 int iowait)
+{
+	int err = 0;
+	int i, ret;
+	unsigned long cur = start;
+	unsigned long ins;
+	struct page *page;
+	int restart;
+
+	/*
+	 * this gets complicated.  Placeholders and page locks need to
+	 * be taken in order.  We use gang lookup to cut down on the cpu
+	 * cost, but we need to keep track of holes in the results and
+	 * insert placeholders as appropriate.
+	 *
+	 * If a locked page or a placeholder is found, we wait for it and
+	 * pick up where we left off.  If a dirty or PG_writeback page is found
+	 * and iowait==1, we have to drop all of our locks, kick/wait for the
+	 * io and start over.
+	 */
+repeat:
+	if (cur != start )
+		cond_resched();
+	err = radix_tree_preload(gfp_mask & ~__GFP_HIGHMEM);
+	if (err)
+		goto fail;
+	write_lock_irq(&mapping->tree_lock);
+repeat_lock:
+	ret = radix_tree_gang_lookup(&mapping->page_tree,
+					(void **)pages, cur,
+					min(nr, end-cur));
+	for (i = 0 ; i < ret ; i++) {
+		restart = 0;
+		page = pages[i];
+
+		if (PagePlaceHolder(page)) {
+			radix_tree_preload_end();
+			/* drops the lock */
+			wait_on_placeholder_page(mapping, page, 1);
+			goto repeat;
+		}
+
+		if (page->index > cur) {
+			unsigned long top = min(end, page->index);
+			ins = insert_placeholders(mapping, insert, cur,
+						  top - cur);
+			cur += ins;
+			if (radix_tree_needs_preload() || cur != top) {
+				write_unlock_irq(&mapping->tree_lock);
+				radix_tree_preload_end();
+				if (ins == 0) {
+					err = -1;
+					goto fail;
+				}
+				goto repeat;
+			}
+		}
+		if (page->index >= end) {
+			ret = 0;
+			break;
+		}
+		page_cache_get(page);
+		if (TestSetPageLocked(page)) {
+			unsigned long tmpoff = page->index;
+			/* the reference taken above pins the page while we sleep */
+			write_unlock_irq(&mapping->tree_lock);
+			radix_tree_preload_end();
+			__lock_page(page);
+			/* Has the page been truncated while we slept? */
+			if (unlikely(page->mapping != mapping ||
+				     page->index != tmpoff)) {
+				unlock_page(page);
+				page_cache_release(page);
+				goto repeat;
+			} else {
+				/* we've locked the page, but we need to
+				 * check it for dirty/writeback
+				 */
+				restart = 1;
+			}
+		}
+		if (iowait && (PageDirty(page) || PageWriteback(page))) {
+			/*
+			 * mpage_writepages collects a number of pages
+			 * into a bio, marking them PG_writeback as it goes.
+			 * It also locks pages while deciding what to do
+			 * with them.  What we want to do is have a bunch
+			 * of pages locked, and then wait for any io to
+			 * finish.  But if mpage_writepages is waiting for
+			 * one of our locked pages, it may never start the
+			 * io on the bio it has created.
+			 *
+			 * so, we have to drop all of our locks and
+			 * start over
+			 */
+			unlock_page(page);
+			page_cache_release(page);
+			if (!restart) {
+				write_unlock_irq(&mapping->tree_lock);
+				radix_tree_preload_end();
+			}
+			remove_placeholder_pages(mapping, pages, insert,
+						 start, cur, nr);
+			filemap_write_and_wait_range(mapping,
+						 (loff_t)start << PAGE_CACHE_SHIFT,
+						 (loff_t)end << PAGE_CACHE_SHIFT);
+			cur = start;
+			goto repeat;
+		}
+		cur++;
+		if (restart)
+			goto repeat;
+		if (cur >= end)
+			break;
+	}
+
+	/* we haven't yet filled the range */
+	if (cur < end) {
+		/* if the search filled our array, there is more to do. */
+		if (ret && ret == nr)
+			goto repeat_lock;
+
+		/* otherwise insert placeholders for the remaining offsets */
+		ins = insert_placeholders(mapping, insert, cur, end - cur);
+		cur += ins;
+		if (cur != end) {
+			write_unlock_irq(&mapping->tree_lock);
+			radix_tree_preload_end();
+			if (ins == 0) {
+				err = -1;
+				goto fail;
+			}
+			goto repeat;
+		}
+	}
+	write_unlock_irq(&mapping->tree_lock);
+	radix_tree_preload_end();
+	return err;
+fail:
+	remove_placeholder_pages(mapping, pages, insert, start, cur, nr);
+	return err;
+}
+EXPORT_SYMBOL_GPL(find_or_insert_placeholders);
 
 /**
  * find_trylock_page - find and lock a page
@@ -628,7 +985,7 @@ struct page *find_trylock_page(struct ad
 
 	read_lock_irq(&mapping->tree_lock);
 	page = radix_tree_lookup(&mapping->page_tree, offset);
-	if (page && TestSetPageLocked(page))
+	if (page && (PagePlaceHolder(page) || TestSetPageLocked(page)))
 		page = NULL;
 	read_unlock_irq(&mapping->tree_lock);
 	return page;
@@ -654,6 +1011,12 @@ repeat:
 repeat:
 	page = radix_tree_lookup(&mapping->page_tree, offset);
 	if (page) {
+		if (PagePlaceHolder(page)) {
+			/* drops the lock */
+			wait_on_placeholder_page(mapping, page, 0);
+			read_lock_irq(&mapping->tree_lock);
+			goto repeat;
+		}
 		page_cache_get(page);
 		if (TestSetPageLocked(page)) {
 			read_unlock_irq(&mapping->tree_lock);
@@ -737,14 +1100,25 @@ unsigned find_get_pages(struct address_s
 unsigned find_get_pages(struct address_space *mapping, pgoff_t start,
 			    unsigned int nr_pages, struct page **pages)
 {
-	unsigned int i;
+	unsigned int i = 0;
 	unsigned int ret;
 
 	read_lock_irq(&mapping->tree_lock);
 	ret = radix_tree_gang_lookup(&mapping->page_tree,
 				(void **)pages, start, nr_pages);
-	for (i = 0; i < ret; i++)
-		page_cache_get(pages[i]);
+	while(i < ret) {
+		if (PagePlaceHolder(pages[i])) {
+			/* we can't return a place holder, shift it away */
+			if (i + 1 < ret) {
+				memmove(&pages[i], &pages[i+1],
+		       			(ret - i - 1) * sizeof(struct page *));
+			}
+			ret--;
+			continue;
+		} else
+			page_cache_get(pages[i]);
+		i++;
+	}
 	read_unlock_irq(&mapping->tree_lock);
 	return ret;
 }
@@ -771,6 +1145,8 @@ unsigned find_get_pages_contig(struct ad
 	ret = radix_tree_gang_lookup(&mapping->page_tree,
 				(void **)pages, index, nr_pages);
 	for (i = 0; i < ret; i++) {
+		if (PagePlaceHolder(pages[i]))
+			break;
 		if (pages[i]->mapping == NULL || pages[i]->index != index)
 			break;
 
@@ -795,14 +1171,25 @@ unsigned find_get_pages_tag(struct addre
 unsigned find_get_pages_tag(struct address_space *mapping, pgoff_t *index,
 			int tag, unsigned int nr_pages, struct page **pages)
 {
-	unsigned int i;
+	unsigned int i = 0;
 	unsigned int ret;
 
 	read_lock_irq(&mapping->tree_lock);
 	ret = radix_tree_gang_lookup_tag(&mapping->page_tree,
 				(void **)pages, *index, nr_pages, tag);
-	for (i = 0; i < ret; i++)
-		page_cache_get(pages[i]);
+	while(i < ret) {
+		if (PagePlaceHolder(pages[i])) {
+			/* we can't return a place holder, shift it away */
+			if (i + 1 < ret) {
+				memmove(&pages[i], &pages[i+1],
+		       			(ret - i - 1) * sizeof(struct page *));
+			}
+			ret--;
+			continue;
+		} else
+			page_cache_get(pages[i]);
+		i++;
+	}
 	if (ret)
 		*index = pages[ret - 1]->index + 1;
 	read_unlock_irq(&mapping->tree_lock);
diff -r 18a9e9f5c707 mm/truncate.c
--- a/mm/truncate.c	Thu Oct 19 08:30:00 2006 +0700
+++ b/mm/truncate.c	Tue Oct 24 15:10:48 2006 -0400
@@ -207,6 +207,7 @@ void truncate_inode_pages_range(struct a
 		}
 		pagevec_release(&pvec);
 	}
+	wait_on_placeholder_pages_range(mapping, start, end);
 }
 EXPORT_SYMBOL(truncate_inode_pages_range);
 
-
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html