On 2019/08/06 13:09, Jens Axboe wrote: > On 8/5/19 5:05 PM, Damien Le Moal wrote: >> On 2019/08/06 7:05, Damien Le Moal wrote: >>> On 2019/08/06 6:59, Damien Le Moal wrote: >>>> On 2019/08/06 6:28, Jens Axboe wrote: >>>>> On 8/5/19 2:27 PM, Damien Le Moal wrote: >>>>>> On 2019/08/06 6:26, Jens Axboe wrote: >>>>>>>> In any case, looking again at this code, it looks like there is a >>>>>>>> problem with dio->size being incremented early, even for fragments >>>>>>>> that get BLK_QC_T_EAGAIN, because dio->size is being used in >>>>>>>> blkdev_bio_end_io(). So an incorrect size can be reported to user >>>>>>>> space in that case on completion (e.g. large asynchronous no-wait dio >>>>>>>> that cannot be issued in one go). >>>>>>>> >>>>>>>> So maybe something like this ? (completely untested) >>>>>>> >>>>>>> I think that looks pretty good, I like not double accounting with >>>>>>> this_size and dio->size, and we retain the old style ordering for the >>>>>>> ret value. >>>>>> >>>>>> Do you want a proper patch with real testing backup ? I can send that >>>>>> later today. >>>>> >>>>> Yeah that'd be great, I like your approach better. >>>>> >>>> >>>> Looking again, I think this is not it yet: dio->size is being referenced after >>>> submit_bio(), so blkdev_bio_end_io() may see the old value if the bio completes >>>> before dio->size increment. So the use-after-free is still there. And since >>>> blkdev_bio_end_io() processes completion to user space only when dio->ref >>>> becomes 0, adding an atomic_inc/dec(&dio->ref) over the loop would not help and >>>> does not cover the single BIO case. Any idea how to address this one ? >>>> >>> >>> May be add a bio_get/put() over the 2 places that do submit_bio() would work, >>> for all cases (single/multi BIO, sync & async). 
E.g.: >>> >>> + bio_get(bio); >>> qc = submit_bio(bio); >>> if (qc == BLK_QC_T_EAGAIN) { >>> if (!dio->size) >>> ret = -EAGAIN; >>> + bio_put(bio); >>> goto error; >>> } >>> dio->size += bio_size; >>> + bio_put(bio); >>> >>> Thoughts ? >>> >> >> That does not work since the reference to dio->size in >> blkdev_bio_end_io() depends on atomic_dec_and_test(&dio->ref) which >> counts the BIO fragments for the dio (+1 for async multi-bio case). So >> completion of the last bio can still reference the old value of >> dio->size. >> >> Adding a bio_get/put() on dio->bio ensures that dio stays around, but >> does not prevent the use of the wrong dio->size. Adding an additional >> atomic_inc/dec(&dio->ref) would prevent that, but we would need to >> handle dio completion at the end of __blkdev_direct_IO() if all BIO >> fragments already completed at that point. That is a lot more plumbing >> needed, relying completely on dio->ref for all cases, thus removing >> the dio->multi_bio management. >> >> Something like this: > > Don't like this, as it adds unnecessary atomics for the sync case. > What's wrong with just adjusting dio->size if we get BLK_QC_T_EAGAIN? > It's safe to do so, since we're doing the final put later. We just can't > do it for the normal case of submit_bio() succeeding. Kill the new 'ret' > usage and return to what we had as well, it's more readable too imho. 
Here is what I have so far: diff --git a/fs/block_dev.c b/fs/block_dev.c index a6f7c892cb4a..6dd945fdf962 100644 --- a/fs/block_dev.c +++ b/fs/block_dev.c @@ -300,7 +300,9 @@ static void blkdev_bio_end_io(struct bio *bio) struct blkdev_dio *dio = bio->bi_private; bool should_dirty = dio->should_dirty; - if (bio->bi_status && !dio->bio.bi_status) + if (bio->bi_status && + bio->bi_status != BLK_STS_AGAIN && + !dio->bio.bi_status) dio->bio.bi_status = bio->bi_status; if (!dio->multi_bio || atomic_dec_and_test(&dio->ref)) { @@ -349,7 +351,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) loff_t pos = iocb->ki_pos; blk_qc_t qc = BLK_QC_T_NONE; gfp_t gfp; - ssize_t ret; + ssize_t ret = 0; if ((pos | iov_iter_alignment(iter)) & (bdev_logical_block_size(bdev) - 1)) @@ -386,7 +388,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) ret = 0; for (;;) { - int err; + unsigned int bio_size; bio_set_dev(bio, bdev); bio->bi_iter.bi_sector = pos >> 9; @@ -395,10 +397,8 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) bio->bi_end_io = blkdev_bio_end_io; bio->bi_ioprio = iocb->ki_ioprio; - err = bio_iov_iter_get_pages(bio, iter); - if (unlikely(err)) { - if (!ret) - ret = err; + ret = bio_iov_iter_get_pages(bio, iter); + if (unlikely(ret)) { bio->bi_status = BLK_STS_IOERR; bio_endio(bio); break; @@ -421,7 +421,7 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) if (nowait) bio->bi_opf |= (REQ_NOWAIT | REQ_NOWAIT_INLINE); - dio->size += bio->bi_iter.bi_size; + bio_size = bio->bi_iter.bi_size; pos += bio->bi_iter.bi_size; nr_pages = iov_iter_npages(iter, BIO_MAX_PAGES); @@ -435,11 +435,11 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) qc = submit_bio(bio); if (qc == BLK_QC_T_EAGAIN) { - if (!ret) + if (!dio->size) ret = -EAGAIN; goto error; } - ret = dio->size; + dio->size += bio_size; if (polled) WRITE_ONCE(iocb->ki_cookie, qc); @@ 
-462,15 +462,15 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) qc = submit_bio(bio); if (qc == BLK_QC_T_EAGAIN) { - if (!ret) + if (!dio->size) ret = -EAGAIN; goto error; } - ret = dio->size; + dio->size += bio_size; bio = bio_alloc(gfp, nr_pages); if (!bio) { - if (!ret) + if (!dio->size) ret = -EAGAIN; goto error; } @@ -496,6 +496,8 @@ __blkdev_direct_IO(struct kiocb *iocb, struct iov_iter *iter, int nr_pages) out: if (!ret) ret = blk_status_to_errno(dio->bio.bi_status); + if (likely(!ret)) + ret = dio->size; bio_put(&dio->bio); return ret;

