From: Liang Chen <liangchen.linux@xxxxxxxxx> As pointed out in commit 332391a, mixing buffered reads and asynchronous direct writes risks ending up with a situation where stale data is left in page cache while new data is already written to disk. The same problem hits block dev fs too. A similar approach needs to be taken here. Signed-off-by: Liang Chen <liangchen.linux@xxxxxxxxx> --- block/fops.c | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/block/fops.c b/block/fops.c index 9f2ecec406b0..8ab679814b9d 100644 --- a/block/fops.c +++ b/block/fops.c @@ -136,11 +136,56 @@ struct blkdev_dio { size_t size; atomic_t ref; unsigned int flags; + struct work_struct complete_work; struct bio bio ____cacheline_aligned_in_smp; }; static struct bio_set blkdev_dio_pool; +static void blkdev_aio_complete_work(struct work_struct *work) +{ + struct blkdev_dio *dio = container_of(work, struct blkdev_dio, complete_work); + struct kiocb *iocb = dio->iocb; + int err; + struct inode *inode = bdev_file_inode(iocb->ki_filp); + loff_t offset = iocb->ki_pos; + ssize_t ret; + + WRITE_ONCE(iocb->private, NULL); + + if (likely(!dio->bio.bi_status)) { + ret = dio->size; + iocb->ki_pos += ret; + } else { + ret = blk_status_to_errno(dio->bio.bi_status); + } + + /* + * Try again to invalidate clean pages which might have been cached by + * non-direct readahead, or faulted in by get_user_pages() if the source + * of the write was an mmap'ed region of the file we're writing. Either + * one is a pretty crazy thing to do, so we don't support it 100%. If + * this invalidation fails, tough, the write still worked... + */ + if (iocb->ki_flags & IOCB_WRITE && ret > 0 && + inode->i_mapping->nrpages) { + err = invalidate_inode_pages2_range(inode->i_mapping, + offset >> PAGE_SHIFT, + (offset + ret - 1) >> PAGE_SHIFT); + if (err) + dio_warn_stale_pagecache(iocb->ki_filp); + } + + iocb->ki_complete(iocb, ret); + + /* + * For multi-bio dio dio->bio has an extra reference to ensure the + * dio stays around. In the other case, an extra reference is taken + * to make sure + */ + bio_put(&dio->bio); +} + static void blkdev_bio_end_io(struct bio *bio) { struct blkdev_dio *dio = bio->bi_private; @@ -153,6 +198,14 @@ static void blkdev_bio_end_io(struct bio *bio) if (!(dio->flags & DIO_IS_SYNC)) { struct kiocb *iocb = dio->iocb; ssize_t ret; + struct inode *inode = bdev_file_inode(iocb->ki_filp); + + if (iocb->ki_flags & IOCB_WRITE){ + INIT_WORK(&dio->complete_work, blkdev_aio_complete_work); + queue_work(inode->i_sb->s_dio_done_wq, + &dio->complete_work); + goto out; + } WRITE_ONCE(iocb->private, NULL); @@ -173,6 +226,7 @@ static void blkdev_bio_end_io(struct bio *bio) } } +out: if (should_dirty) { bio_check_pages_dirty(bio); } else { @@ -284,6 +338,20 @@ static void blkdev_bio_end_io_async(struct bio *bio) struct blkdev_dio *dio = container_of(bio, struct blkdev_dio, bio); struct kiocb *iocb = dio->iocb; ssize_t ret; + struct inode *inode = bdev_file_inode(iocb->ki_filp); + + if (iocb->ki_flags & IOCB_WRITE){ + INIT_WORK(&dio->complete_work, blkdev_aio_complete_work); + /* + * Grab an extra reference to ensure the dio structure + * which the bio embeds in stays around for complete_work + * to access. + */ + bio_get(bio); + queue_work(inode->i_sb->s_dio_done_wq, + &dio->complete_work); + goto out; + } WRITE_ONCE(iocb->private, NULL); @@ -296,6 +364,7 @@ static void blkdev_bio_end_io_async(struct bio *bio) iocb->ki_complete(iocb, ret); +out: if (dio->flags & DIO_SHOULD_DIRTY) { bio_check_pages_dirty(bio); } else { @@ -366,14 +435,37 @@ static ssize_t __blkdev_direct_IO_async(struct kiocb *iocb, return -EIOCBQUEUED; } +int blkdev_sb_init_dio_done_wq(struct super_block *sb) +{ + struct workqueue_struct *old; + struct workqueue_struct *wq = alloc_workqueue("dio/%s", + WQ_MEM_RECLAIM, 0, + sb->s_id); + if (!wq) + return -ENOMEM; + /* + * This has to be atomic as more DIOs can race to create the workqueue + */ + old = cmpxchg(&sb->s_dio_done_wq, NULL, wq); + /* Someone created workqueue before us? Free ours... */ + if (old) + destroy_workqueue(wq); + return 0; +} + static ssize_t blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter) { unsigned int nr_pages; + struct inode *inode = bdev_file_inode(iocb->ki_filp); if (!iov_iter_count(iter)) return 0; nr_pages = bio_iov_vecs_to_alloc(iter, BIO_MAX_VECS + 1); + + if(!inode->i_sb->s_dio_done_wq && blkdev_sb_init_dio_done_wq(inode->i_sb)) + return -ENOMEM; + if (likely(nr_pages <= BIO_MAX_VECS)) { if (is_sync_kiocb(iocb)) return __blkdev_direct_IO_simple(iocb, iter, nr_pages); -- 2.31.1