On 2019/08/06 7:05, Damien Le Moal wrote:
> On 2019/08/06 6:59, Damien Le Moal wrote:
>> On 2019/08/06 6:28, Jens Axboe wrote:
>>> On 8/5/19 2:27 PM, Damien Le Moal wrote:
>>>> On 2019/08/06 6:26, Jens Axboe wrote:
>>>>>> In any case, looking again at this code, it looks like there is a
>>>>>> problem with dio->size being incremented early, even for fragments
>>>>>> that get BLK_QC_T_EAGAIN, because dio->size is being used in
>>>>>> blkdev_bio_end_io(). So an incorrect size can be reported to user
>>>>>> space in that case on completion (e.g. large asynchronous no-wait dio
>>>>>> that cannot be issued in one go).
>>>>>>
>>>>>> So maybe something like this ? (completely untested)
>>>>>
>>>>> I think that looks pretty good, I like not double accounting with
>>>>> this_size and dio->size, and we retain the old style ordering for the
>>>>> ret value.
>>>>
>>>> Do you want a proper patch with real testing backup ? I can send that
>>>> later today.
>>>
>>> Yeah that'd be great, I like your approach better.
>>>
>>
>> Looking again, I think this is not it yet: dio->size is being referenced after
>> submit_bio(), so blkdev_bio_end_io() may see the old value if the bio completes
>> before dio->size increment. So the use-after-free is still there. And since
>> blkdev_bio_end_io() processes completion to user space only when dio->ref
>> becomes 0, adding an atomic_inc/dec(&dio->ref) over the loop would not help and
>> does not cover the single BIO case. Any idea how to address this one ?
>>
>
> May be add a bio_get/put() over the 2 places that do submit_bio() would work,
> for all cases (single/multi BIO, sync & async). E.g.:
>
> +			bio_get(bio);
>  			qc = submit_bio(bio);
>  			if (qc == BLK_QC_T_EAGAIN) {
>  				if (!dio->size)
>  					ret = -EAGAIN;
> +				bio_put(bio);
>  				goto error;
>  			}
>  			dio->size += bio_size;
> +			bio_put(bio);
>
> Thoughts ?
>

That does not work since the reference to dio->size in blkdev_bio_end_io()
depends on atomic_dec_and_test(&dio->ref) which counts the BIO fragments for the
dio (+1 for async multi-bio case). So completion of the last bio can still
reference the old value of dio->size.

Adding a bio_get/put() on dio->bio ensures that dio stays around, but does not
prevent the use of the wrong dio->size. Adding an additional
atomic_inc/dec(&dio->ref) would prevent that, but we would need to handle dio
completion at the end of __blkdev_direct_IO() if all BIO fragments already
completed at that point. That is a lot more plumbing needed, relying completely
on dio->ref for all cases, thus removing the dio->multi_bio management.

Something like this:

diff --git a/fs/block_dev.c b/fs/block_dev.c
index a6f7c892cb4a..51d36baa367c 100644
--- a/fs/block_dev.c
+++ b/fs/block_dev.c
@@ -279,7 +279,6 @@ struct blkdev_dio {
 	};
 	size_t size;
 	atomic_t ref;
-	bool multi_bio : 1;
 	bool should_dirty : 1;
 	bool is_sync : 1;
 	struct bio bio;
@@ -295,15 +294,14 @@ static int blkdev_iopoll(struct kiocb *kiocb, bool wait)
 	return blk_poll(q, READ_ONCE(kiocb->ki_cookie), wait);
 }
 
-static void blkdev_bio_end_io(struct bio *bio)
+static inline void blkdev_get_dio(struct blkdev_dio *dio)
 {
-	struct blkdev_dio *dio = bio->bi_private;
-	bool should_dirty = dio->should_dirty;
-
-	if (bio->bi_status && !dio->bio.bi_status)
-		dio->bio.bi_status = bio->bi_status;
+	atomic_inc(&dio->ref);
+}
 
-	if (!dio->multi_bio || atomic_dec_and_test(&dio->ref)) {
+static void blkdev_put_dio(struct blkdev_dio *dio)
+{
+	if (atomic_dec_and_test(&dio->ref)) {
 		if (!dio->is_sync) {
 			struct kiocb *iocb = dio->iocb;
 			ssize_t ret;
@@ -316,15 +314,25 @@ static void blkdev_bio_end_io(struct bio *bio)
 			}
 
 			dio->iocb->ki_complete(iocb, ret, 0);
-			if (dio->multi_bio)
-				bio_put(&dio->bio);
 		} else {
 			struct task_struct *waiter = dio->waiter;
 
 			WRITE_ONCE(dio->waiter, NULL);
 			blk_wake_io_task(waiter);
 		}
+		bio_put(&dio->bio);
 	}
+}
+
+static void blkdev_bio_end_io(struct bio *bio)
+{
+	struct blkdev_dio *dio = bio->bi_private;
+	bool should_dirty = dio->should_dirty;
+
+	if (bio->bi_status && !dio->bio.bi_status)
+		dio->bio.bi_status = bio->bi_status;
+
+	blkdev_put_dio(dio);
 
 	if (should_dirty) {
 		bio_check_pages_dirty(bio);
@@ -349,7 +357,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 	loff_t pos = iocb->ki_pos;
 	blk_qc_t qc = BLK_QC_T_NONE;
 	gfp_t gfp;
-	ssize_t ret;
+	ssize_t ret = 0;
 
 	if ((pos | iov_iter_alignment(iter)) &
 	    (bdev_logical_block_size(bdev) - 1))
@@ -366,17 +374,24 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 
 	dio = container_of(bio, struct blkdev_dio, bio);
 	dio->is_sync = is_sync = is_sync_kiocb(iocb);
-	if (dio->is_sync) {
+	if (dio->is_sync)
 		dio->waiter = current;
-		bio_get(bio);
-	} else {
+	else
 		dio->iocb = iocb;
-	}
 
 	dio->size = 0;
-	dio->multi_bio = false;
 	dio->should_dirty = is_read && iter_is_iovec(iter);
 
+	/*
+	 * Get an extra reference on the first bio to ensure that the dio
+	 * structure which is embedded into the first bio stays around for AIOs
+	 * and while we are still doing dio->size accounting in the loop below.
+	 * For dio->is_sync case, the extra reference is released on exit of
+	 * this function.
+	 */
+	bio_get(bio);
+	blkdev_get_dio(dio);
+
 	/*
 	 * Don't plug for HIPRI/polled IO, as those should go straight
 	 * to issue
@@ -386,6 +401,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 
 	ret = 0;
 	for (;;) {
+		size_t bio_size;
 		int err;
 
 		bio_set_dev(bio, bdev);
@@ -421,7 +437,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 		if (nowait)
 			bio->bi_opf |= (REQ_NOWAIT | REQ_NOWAIT_INLINE);
 
-		dio->size += bio->bi_iter.bi_size;
+		bio_size = bio->bi_iter.bi_size;
 		pos += bio->bi_iter.bi_size;
 
 		nr_pages = iov_iter_npages(iter, BIO_MAX_PAGES);
@@ -435,42 +451,30 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 
 			qc = submit_bio(bio);
 			if (qc == BLK_QC_T_EAGAIN) {
-				if (!ret)
+				if (!dio->size)
 					ret = -EAGAIN;
 				goto error;
 			}
-			ret = dio->size;
+			dio->size += bio_size;
 
 			if (polled)
 				WRITE_ONCE(iocb->ki_cookie, qc);
 			break;
 		}
 
-		if (!dio->multi_bio) {
-			/*
-			 * AIO needs an extra reference to ensure the dio
-			 * structure which is embedded into the first bio
-			 * stays around.
-			 */
-			if (!is_sync)
-				bio_get(bio);
-			dio->multi_bio = true;
-			atomic_set(&dio->ref, 2);
-		} else {
-			atomic_inc(&dio->ref);
-		}
+		blkdev_get_dio(dio);
 
 		qc = submit_bio(bio);
 		if (qc == BLK_QC_T_EAGAIN) {
-			if (!ret)
+			if (!dio->size)
 				ret = -EAGAIN;
 			goto error;
 		}
-		ret = dio->size;
+		dio->size += bio_size;
 
 		bio = bio_alloc(gfp, nr_pages);
 		if (!bio) {
-			if (!ret)
+			if (!dio->size)
 				ret = -EAGAIN;
 			goto error;
 		}
@@ -496,8 +500,10 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages)
 out:
 	if (!ret)
 		ret = blk_status_to_errno(dio->bio.bi_status);
+	if (likely(!ret))
+		ret = dio->size;
 
-	bio_put(&dio->bio);
+	blkdev_put_dio(dio);
 	return ret;
 error:
 	if (!is_poll)

I think that works for all cases. Starting tests. Let me know if you think this
is not appropriate.

-- 
Damien Le Moal
Western Digital Research
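
For anyone following the reference counting argument, the ordering problem can
be reproduced outside the kernel with a small user-space model in C11. This is
only a sketch and not the patch itself: struct model_dio, model_get_dio(),
model_put_dio() and model_direct_io() are hypothetical stand-ins for the kernel
names above, and every fragment is assumed to complete immediately after
submission, before the size accounting runs. The model shows that completion
only sees the final size if the submitter holds its own reference across the
loop.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical user-space stand-in for struct blkdev_dio. */
struct model_dio {
	size_t size;      /* bytes accounted so far */
	atomic_int ref;   /* one per in-flight fragment, plus the submitter */
	size_t reported;  /* value of size seen by the last completion */
	int completions;  /* number of times the completion path ran */
};

static void model_get_dio(struct model_dio *dio)
{
	atomic_fetch_add(&dio->ref, 1);
}

static void model_put_dio(struct model_dio *dio)
{
	/* atomic_fetch_sub() returns the previous value: 1 means last ref. */
	if (atomic_fetch_sub(&dio->ref, 1) == 1) {
		dio->reported = dio->size;
		dio->completions++;
	}
}

/*
 * Submit nr fragments of frag_size bytes. Each fragment "completes"
 * right after submission and before size is incremented, which is the
 * assumed worst case. Returns the size seen by the last completion.
 */
static size_t model_direct_io(struct model_dio *dio, int nr,
			      size_t frag_size, bool hold_submitter_ref)
{
	dio->size = 0;
	dio->reported = 0;
	dio->completions = 0;
	atomic_init(&dio->ref, 0);

	if (hold_submitter_ref)
		model_get_dio(dio);	/* extra reference for the submitter */

	for (int i = 0; i < nr; i++) {
		model_get_dio(dio);	/* reference owned by the fragment */
		model_put_dio(dio);	/* fragment completes early */
		dio->size += frag_size;	/* accounting after submission */
	}

	if (hold_submitter_ref)
		model_put_dio(dio);	/* completion runs here, size is final */

	return dio->reported;
}
```

With 3 fragments of 4096 bytes, model_direct_io(&dio, 3, 4096, true) reports
12288 and completes exactly once. Without the submitter reference, the same call
reports a stale size and runs the completion path once per fragment, which is
the behavior the extra dio reference is meant to rule out.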