This fixes the sync case return value with nowait, but the async case is still wrong: there is a potential use-after-free of dio, and the dio->size seen by blkdev_bio_end_io() may not be the up-to-date value incremented after submit_bio().

And I am stuck. I do not see how to fix this without taking the extra dio reference and also handling dio completion in __blkdev_direct_IO() when its dio->ref decrement is the last one. This is the previous proposal I sent that you did not like. This extra ref solution is similar to the iomap direct IO code. Any other ideas?

Also, I am seeing some very weird behavior of submit_bio() with the nowait case. For very large dio requests, I see submit_bio() returning BLK_QC_T_EAGAIN but the bio actually going to the drive, which of course does not make sense.

Another thing I noticed is that REQ_NOWAIT_INLINE is handled for the inline error return only by blk_mq_make_request(). generic_make_request() is lacking handling of REQ_NOWAIT through bio_wouldblock_error().

I will keep digging into all this, but being on vacation this week, I may be very slow.

FYI, I am attaching to this email the small application I used for testing. I plan to use it for blktests cases.

Best regards.

--
Damien Le Moal
Western Digital Research
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright (C) 2019 Western Digital Corporation or its affiliates.
 * Author: Damien Le Moal <damien.lemoal@xxxxxxx>
 */
/*
 * Small direct IO test tool: issues a single O_DIRECT read or write of
 * <size> bytes at <offset> on <dev path>, either synchronously with
 * preadv2()/pwritev2() or asynchronously through the native AIO system
 * calls, optionally with RWF_NOWAIT. The result is printed on stdout as
 * "<requested size> <result>", where <result> is the number of bytes
 * transferred or a negative errno value.
 */
