From: Viacheslav Dubeyko <Slava.Dubeyko@xxxxxxx>

The first step of the ceph_writepages_start() logic is finding the
dirty memory folios and processing them. This patch introduces the
ceph_process_folio_batch() method, which moves this logic into a
dedicated method.

The ceph_writepages_start() has this logic:

if (ceph_wbc.locked_pages == 0)
	lock_page(page);  /* first page */
else if (!trylock_page(page))
	break;

<skipped>

if (folio_test_writeback(folio) ||
    folio_test_private_2(folio) /* [DEPRECATED] */) {
	if (wbc->sync_mode == WB_SYNC_NONE) {
		doutc(cl, "%p under writeback\n", folio);
		folio_unlock(folio);
		continue;
	}
	doutc(cl, "waiting on writeback %p\n", folio);
	folio_wait_writeback(folio);
	folio_wait_private_2(folio); /* [DEPRECATED] */
}

The problem here is that the folio/page is locked first, and the
writeback flag is set by set_page_writeback(page) only later, before
submitting the write request. The folio/page is then unlocked by
writepages_finish() after the write request has completed. This means
that the folio_test_writeback() check and the folio_wait_writeback()
call never trigger, because a page under writeback is still locked
and cannot be locked again until the write request completes. However,
for the majority of folios/pages trylock_page() is used. As a result,
multiple threads can repeatedly try to lock the same folios/pages even
if they are already under writeback. This makes the logic more compute
intensive than necessary.

This patch changes this logic:

if (folio_test_writeback(folio) ||
    folio_test_private_2(folio) /* [DEPRECATED] */) {
	if (wbc->sync_mode == WB_SYNC_NONE) {
		doutc(cl, "%p under writeback\n", folio);
		folio_unlock(folio);
		continue;
	}
	doutc(cl, "waiting on writeback %p\n", folio);
	folio_wait_writeback(folio);
	folio_wait_private_2(folio); /* [DEPRECATED] */
}

if (ceph_wbc.locked_pages == 0)
	lock_page(page);  /* first page */
else if (!trylock_page(page))
	break;

By checking and waiting on the writeback state before taking the page
lock, this ordering no longer ignores the writeback state of
folios/pages.
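To make the ordering issue concrete, here is a simplified timeline of
the folio life cycle described above (an illustrative sketch only, not
the actual fs/ceph/addr.c code):

	/* submission path */           /* completion path */
	lock_page(page);
	set_page_writeback(page);
	/* submit OSD write request */
	                                /* writepages_finish(): */
	                                end_page_writeback(page);
	                                unlock_page(page);

The page stays locked from submission until writepages_finish() runs,
so a thread that has just acquired the page lock can never observe
folio_test_writeback() returning true, while concurrent threads keep
failing trylock_page() on pages that are merely waiting for their
write request to complete.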
Signed-off-by: Viacheslav Dubeyko <Slava.Dubeyko@xxxxxxx>
---
 fs/ceph/addr.c | 568 +++++++++++++++++++++++++++++++------------------
 1 file changed, 365 insertions(+), 203 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index d002ff62d867..739329846a07 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -978,6 +978,27 @@ static void writepages_finish(struct ceph_osd_request *req)
 	ceph_dec_osd_stopping_blocker(fsc->mdsc);
 }
 
+static inline
+bool is_forced_umount(struct address_space *mapping)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+
+	if (ceph_inode_is_shutdown(inode)) {
+		if (ci->i_wrbuffer_ref > 0) {
+			pr_warn_ratelimited_client(cl,
+				"%llx.%llx %lld forced umount\n",
+				ceph_vinop(inode), ceph_ino(inode));
+		}
+		mapping_set_error(mapping, -EIO);
+		return true;
+	}
+
+	return false;
+}
+
 static inline
 unsigned int ceph_define_write_size(struct address_space *mapping)
 {
@@ -1046,6 +1067,334 @@ void ceph_init_writeback_ctl(struct address_space *mapping,
 	ceph_wbc->data_pages = NULL;
 }
 
+static inline
+int ceph_define_writeback_range(struct address_space *mapping,
+				struct writeback_control *wbc,
+				struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+
+	/* find oldest snap context with dirty data */
+	ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL);
+	if (!ceph_wbc->snapc) {
+		/* hmm, why does writepages get called when there
+		   is no dirty data? */
+		doutc(cl, " no snap context with dirty data?\n");
+		return -ENODATA;
+	}
+
+	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n",
+	      ceph_wbc->snapc, ceph_wbc->snapc->seq,
+	      ceph_wbc->snapc->num_snaps);
+
+	ceph_wbc->should_loop = false;
+
+	if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) {
+		/* where to start/end? */
+		if (wbc->range_cyclic) {
+			ceph_wbc->index = ceph_wbc->start_index;
+			ceph_wbc->end = -1;
+			if (ceph_wbc->index > 0)
+				ceph_wbc->should_loop = true;
+			doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index);
+		} else {
+			ceph_wbc->index = wbc->range_start >> PAGE_SHIFT;
+			ceph_wbc->end = wbc->range_end >> PAGE_SHIFT;
+			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
+				ceph_wbc->range_whole = true;
+			doutc(cl, " not cyclic, %lu to %lu\n",
+			      ceph_wbc->index, ceph_wbc->end);
+		}
+	} else if (!ceph_wbc->head_snapc) {
+		/* Do not respect wbc->range_{start,end}. Dirty pages
+		 * in that range can be associated with newer snapc.
+		 * They are not writeable until we write all dirty pages
+		 * associated with 'snapc' get written */
+		if (ceph_wbc->index > 0)
+			ceph_wbc->should_loop = true;
+		doutc(cl, " non-head snapc, range whole\n");
+	}
+
+	ceph_put_snap_context(ceph_wbc->last_snapc);
+	ceph_wbc->last_snapc = ceph_wbc->snapc;
+
+	return 0;
+}
+
+static inline
+bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc)
+{
+	return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end;
+}
+
+static inline
+bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc,
+				unsigned index)
+{
+	return index < ceph_wbc->nr_folios &&
+		ceph_wbc->locked_pages < ceph_wbc->max_pages;
+}
+
+static
+int ceph_check_page_before_write(struct address_space *mapping,
+				 struct writeback_control *wbc,
+				 struct ceph_writeback_ctl *ceph_wbc,
+				 struct folio *folio)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct ceph_snap_context *pgsnapc;
+	struct page *page = &folio->page;
+
+	/* only dirty pages, or our accounting breaks */
+	if (unlikely(!PageDirty(page)) || unlikely(page->mapping != mapping)) {
+		doutc(cl, "!dirty or !mapping %p\n", page);
+		return -ENODATA;
+	}
+
+	/* only if matching snap context */
+	pgsnapc = page_snap_context(page);
+	if (pgsnapc != ceph_wbc->snapc) {
+		doutc(cl, "page snapc %p %lld != oldest %p %lld\n",
+		      pgsnapc, pgsnapc->seq,
+		      ceph_wbc->snapc, ceph_wbc->snapc->seq);
+
+		if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc &&
+		    wbc->sync_mode != WB_SYNC_NONE)
+			ceph_wbc->should_loop = true;
+
+		return -ENODATA;
+	}
+
+	if (page_offset(page) >= ceph_wbc->i_size) {
+		doutc(cl, "folio at %lu beyond eof %llu\n",
+		      folio->index, ceph_wbc->i_size);
+
+		if ((ceph_wbc->size_stable ||
+		    folio_pos(folio) >= i_size_read(inode)) &&
+		    folio_clear_dirty_for_io(folio))
+			folio_invalidate(folio, 0, folio_size(folio));
+
+		return -ENODATA;
+	}
+
+	if (ceph_wbc->strip_unit_end &&
+	    (page->index > ceph_wbc->strip_unit_end)) {
+		doutc(cl, "end of strip unit %p\n", page);
+		return -E2BIG;
+	}
+
+	return 0;
+}
+
+static inline
+void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc,
+				unsigned int max_pages)
+{
+	ceph_wbc->pages = kmalloc_array(max_pages,
+					sizeof(*ceph_wbc->pages),
+					GFP_NOFS);
+	if (!ceph_wbc->pages) {
+		ceph_wbc->from_pool = true;
+		ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
+		BUG_ON(!ceph_wbc->pages);
+	}
+}
+
+static inline
+void ceph_allocate_page_array(struct address_space *mapping,
+			      struct ceph_writeback_ctl *ceph_wbc,
+			      struct page *page)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	u64 objnum;
+	u64 objoff;
+	u32 xlen;
+
+	/* prepare async write request */
+	ceph_wbc->offset = (u64)page_offset(page);
+	ceph_calc_file_object_mapping(&ci->i_layout,
+				      ceph_wbc->offset, ceph_wbc->wsize,
+				      &objnum, &objoff, &xlen);
+
+	ceph_wbc->num_ops = 1;
+	ceph_wbc->strip_unit_end = page->index + ((xlen - 1) >> PAGE_SHIFT);
+
+	BUG_ON(ceph_wbc->pages);
+	ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen);
+	__ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages);
+
+	ceph_wbc->len = 0;
+}
+
+static inline
+bool is_page_index_contiguous(struct ceph_writeback_ctl *ceph_wbc,
+			      struct page *page)
+{
+	return page->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT;
+}
+
+static inline
+bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc)
+{
+	return ceph_wbc->num_ops >=
+		(ceph_wbc->from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS);
+}
+
+static inline
+bool is_write_congestion_happened(struct ceph_fs_client *fsc)
+{
+	return atomic_long_inc_return(&fsc->writeback_count) >
+		CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb);
+}
+
+static inline
+int ceph_move_dirty_page_in_page_array(struct address_space *mapping,
+				       struct writeback_control *wbc,
+				       struct ceph_writeback_ctl *ceph_wbc,
+				       struct page *page)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct page **pages = ceph_wbc->pages;
+	unsigned int index = ceph_wbc->locked_pages;
+	gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS;
+
+	if (IS_ENCRYPTED(inode)) {
+		pages[index] = fscrypt_encrypt_pagecache_blocks(page,
+								PAGE_SIZE,
+								0,
+								gfp_flags);
+		if (IS_ERR(pages[index])) {
+			int err = PTR_ERR(pages[index]);
+
+			if (err == -EINVAL)
+				pr_err_client(cl, "inode->i_blkbits=%hhu\n",
+					      inode->i_blkbits);
+
+			/* better not fail on first page! */
+			BUG_ON(ceph_wbc->locked_pages == 0);
+			pages[index] = NULL;
+			return err;
+		}
+	} else {
+		pages[index] = page;
+	}
+
+	ceph_wbc->locked_pages++;
+
+	return 0;
+}
+
+static
+int ceph_process_folio_batch(struct address_space *mapping,
+			     struct writeback_control *wbc,
+			     struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct folio *folio = NULL;
+	struct page *page = NULL;
+	unsigned i;
+	int rc = 0;
+
+	for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) {
+		folio = ceph_wbc->fbatch.folios[i];
+
+		if (!folio)
+			continue;
+
+		page = &folio->page;
+
+		doutc(cl, "? %p idx %lu, folio_test_writeback %#x, "
+		      "folio_test_dirty %#x, folio_test_locked %#x\n",
+		      page, page->index, folio_test_writeback(folio),
+		      folio_test_dirty(folio),
+		      folio_test_locked(folio));
+
+		if (folio_test_writeback(folio) ||
+		    folio_test_private_2(folio) /* [DEPRECATED] */) {
+			doutc(cl, "waiting on writeback %p\n", folio);
+			folio_wait_writeback(folio);
+			folio_wait_private_2(folio); /* [DEPRECATED] */
+			continue;
+		}
+
+		if (ceph_wbc->locked_pages == 0)
+			lock_page(page);  /* first page */
+		else if (!trylock_page(page))
+			break;
+
+		rc = ceph_check_page_before_write(mapping, wbc,
+						  ceph_wbc, folio);
+		if (rc == -ENODATA) {
+			rc = 0;
+			unlock_page(page);
+			ceph_wbc->fbatch.folios[i] = NULL;
+			continue;
+		} else if (rc == -E2BIG) {
+			rc = 0;
+			unlock_page(page);
+			ceph_wbc->fbatch.folios[i] = NULL;
+			break;
+		}
+
+		if (!clear_page_dirty_for_io(page)) {
+			doutc(cl, "%p !clear_page_dirty_for_io\n", page);
+			unlock_page(page);
+			ceph_wbc->fbatch.folios[i] = NULL;
+			continue;
+		}
+
+		/*
+		 * We have something to write.  If this is
+		 * the first locked page this time through,
+		 * calculate max possible write size and
+		 * allocate a page array
+		 */
+		if (ceph_wbc->locked_pages == 0) {
+			ceph_allocate_page_array(mapping, ceph_wbc, page);
+		} else if (!is_page_index_contiguous(ceph_wbc, page)) {
+			if (is_num_ops_too_big(ceph_wbc)) {
+				redirty_page_for_writepage(wbc, page);
+				unlock_page(page);
+				break;
+			}
+
+			ceph_wbc->num_ops++;
+			ceph_wbc->offset = (u64)page_offset(page);
+			ceph_wbc->len = 0;
+		}
+
+		/* note position of first page in fbatch */
+		doutc(cl, "%llx.%llx will write page %p idx %lu\n",
+		      ceph_vinop(inode), page, page->index);
+
+		fsc->write_congested = is_write_congestion_happened(fsc);
+
+		rc = ceph_move_dirty_page_in_page_array(mapping, wbc,
+							ceph_wbc, page);
+		if (rc) {
+			redirty_page_for_writepage(wbc, page);
+			unlock_page(page);
+			break;
+		}
+
+		ceph_wbc->fbatch.folios[i] = NULL;
+		ceph_wbc->len += thp_size(page);
+	}
+
+	ceph_wbc->processed_in_fbatch = i;
+
+	return rc;
+}
+
 /*
  * initiate async writeback
  */
@@ -1057,7 +1406,6 @@ static int ceph_writepages_start(struct address_space *mapping,
 	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
 	struct ceph_client *cl = fsc->client;
 	struct ceph_vino vino = ceph_vino(inode);
-	struct ceph_snap_context *pgsnapc;
 	struct ceph_writeback_ctl ceph_wbc;
 	struct ceph_osd_request *req = NULL;
 	int rc = 0;
@@ -1071,235 +1419,49 @@ static int ceph_writepages_start(struct address_space *mapping,
 		wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
 		(wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-	if (ceph_inode_is_shutdown(inode)) {
-		if (ci->i_wrbuffer_ref > 0) {
-			pr_warn_ratelimited_client(cl,
-				"%llx.%llx %lld forced umount\n",
-				ceph_vinop(inode), ceph_ino(inode));
-		}
-		mapping_set_error(mapping, -EIO);
-		return -EIO; /* we're in a forced umount, don't write! */
+	if (is_forced_umount(mapping)) {
+		/* we're in a forced umount, don't write! */
+		return -EIO;
 	}
 
 	ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);
 
 retry:
-	/* find oldest snap context with dirty data */
-	ceph_wbc.snapc = get_oldest_context(inode, &ceph_wbc, NULL);
-	if (!ceph_wbc.snapc) {
+	rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc);
+	if (rc == -ENODATA) {
 		/* hmm, why does writepages get called when there
 		   is no dirty data? */
-		doutc(cl, " no snap context with dirty data?\n");
+		rc = 0;
 		goto out;
 	}
-	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n",
-	      ceph_wbc.snapc, ceph_wbc.snapc->seq,
-	      ceph_wbc.snapc->num_snaps);
-
-	ceph_wbc.should_loop = false;
-	if (ceph_wbc.head_snapc && ceph_wbc.snapc != ceph_wbc.last_snapc) {
-		/* where to start/end? */
-		if (wbc->range_cyclic) {
-			ceph_wbc.index = ceph_wbc.start_index;
-			ceph_wbc.end = -1;
-			if (ceph_wbc.index > 0)
-				ceph_wbc.should_loop = true;
-			doutc(cl, " cyclic, start at %lu\n", ceph_wbc.index);
-		} else {
-			ceph_wbc.index = wbc->range_start >> PAGE_SHIFT;
-			ceph_wbc.end = wbc->range_end >> PAGE_SHIFT;
-			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-				ceph_wbc.range_whole = true;
-			doutc(cl, " not cyclic, %lu to %lu\n",
-			      ceph_wbc.index, ceph_wbc.end);
-		}
-	} else if (!ceph_wbc.head_snapc) {
-		/* Do not respect wbc->range_{start,end}. Dirty pages
-		 * in that range can be associated with newer snapc.
-		 * They are not writeable until we write all dirty pages
-		 * associated with 'snapc' get written */
-		if (ceph_wbc.index > 0)
-			ceph_wbc.should_loop = true;
-		doutc(cl, " non-head snapc, range whole\n");
-	}
 
 	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
 		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);
 
-	ceph_put_snap_context(ceph_wbc.last_snapc);
-	ceph_wbc.last_snapc = ceph_wbc.snapc;
-
-	while (!ceph_wbc.done && ceph_wbc.index <= ceph_wbc.end) {
+	while (!has_writeback_done(&ceph_wbc)) {
 		unsigned i;
 		struct page *page;
+
 		ceph_wbc.locked_pages = 0;
 		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;
 
get_more_pages:
+		ceph_folio_batch_reinit(&ceph_wbc);
+
 		ceph_wbc.nr_folios = filemap_get_folios_tag(mapping,
 							    &ceph_wbc.index,
 							    ceph_wbc.end,
 							    ceph_wbc.tag,
 							    &ceph_wbc.fbatch);
-		doutc(cl, "pagevec_lookup_range_tag got %d\n",
-		      ceph_wbc.nr_folios);
+		doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n",
+		      ceph_wbc.tag, ceph_wbc.nr_folios);
+
 		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
 			break;
-		for (i = 0; i < ceph_wbc.nr_folios &&
-			    ceph_wbc.locked_pages < ceph_wbc.max_pages; i++) {
-			struct folio *folio = ceph_wbc.fbatch.folios[i];
-
-			page = &folio->page;
-			doutc(cl, "? %p idx %lu\n", page, page->index);
-			if (ceph_wbc.locked_pages == 0)
-				lock_page(page);  /* first page */
-			else if (!trylock_page(page))
-				break;
-
-			/* only dirty pages, or our accounting breaks */
-			if (unlikely(!PageDirty(page)) ||
-			    unlikely(page->mapping != mapping)) {
-				doutc(cl, "!dirty or !mapping %p\n", page);
-				unlock_page(page);
-				continue;
-			}
-			/* only if matching snap context */
-			pgsnapc = page_snap_context(page);
-			if (pgsnapc != ceph_wbc.snapc) {
-				doutc(cl, "page snapc %p %lld != oldest %p %lld\n",
-				      pgsnapc, pgsnapc->seq,
-				      ceph_wbc.snapc, ceph_wbc.snapc->seq);
-				if (!ceph_wbc.should_loop &&
-				    !ceph_wbc.head_snapc &&
-				    wbc->sync_mode != WB_SYNC_NONE)
-					ceph_wbc.should_loop = true;
-				unlock_page(page);
-				continue;
-			}
-			if (page_offset(page) >= ceph_wbc.i_size) {
-				doutc(cl, "folio at %lu beyond eof %llu\n",
-				      folio->index, ceph_wbc.i_size);
-				if ((ceph_wbc.size_stable ||
-				    folio_pos(folio) >= i_size_read(inode)) &&
-				    folio_clear_dirty_for_io(folio))
-					folio_invalidate(folio, 0,
-							folio_size(folio));
-				folio_unlock(folio);
-				continue;
-			}
-			if (ceph_wbc.strip_unit_end &&
-			    (page->index > ceph_wbc.strip_unit_end)) {
-				doutc(cl, "end of strip unit %p\n", page);
-				unlock_page(page);
-				break;
-			}
-			if (folio_test_writeback(folio) ||
-			    folio_test_private_2(folio) /* [DEPRECATED] */) {
-				if (wbc->sync_mode == WB_SYNC_NONE) {
-					doutc(cl, "%p under writeback\n", folio);
-					folio_unlock(folio);
-					continue;
-				}
-				doutc(cl, "waiting on writeback %p\n", folio);
-				folio_wait_writeback(folio);
-				folio_wait_private_2(folio); /* [DEPRECATED] */
-			}
-
-			if (!clear_page_dirty_for_io(page)) {
-				doutc(cl, "%p !clear_page_dirty_for_io\n", page);
-				unlock_page(page);
-				continue;
-			}
-
-			/*
-			 * We have something to write.  If this is
-			 * the first locked page this time through,
-			 * calculate max possinle write size and
-			 * allocate a page array
-			 */
-			if (ceph_wbc.locked_pages == 0) {
-				u64 objnum;
-				u64 objoff;
-				u32 xlen;
-
-				/* prepare async write request */
-				ceph_wbc.offset = (u64)page_offset(page);
-				ceph_calc_file_object_mapping(&ci->i_layout,
-							      ceph_wbc.offset,
-							      ceph_wbc.wsize,
-							      &objnum, &objoff,
-							      &xlen);
-				ceph_wbc.len = xlen;
-
-				ceph_wbc.num_ops = 1;
-				ceph_wbc.strip_unit_end = page->index +
-					((ceph_wbc.len - 1) >> PAGE_SHIFT);
-
-				BUG_ON(ceph_wbc.pages);
-				ceph_wbc.max_pages =
-					calc_pages_for(0, (u64)ceph_wbc.len);
-				ceph_wbc.pages = kmalloc_array(ceph_wbc.max_pages,
-							       sizeof(*ceph_wbc.pages),
-							       GFP_NOFS);
-				if (!ceph_wbc.pages) {
-					ceph_wbc.from_pool = true;
-					ceph_wbc.pages =
-						mempool_alloc(ceph_wb_pagevec_pool,
-							      GFP_NOFS);
-					BUG_ON(!ceph_wbc.pages);
-				}
-				ceph_wbc.len = 0;
-			} else if (page->index !=
-				   (ceph_wbc.offset + ceph_wbc.len) >> PAGE_SHIFT) {
-				if (ceph_wbc.num_ops >=
-				    (ceph_wbc.from_pool ? CEPH_OSD_SLAB_OPS :
-							  CEPH_OSD_MAX_OPS)) {
-					redirty_page_for_writepage(wbc, page);
-					unlock_page(page);
-					break;
-				}
-
-				ceph_wbc.num_ops++;
-				ceph_wbc.offset = (u64)page_offset(page);
-				ceph_wbc.len = 0;
-			}
-
-			/* note position of first page in fbatch */
-			doutc(cl, "%llx.%llx will write page %p idx %lu\n",
-			      ceph_vinop(inode), page, page->index);
-
-			if (atomic_long_inc_return(&fsc->writeback_count) >
-			    CONGESTION_ON_THRESH(
-				    fsc->mount_options->congestion_kb))
-				fsc->write_congested = true;
-
-			if (IS_ENCRYPTED(inode)) {
-				ceph_wbc.pages[ceph_wbc.locked_pages] =
-					fscrypt_encrypt_pagecache_blocks(page,
-							PAGE_SIZE, 0,
-							ceph_wbc.locked_pages ?
-							GFP_NOWAIT : GFP_NOFS);
-				if (IS_ERR(ceph_wbc.pages[ceph_wbc.locked_pages])) {
-					if (PTR_ERR(ceph_wbc.pages[ceph_wbc.locked_pages]) == -EINVAL)
-						pr_err_client(cl,
-							"inode->i_blkbits=%hhu\n",
-							inode->i_blkbits);
-					/* better not fail on first page! */
-					BUG_ON(ceph_wbc.locked_pages == 0);
-					ceph_wbc.pages[ceph_wbc.locked_pages] = NULL;
-					redirty_page_for_writepage(wbc, page);
-					unlock_page(page);
-					break;
-				}
-				++ceph_wbc.locked_pages;
-			} else {
-				ceph_wbc.pages[ceph_wbc.locked_pages++] = page;
-			}
-
-			ceph_wbc.fbatch.folios[i] = NULL;
-			ceph_wbc.len += thp_size(page);
-		}
+		rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
+		if (rc)
+			goto release_folios;
 
 		/* did we get anything? */
 		if (!ceph_wbc.locked_pages)
-- 
2.48.0