2018-05-18 17:56 GMT+02:00 Christoph Hellwig <hch@xxxxxx>:
> On Wed, May 16, 2018 at 10:27:18PM +0200, Andreas Gruenbacher wrote:
>> I/O completion triggers when dio->ref reaches zero, so
>> iomap_dio_bio_end_io will never evaluate submit.waiter before the
>> atomic_dec_and_test in iomap_dio_rw. We probably need an
>> smp_mb__before_atomic() before that atomic_dec_and_test to make sure
>> that iomap_dio_bio_end_io sees the right value of submit.waiter
>> though. Any concerns beyond that?
>
> I suspect in the end union usage might still be fine, but it still
> looks rather dangerous to scramble over the union that late. It will
> also do things like overriding the last_queue pointer which we need
> initialized early for block polling.
>
> I suspect something like the variant below will work much better.
> This moves the wait_for_completion flag into struct iomap_dio
> (in an existing packing hole) and uses that in the completion
> handler. It also always initializes the submit fields to prepare
> for this early, and makes all fallbacks synchronous (if we ever
> get a read fallback that would be the right thing to do).

Yes, that should work.

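For illustration only, here is a minimal user-space sketch of that idea,
with C11 atomics standing in for the kernel's atomic_t. The names
dio_model, dio_model_end_io and dio_model_force_sync are made up for
this example and do not exist in fs/iomap.c; the real completion handler
also wakes the waiting task and handles errors, which is left out here:

```c
/* User-space model only; not kernel code. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

enum dio_outcome { DIO_PENDING, DIO_WOKE_WAITER, DIO_COMPLETED_ASYNC };

struct dio_model {
	atomic_int ref;            /* references still outstanding */
	bool wait_for_completion;  /* lives outside the union, set before I/O */
	const char *waiter;        /* stands in for dio->submit.waiter */
};

/* Models the -ENOTBLK fallback: from here on the submitter must wait. */
static void dio_model_force_sync(struct dio_model *dio)
{
	dio->wait_for_completion = true;
}

/* Models the last-reference check in the bio completion handler. */
static enum dio_outcome dio_model_end_io(struct dio_model *dio)
{
	/* atomic_fetch_sub returns the old value; 1 means this was the last ref. */
	if (atomic_fetch_sub(&dio->ref, 1) != 1)
		return DIO_PENDING;

	if (dio->wait_for_completion) {
		dio->waiter = NULL;  /* the kernel would wake the task here */
		return DIO_WOKE_WAITER;
	}
	return DIO_COMPLETED_ASYNC;
}
```

The point of the sketch is only that the completion path decides based
on a flag stored in the dio itself, so a fallback can switch an async
request to synchronous waiting without touching the union.
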
> diff --git a/fs/iomap.c b/fs/iomap.c
> index e4617ae1f8d6..54b28a58b7e3 100644
> --- a/fs/iomap.c
> +++ b/fs/iomap.c
> @@ -1327,6 +1327,7 @@ struct iomap_dio {
>  	atomic_t ref;
>  	unsigned flags;
>  	int error;
> +	bool wait_for_completion;
>
>  	union {
>  		/* used during submission and for synchronous completion: */
> @@ -1430,7 +1431,7 @@ static void iomap_dio_bio_end_io(struct bio *bio)
>  		iomap_dio_set_error(dio, blk_status_to_errno(bio->bi_status));
>
>  	if (atomic_dec_and_test(&dio->ref)) {
> -		if (is_sync_kiocb(dio->iocb)) {
> +		if (dio->wait_for_completion) {
>  			struct task_struct *waiter = dio->submit.waiter;
>
>  			WRITE_ONCE(dio->submit.waiter, NULL);
> @@ -1646,13 +1647,12 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  	dio->end_io = end_io;
>  	dio->error = 0;
>  	dio->flags = 0;
> +	dio->wait_for_completion = is_sync_kiocb(iocb);
>
>  	dio->submit.iter = iter;
> -	if (is_sync_kiocb(iocb)) {
> -		dio->submit.waiter = current;
> -		dio->submit.cookie = BLK_QC_T_NONE;
> -		dio->submit.last_queue = NULL;
> -	}
> +	dio->submit.waiter = current;
> +	dio->submit.cookie = BLK_QC_T_NONE;
> +	dio->submit.last_queue = NULL;
>
>  	if (iov_iter_rw(iter) == READ) {
>  		if (pos >= dio->i_size)
> @@ -1702,7 +1702,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  		dio_warn_stale_pagecache(iocb->ki_filp);
>  	ret = 0;
>
> -	if (iov_iter_rw(iter) == WRITE && !is_sync_kiocb(iocb) &&
> +	if (iov_iter_rw(iter) == WRITE && !dio->wait_for_completion &&
>  	    !inode->i_sb->s_dio_done_wq) {
>  		ret = sb_init_dio_done_wq(inode->i_sb);
>  		if (ret < 0)
> @@ -1717,8 +1717,10 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  				iomap_dio_actor);
>  		if (ret <= 0) {
>  			/* magic error code to fall back to buffered I/O */
> -			if (ret == -ENOTBLK)
> +			if (ret == -ENOTBLK) {
> +				dio->wait_for_completion = true;
>  				ret = 0;
> +			}
>  			break;
>  		}
>  		pos += ret;
> @@ -1739,7 +1741,7 @@ iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
>  		dio->flags &= ~IOMAP_DIO_NEED_SYNC;

We still want to make sure the right value of dio->wait_for_completion
is seen by iomap_dio_bio_end_io:

	smp_mb__before_atomic();

>  	if (!atomic_dec_and_test(&dio->ref)) {
> -		if (!is_sync_kiocb(iocb))
> +		if (!dio->wait_for_completion)
>  			return -EIOCBQUEUED;
>
>  		for (;;) {

Thanks,
Andreas