#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <sys/syscall.h>
#include <fcntl.h>

#include <linux/aio_abi.h>
#include <linux/fs.h>

/*
 * Print the command line help on stdout. @cmd is the program name.
 */
static void dio_usage(char *cmd)
{
	printf("Usage: %s [options] <dev path> <offset (B)> <size (B)>\n",
	       cmd);
	printf("Options:\n"
	       "    -rd: do read (default)\n"
	       "    -wr: do wr\n"
	       "    -nowait: Use RWF_NOWAIT (do not wait for resources).\n"
	       "    -async: Do asynchronous direct IO\n");
}

/*
 * Parse @str as a non-negative decimal byte count.
 *
 * Unlike atoll(), this rejects empty strings, trailing garbage, out of
 * range values and negative values. Returns 0 and stores the value in
 * @val on success, -1 otherwise (@val is left untouched).
 */
static int dio_parse_bytes(const char *str, long long *val)
{
	char *end;
	long long v;

	errno = 0;
	v = strtoll(str, &end, 10);
	if (end == str || *end != '\0' || errno == ERANGE || v < 0)
		return -1;

	*val = v;
	return 0;
}

/*
 * Raw system call wrapper for preadv2(). The offset is passed as the low
 * word of the position with a zero high word.
 * NOTE(review): this looks correct only where off_t fits in a single
 * system call argument (64-bit builds) - confirm before using on 32-bit.
 */
ssize_t preadv2(int fd, const struct iovec *iov, int iovcnt,
		off_t offset, int flags)
{
	return syscall(SYS_preadv2, fd, iov, iovcnt, offset, 0, flags);
}

/*
 * Raw system call wrapper for pwritev2(). Same offset handling as the
 * preadv2() wrapper above.
 */
ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt,
		 off_t offset, int flags)
{
	return syscall(SYS_pwritev2, fd, iov, iovcnt, offset, 0, flags);
}

/*
 * Issue one synchronous read (@do_read true) or write of @size bytes at
 * @offset on @fd using @buf, with the RWF_* flags in @flags, and print
 * "<size> <result>". On failure, <result> is -errno.
 */
static void dio_sync(int fd, void *buf, size_t size, loff_t offset, int flags,
		     bool do_read)
{
	ssize_t ret;
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = size,
	};

	if (do_read)
		ret = preadv2(fd, &iov, 1, offset, flags);
	else
		ret = pwritev2(fd, &iov, 1, offset, flags);
	if (ret < 0)
		ret = -errno;

	printf("%zu %zd\n", size, ret);
}

/* Raw system call wrappers for the native AIO interface. */
static inline int io_setup(unsigned int nr, aio_context_t *ctxp)
{
	return syscall(__NR_io_setup, nr, ctxp);
}

static inline int io_destroy(aio_context_t ctx)
{
	return syscall(__NR_io_destroy, ctx);
}

static inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{
	return syscall(__NR_io_submit, ctx, nr, iocbpp);
}

static inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
			       struct io_event *events,
			       struct timespec *timeout)
{
	return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}

/*
 * Issue one asynchronous read (@do_read true) or write of @size bytes at
 * @offset on @fd using @buf, with the RWF_* flags in @flags. Waits for
 * the completion event and prints "<size> <result>", where <result> is
 * the res field of the completion event. Setup, submission and event
 * retrieval failures are reported on stderr.
 */
