On Wed, May 16, 2018 at 10:27:18PM +0200, Andreas Gruenbacher wrote:
> I/O completion triggers when dio->ref reaches zero, so
> iomap_dio_bio_end_io will never evaluate submit.waiter before the
> atomic_dec_and_test in iomap_dio_rw. We probably need an
> smp_mb__before_atomic() before that atomic_dec_and_test to make sure
> that iomap_dio_bio_end_io sees the right value of submit.waiter
> though. Any concerns beyond that?

I suspect in the end the union usage might still be fine, but it still
looks rather dangerous to scramble over the union that late. It will
also do things like override the last_queue pointer, which we need
initialized early for block polling.

I suspect something like the variant below will work much better. This
moves the wait_for_completion flag into struct iomap_dio (into an
existing packing hole) and uses that in the completion handler. It
also always initializes the submit fields early to prepare for this,
and makes all fallbacks synchronous (if we ever get a read fallback,
that would be the right thing to do).

diff --git a/fs/iomap.c b/fs/iomap.c
index e4617ae1f8d6..54b28a58b7e3 100644
--- a/fs/iomap.c
+++ b/fs/iomap.c
@@ -1327,6 +1327,7 @@ struct iomap_dio {
 	atomic_t		ref;
 	unsigned		flags;
 	int			error;
+	bool			wait_for_completion;
 
 	union {
 		/* used during submission and for synchronous completion: */
@@ -1430,7 +1431,7 @@ static void iomap_dio_bio_end_io(struct bio *bio)
 		iomap_dio_set_error(dio, blk_status_to_errno(bio->bi_status));
 
 	if (atomic_dec_and_test(&dio->ref)) {
-		if (is_sync_kiocb(dio->iocb)) {
+		if (dio->wait_for_completion) {
 			struct task_struct *waiter = dio->submit.waiter;
 
 			WRITE_ONCE(dio->submit.waiter, NULL);
@@ -1646,13 +1647,12 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 	dio->end_io = end_io;
 	dio->error = 0;
 	dio->flags = 0;
+	dio->wait_for_completion = is_sync_kiocb(iocb);
 
 	dio->submit.iter = iter;
-	if (is_sync_kiocb(iocb)) {
-		dio->submit.waiter = current;
-		dio->submit.cookie = BLK_QC_T_NONE;
-		dio->submit.last_queue = NULL;
-	}
+	dio->submit.waiter = current;
+	dio->submit.cookie = BLK_QC_T_NONE;
+	dio->submit.last_queue = NULL;
 
 	if (iov_iter_rw(iter) == READ) {
 		if (pos >= dio->i_size)
@@ -1702,7 +1702,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		dio_warn_stale_pagecache(iocb->ki_filp);
 	ret = 0;
 
-	if (iov_iter_rw(iter) == WRITE && !is_sync_kiocb(iocb) &&
+	if (iov_iter_rw(iter) == WRITE && !dio->wait_for_completion &&
 	    !inode->i_sb->s_dio_done_wq) {
 		ret = sb_init_dio_done_wq(inode->i_sb);
 		if (ret < 0)
@@ -1717,8 +1717,10 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 				iomap_dio_actor);
 		if (ret <= 0) {
 			/* magic error code to fall back to buffered I/O */
-			if (ret == -ENOTBLK)
+			if (ret == -ENOTBLK) {
+				dio->wait_for_completion = true;
 				ret = 0;
+			}
 			break;
 		}
 		pos += ret;
@@ -1739,7 +1741,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
 		dio->flags &= ~IOMAP_DIO_NEED_SYNC;
 
 	if (!atomic_dec_and_test(&dio->ref)) {
-		if (!is_sync_kiocb(iocb))
+		if (!dio->wait_for_completion)
 			return -EIOCBQUEUED;
 
 		for (;;) {