Add support to the core direct-io code to defer AIO completions to user context using a workqueue. This replaces open-coded and less efficient code in XFS and ext4 and will be needed to properly support O_(D)SYNC for AIO. The communication between the filesystem and the direct I/O code requires a new buffer head flag, which is a bit ugly but unavoidable until the direct I/O code stops abusing the buffer_head structure for communicating with the filesystems. Currently this creates a per-superblock unbound workqueue for these completions, which is taken from an earlier patch by Jan Kara. I'm not really convinced about this use and would prefer a "normal" global workqueue with a high concurrency limit, but this needs further discussion. Signed-off-by: Christoph Hellwig <hch@xxxxxx> --- fs/direct-io.c | 50 +++++++++++++++++++++++++++++--------------- fs/ext4/ext4.h | 5 ---- fs/ext4/inode.c | 15 +------------ fs/ext4/page-io.c | 4 --- fs/ocfs2/aops.c | 8 ------- fs/super.c | 20 +++++++++-------- fs/xfs/xfs_aops.c | 28 ++++-------------------- fs/xfs/xfs_aops.h | 3 -- include/linux/buffer_head.h | 2 + include/linux/fs.h | 7 ++++-- 10 files changed, 61 insertions(+), 81 deletions(-) Index: linux-2.6/fs/direct-io.c =================================================================== --- linux-2.6.orig/fs/direct-io.c 2012-11-06 13:33:43.446660128 +0100 +++ linux-2.6/fs/direct-io.c 2012-11-21 21:22:57.875141232 +0100 @@ -126,6 +126,7 @@ struct dio { spinlock_t bio_lock; /* protects BIO fields below */ int page_errors; /* errno from get_user_pages() */ int is_async; /* is IO async ? */ + bool defer_completion; /* defer AIO completion to workqueue? */ int io_error; /* IO error in completion path */ unsigned long refcount; /* direct_io_worker() and bios */ struct bio *bio_list; /* singly linked via bi_private */ @@ -140,7 +141,10 @@ struct dio { * allocation time. Don't add new fields after pages[] unless you * wish that they not be zeroed. 
*/ - struct page *pages[DIO_PAGES]; /* page buffer */ + union { + struct page *pages[DIO_PAGES]; /* page buffer */ + struct work_struct complete_work;/* deferred AIO completion */ + }; } ____cacheline_aligned_in_smp; static struct kmem_cache *dio_cache __read_mostly; @@ -220,16 +224,16 @@ static inline struct page *dio_get_page( * dio_complete() - called when all DIO BIO I/O has been completed * @offset: the byte offset in the file of the completed operation * - * This releases locks as dictated by the locking type, lets interested parties - * know that a DIO operation has completed, and calculates the resulting return - * code for the operation. + * This drops i_dio_count, lets interested parties know that a DIO operation + * has completed, and calculates the resulting return code for the operation. * * It lets the filesystem know if it registered an interest earlier via * get_block. Pass the private field of the map buffer_head so that * filesystems can use it to hold additional state between get_block calls and * dio_complete. 
*/ -static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, bool is_async) +static ssize_t dio_complete(struct dio *dio, loff_t offset, ssize_t ret, + bool is_async) { ssize_t transferred = 0; @@ -257,19 +261,26 @@ static ssize_t dio_complete(struct dio * if (ret == 0) ret = transferred; - if (dio->end_io && dio->result) { - dio->end_io(dio->iocb, offset, transferred, - dio->private, ret, is_async); - } else { - if (is_async) - aio_complete(dio->iocb, ret, 0); - inode_dio_done(dio->inode); - } + if (dio->result && dio->end_io) + dio->end_io(dio->iocb, offset, transferred, dio->private); + + if (is_async) + aio_complete(dio->iocb, ret, 0); + inode_dio_done(dio->inode); + kmem_cache_free(dio_cache, dio); return ret; } +static void dio_aio_complete_work(struct work_struct *work) +{ + struct dio *dio = container_of(work, struct dio, complete_work); + + dio_complete(dio, dio->iocb->ki_pos, 0, true); +} + static int dio_bio_complete(struct dio *dio, struct bio *bio); + /* * Asynchronous IO callback. 
*/ @@ -289,8 +300,13 @@ static void dio_bio_end_aio(struct bio * spin_unlock_irqrestore(&dio->bio_lock, flags); if (remaining == 0) { - dio_complete(dio, dio->iocb->ki_pos, 0, true); - kmem_cache_free(dio_cache, dio); + if (dio->result && dio->defer_completion) { + INIT_WORK(&dio->complete_work, dio_aio_complete_work); + queue_work(dio->inode->i_sb->s_dio_done_wq, + &dio->complete_work); + } else { + dio_complete(dio, dio->iocb->ki_pos, 0, true); + } } } @@ -579,6 +595,9 @@ static int get_more_blocks(struct dio *d /* Store for completion */ dio->private = map_bh->b_private; + + if (buffer_defer_completion(map_bh)) + dio->defer_completion = true; } return ret; } @@ -1271,7 +1290,6 @@ do_blockdev_direct_IO(int rw, struct kio if (drop_refcount(dio) == 0) { retval = dio_complete(dio, offset, retval, false); - kmem_cache_free(dio_cache, dio); } else BUG_ON(retval != -EIOCBQUEUED); Index: linux-2.6/fs/super.c =================================================================== --- linux-2.6.orig/fs/super.c 2012-11-06 13:33:43.706660126 +0100 +++ linux-2.6/fs/super.c 2012-11-21 21:19:50.000000000 +0100 @@ -152,15 +152,13 @@ static struct super_block *alloc_super(s static const struct super_operations default_op; if (s) { - if (security_sb_alloc(s)) { - /* - * We cannot call security_sb_free() without - * security_sb_alloc() succeeding. 
So bail out manually - */ - kfree(s); - s = NULL; - goto out; - } + s->s_dio_done_wq = alloc_workqueue("dio-sync", WQ_UNBOUND, 1); + if (!s->s_dio_done_wq) + goto out_free_sb; + + if (security_sb_alloc(s)) + goto out_destroy_wq; + #ifdef CONFIG_SMP s->s_files = alloc_percpu(struct list_head); if (!s->s_files) @@ -228,6 +226,9 @@ err_out: free_percpu(s->s_files); #endif destroy_sb_writers(s); +out_destroy_wq: + destroy_workqueue(s->s_dio_done_wq); +out_free_sb: kfree(s); s = NULL; goto out; @@ -244,6 +245,7 @@ static inline void destroy_super(struct #ifdef CONFIG_SMP free_percpu(s->s_files); #endif + destroy_workqueue(s->s_dio_done_wq); destroy_sb_writers(s); security_sb_free(s); WARN_ON(!list_empty(&s->s_mounts)); Index: linux-2.6/include/linux/fs.h =================================================================== --- linux-2.6.orig/include/linux/fs.h 2012-11-06 13:33:44.526660122 +0100 +++ linux-2.6/include/linux/fs.h 2012-11-21 21:19:50.000000000 +0100 @@ -44,6 +44,7 @@ struct vm_area_struct; struct vfsmount; struct cred; struct swap_info_struct; +struct workqueue_struct; extern void __init inode_init(void); extern void __init inode_init_early(void); @@ -61,8 +62,7 @@ struct buffer_head; typedef int (get_block_t)(struct inode *inode, sector_t iblock, struct buffer_head *bh_result, int create); typedef void (dio_iodone_t)(struct kiocb *iocb, loff_t offset, - ssize_t bytes, void *private, int ret, - bool is_async); + ssize_t bytes, void *private); #define MAY_EXEC 0x00000001 #define MAY_WRITE 0x00000002 @@ -1321,6 +1321,9 @@ struct super_block { /* Being remounted read-only */ int s_readonly_remount; + + /* AIO completions deferred from interrupt context */ + struct workqueue_struct *s_dio_done_wq; }; /* superblock cache pruning functions */ Index: linux-2.6/fs/ext4/inode.c =================================================================== --- linux-2.6.orig/fs/ext4/inode.c 2012-11-06 13:33:43.483326794 +0100 +++ linux-2.6/fs/ext4/inode.c 2012-11-21 
21:19:35.227136042 +0100 @@ -2876,15 +2876,13 @@ static int ext4_get_block_write_nolock(s } static void ext4_end_io_dio(struct kiocb *iocb, loff_t offset, - ssize_t size, void *private, int ret, - bool is_async) + ssize_t size, void *private) { - struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode; ext4_io_end_t *io_end = iocb->private; /* if not async direct IO or dio with 0 bytes write, just return */ if (!io_end || !size) - goto out; + return; ext_debug("ext4_end_io_dio(): io_end 0x%p " "for inode %lu, iocb 0x%p, offset %llu, size %zd\n", @@ -2896,19 +2894,11 @@ static void ext4_end_io_dio(struct kiocb /* if not aio dio with unwritten extents, just free io and return */ if (!(io_end->flag & EXT4_IO_END_UNWRITTEN)) { ext4_free_io_end(io_end); -out: - if (is_async) - aio_complete(iocb, ret, 0); - inode_dio_done(inode); return; } io_end->offset = offset; io_end->size = size; - if (is_async) { - io_end->iocb = iocb; - io_end->result = ret; - } ext4_add_complete_io(io_end); } @@ -3044,7 +3034,6 @@ static ssize_t ext4_ext_direct_IO(int rw ret = -ENOMEM; goto retake_lock; } - io_end->flag |= EXT4_IO_END_DIRECT; iocb->private = io_end; /* * we save the io structure for current async Index: linux-2.6/fs/ext4/page-io.c =================================================================== --- linux-2.6.orig/fs/ext4/page-io.c 2012-11-06 13:33:43.486660128 +0100 +++ linux-2.6/fs/ext4/page-io.c 2012-11-21 19:51:44.475001087 +0100 @@ -104,11 +104,7 @@ static int ext4_end_io(ext4_io_end_t *io "(inode %lu, offset %llu, size %zd, error %d)", inode->i_ino, offset, size, ret); } - if (io->iocb) - aio_complete(io->iocb, io->result, 0); - if (io->flag & EXT4_IO_END_DIRECT) - inode_dio_done(inode); /* Wake up anyone waiting on unwritten extent conversion */ if (atomic_dec_and_test(&EXT4_I(inode)->i_unwritten)) wake_up_all(ext4_ioend_wq(io->inode)); Index: linux-2.6/fs/ocfs2/aops.c =================================================================== --- 
linux-2.6.orig/fs/ocfs2/aops.c 2012-11-06 13:33:43.646660127 +0100 +++ linux-2.6/fs/ocfs2/aops.c 2012-11-21 19:52:05.291001619 +0100 @@ -565,9 +565,7 @@ bail: static void ocfs2_dio_end_io(struct kiocb *iocb, loff_t offset, ssize_t bytes, - void *private, - int ret, - bool is_async) + void *private) { struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode; int level; @@ -592,10 +590,6 @@ static void ocfs2_dio_end_io(struct kioc level = ocfs2_iocb_rw_locked_level(iocb); ocfs2_rw_unlock(inode, level); - - if (is_async) - aio_complete(iocb, ret, 0); - inode_dio_done(inode); } /* Index: linux-2.6/fs/xfs/xfs_aops.c =================================================================== --- linux-2.6.orig/fs/xfs/xfs_aops.c 2012-11-21 10:58:21.356982409 +0100 +++ linux-2.6/fs/xfs/xfs_aops.c 2012-11-21 21:19:50.000000000 +0100 @@ -85,14 +85,6 @@ xfs_destroy_ioend( bh->b_end_io(bh, !ioend->io_error); } - if (ioend->io_iocb) { - if (ioend->io_isasync) { - aio_complete(ioend->io_iocb, ioend->io_error ? - ioend->io_error : ioend->io_result, 0); - } - inode_dio_done(ioend->io_inode); - } - mempool_free(ioend, xfs_ioend_pool); } @@ -290,7 +282,6 @@ xfs_alloc_ioend( * all the I/O from calling the completion routine too early. 
*/ atomic_set(&ioend->io_remaining, 1); - ioend->io_isasync = 0; ioend->io_isdirect = 0; ioend->io_error = 0; ioend->io_list = NULL; @@ -300,8 +291,6 @@ xfs_alloc_ioend( ioend->io_buffer_tail = NULL; ioend->io_offset = 0; ioend->io_size = 0; - ioend->io_iocb = NULL; - ioend->io_result = 0; ioend->io_append_trans = NULL; INIT_WORK(&ioend->io_work, xfs_end_io); @@ -1280,8 +1269,10 @@ __xfs_get_blocks( if (create || !ISUNWRITTEN(&imap)) xfs_map_buffer(inode, bh_result, &imap, offset); if (create && ISUNWRITTEN(&imap)) { - if (direct) + if (direct) { bh_result->b_private = inode; + set_buffer_defer_completion(bh_result); + } set_buffer_unwritten(bh_result); } } @@ -1378,9 +1369,7 @@ xfs_end_io_direct_write( struct kiocb *iocb, loff_t offset, ssize_t size, - void *private, - int ret, - bool is_async) + void *private) { struct xfs_ioend *ioend = iocb->private; @@ -1402,17 +1391,10 @@ xfs_end_io_direct_write( ioend->io_offset = offset; ioend->io_size = size; - ioend->io_iocb = iocb; - ioend->io_result = ret; if (private && size > 0) ioend->io_type = XFS_IO_UNWRITTEN; - if (is_async) { - ioend->io_isasync = 1; - xfs_finish_ioend(ioend); - } else { - xfs_finish_ioend_sync(ioend); - } + xfs_finish_ioend_sync(ioend); } STATIC ssize_t Index: linux-2.6/fs/ext4/ext4.h =================================================================== --- linux-2.6.orig/fs/ext4/ext4.h 2012-11-06 13:33:43.479993461 +0100 +++ linux-2.6/fs/ext4/ext4.h 2012-11-21 21:19:50.000000000 +0100 @@ -142,7 +142,7 @@ struct ext4_allocation_request { */ #define EXT4_MAP_NEW (1 << BH_New) #define EXT4_MAP_MAPPED (1 << BH_Mapped) -#define EXT4_MAP_UNWRITTEN (1 << BH_Unwritten) +#define EXT4_MAP_UNWRITTEN ((1 << BH_Unwritten) | (1 << BH_Defer_Completion)) #define EXT4_MAP_BOUNDARY (1 << BH_Boundary) #define EXT4_MAP_UNINIT (1 << BH_Uninit) /* Sometimes (in the bigalloc case, from ext4_da_get_block_prep) the caller of @@ -185,7 +185,6 @@ struct mpage_da_data { #define EXT4_IO_END_UNWRITTEN 0x0001 #define 
EXT4_IO_END_ERROR 0x0002 #define EXT4_IO_END_QUEUED 0x0004 -#define EXT4_IO_END_DIRECT 0x0008 struct ext4_io_page { struct page *p_page; @@ -209,8 +208,6 @@ typedef struct ext4_io_end { loff_t offset; /* offset in the file */ ssize_t size; /* size of the extent */ struct work_struct work; /* data work queue */ - struct kiocb *iocb; /* iocb struct for AIO */ - int result; /* error value for AIO */ int num_io_pages; /* for writepages() */ struct ext4_io_page *pages[MAX_IO_PAGES]; /* for writepages() */ } ext4_io_end_t; Index: linux-2.6/fs/xfs/xfs_aops.h =================================================================== --- linux-2.6.orig/fs/xfs/xfs_aops.h 2012-11-06 13:33:43.736660126 +0100 +++ linux-2.6/fs/xfs/xfs_aops.h 2012-11-21 20:11:42.211031753 +0100 @@ -45,7 +45,6 @@ typedef struct xfs_ioend { unsigned int io_type; /* delalloc / unwritten */ int io_error; /* I/O error code */ atomic_t io_remaining; /* hold count */ - unsigned int io_isasync : 1; /* needs aio_complete */ unsigned int io_isdirect : 1;/* direct I/O */ struct inode *io_inode; /* file being written to */ struct buffer_head *io_buffer_head;/* buffer linked list head */ @@ -54,8 +53,6 @@ typedef struct xfs_ioend { xfs_off_t io_offset; /* offset in the file */ struct work_struct io_work; /* xfsdatad work queue */ struct xfs_trans *io_append_trans;/* xact. 
for size update */ - struct kiocb *io_iocb; - int io_result; } xfs_ioend_t; extern const struct address_space_operations xfs_address_space_operations; Index: linux-2.6/include/linux/buffer_head.h =================================================================== --- linux-2.6.orig/include/linux/buffer_head.h 2012-11-21 21:19:35.227136042 +0100 +++ linux-2.6/include/linux/buffer_head.h 2012-11-21 21:19:50.000000000 +0100 @@ -34,6 +34,7 @@ enum bh_state_bits { BH_Write_EIO, /* I/O error on write */ BH_Unwritten, /* Buffer is allocated on disk but not written */ BH_Quiet, /* Buffer Error Prinks to be quiet */ + BH_Defer_Completion, /* Defer AIO completion to workqueue */ BH_PrivateStart,/* not a state bit, but the first bit available * for private allocation by other entities @@ -124,6 +125,7 @@ BUFFER_FNS(Delay, delay) BUFFER_FNS(Boundary, boundary) BUFFER_FNS(Write_EIO, write_io_error) BUFFER_FNS(Unwritten, unwritten) +BUFFER_FNS(Defer_Completion, defer_completion) #define bh_offset(bh) ((unsigned long)(bh)->b_data & ~PAGE_MASK) #define touch_buffer(bh) mark_page_accessed(bh->b_page) -- To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html