static void dio_async(int fd, void *buf, size_t size, loff_t offset, int flags,
		      bool do_read)
{
	aio_context_t aioctx;
	struct iocb aiocb, *aiocbs;
	struct io_event aioevent;
	ssize_t ret;

	memset(&aioctx, 0, sizeof(aioctx));
	memset(&aiocb, 0, sizeof(aiocb));
	memset(&aioevent, 0, sizeof(aioevent));

	ret = io_setup(1, &aioctx);
	if (ret < 0) {
		fprintf(stderr,
			"io_setup failed %d (%s)\n", errno, strerror(errno));
		return;
	}

	aiocb.aio_fildes = fd;
	aiocb.aio_buf = (unsigned long)buf;
	aiocb.aio_nbytes = size;
	/*
	 * The offset must be set explicitly: the iocb is zeroed above, so
	 * without this the IO would always be issued at offset 0.
	 */
	aiocb.aio_offset = offset;
	if (do_read)
		aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
	else
		aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
	aiocb.aio_rw_flags = flags;
	aiocbs = &aiocb;

	ret = io_submit(aioctx, 1, &aiocbs);
	if (ret < 0) {
		fprintf(stderr,
			"io_submit failed %d (%s)\n", errno, strerror(errno));
		goto out;
	}

	ret = io_getevents(aioctx, 1, 1, &aioevent, NULL);
	if (ret != 1) {
		fprintf(stderr,
			"io_getevents failed %d (%s)\n",
			errno, strerror(errno));
		goto out;
	}

	/* res is a __s64: cast so that it always matches %lld. */
	printf("%zu %lld\n", size, (long long)aioevent.res);

out:
	io_destroy(aioctx);
}

/*
 * Parse the command line, open the device with O_DIRECT, allocate a page
 * aligned buffer and run one synchronous or asynchronous IO.
 *
 * Returns 0 if the IO was attempted (its result is printed by the IO
 * helpers), 1 on usage, open or allocation errors.
 */
int main(int argc, char **argv)
{
	int ret, fd, i, flags = 0;
	char *dev_path;
	long long val;
	loff_t offset;
	size_t size;
	void *buf = NULL;
	bool async = false;
	bool do_read = true;
	int open_flags = O_DIRECT;

	for (i = 1; i < argc; i++) {
		if (strcmp(argv[i], "-nowait") == 0) {
			flags = RWF_NOWAIT;
		} else if (strcmp(argv[i], "-async") == 0) {
			async = true;
		} else if (strcmp(argv[i], "-rd") == 0) {
			do_read = true;
		} else if (strcmp(argv[i], "-wr") == 0) {
			do_read = false;
		} else if (argv[i][0] == '-') {
			fprintf(stderr, "Invalid option %s\n", argv[i]);
			return 1;
		} else {
			break;
		}
	}

	if (argc - i != 3) {
		dio_usage(argv[0]);
		return 1;
	}

	dev_path = argv[i];

	if (dio_parse_bytes(argv[i + 1], &val) < 0) {
		fprintf(stderr, "Invalid offset %s\n", argv[i + 1]);
		return 1;
	}
	offset = val;

	/*
	 * A negative or unparsable size would otherwise be silently
	 * converted to a huge size_t value.
	 */
	if (dio_parse_bytes(argv[i + 2], &val) < 0 ||
	    (unsigned long long)val > (size_t)-1) {
		fprintf(stderr, "Invalid size %s\n", argv[i + 2]);
		return 1;
	}
	size = val;

	if (do_read)
		open_flags |= O_RDONLY;
	else
		open_flags |= O_WRONLY;
	fd = open(dev_path, open_flags, 0);
	if (fd < 0) {
		fprintf(stderr, "Open %s failed %d (%s)\n",
			dev_path, errno, strerror(errno));
		return 1;
	}

	/*
	 * posix_memalign() returns a positive error number on failure and
	 * does not set errno, so report its return value as is.
	 */
	ret = posix_memalign(&buf, sysconf(_SC_PAGESIZE), size);
	if (ret != 0) {
		fprintf(stderr, "Allocate buffer failed %d (%s)\n",
			ret, strerror(ret));
		ret = 1;
		goto out;
	}

	if (!async)
		dio_sync(fd, buf, size, offset, flags, do_read);
	else
		dio_async(fd, buf, size, offset, flags, do_read);

out:
	close(fd);
	free(buf);

	return ret;
}