2017-03-25 8:12 GMT-07:00 Jeff Layton <jlayton@xxxxxxxxxx>:
> On Fri, 2017-02-10 at 15:17 -0800, Pavel Shilovsky wrote:
>> Currently the code doesn't recognize asynchronous calls passed
>> by io_submit() and processes all calls synchronously. This is not
>> what kernel AIO expects. This patch improves this for reading
>> by introducing a new async context that keeps track of all issued i/o
>> requests and moving the response collecting procedure to a separate
>> thread. This allows returning to the caller immediately for
>> asynchronous calls and calling iocb->ki_complete() once all requests
>> are completed. For synchronous calls the current thread simply
>> waits until all requests are completed.
>>
>> This improves the read performance of single-threaded applications
>> as the i/o queue depth grows.
>>
>> Signed-off-by: Pavel Shilovsky <pshilov@xxxxxxxxxxxxx>
>
> I would really, really appreciate seeing this patch broken up into
> smaller, logical changes. It's very difficult to
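A short primer for readers not familiar with the in-kernel AIO convention
the changelog refers to: an AIO-aware ->read_iter returns -EIOCBQUEUED for
iocbs submitted via io_submit() and delivers the final result later through
iocb->ki_complete(), while synchronous callers block on a completion. Below
is a minimal, illustrative sketch of that pattern; struct example_ctx and
the example_* helpers are hypothetical and are not part of this patch, and
refcounting/error handling are omitted for brevity.

#include <linux/fs.h>
#include <linux/uio.h>
#include <linux/slab.h>
#include <linux/completion.h>

/* hypothetical context, roughly what cifs_aio_ctx carries */
struct example_ctx {
        struct kiocb *iocb;     /* set only for async (io_submit) callers */
        struct completion done;
        ssize_t rc;
};

/* hypothetical helper that actually issues the reads; body not shown */
static void example_issue_reads(struct example_ctx *ctx, struct iov_iter *to);

static ssize_t example_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
        struct example_ctx *ctx;

        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
        if (!ctx)
                return -ENOMEM;

        init_completion(&ctx->done);
        if (!is_sync_kiocb(iocb))
                ctx->iocb = iocb;

        example_issue_reads(ctx, to);

        /* async caller: return immediately, the result is reported later */
        if (!is_sync_kiocb(iocb))
                return -EIOCBQUEUED;

        /* sync caller: block until the completion path signals us */
        wait_for_completion(&ctx->done);
        return ctx->rc;
}

/* called once the last outstanding request has finished */
static void example_reads_done(struct example_ctx *ctx)
{
        if (ctx->iocb)
                ctx->iocb->ki_complete(ctx->iocb, ctx->rc, 0);  /* async */
        else
                complete(&ctx->done);                           /* sync */
}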
>> ---
>>  fs/cifs/cifsglob.h |  18 ++++
>>  fs/cifs/file.c     | 273 +++++++++++++++++++++++++++++++++++++++++++++--------
>>  2 files changed, 252 insertions(+), 39 deletions(-)
>>
>> diff --git a/fs/cifs/cifsglob.h b/fs/cifs/cifsglob.h
>> index f9a9a12..80771ca 100644
>> --- a/fs/cifs/cifsglob.h
>> +++ b/fs/cifs/cifsglob.h
>> @@ -1109,6 +1109,23 @@ struct cifs_io_parms {
>>          struct cifs_tcon *tcon;
>>  };
>>
>> +struct cifs_aio_ctx {
>> +        struct kref             refcount;
>> +        struct list_head        list;
>> +        struct mutex            aio_mutex;
>> +        struct completion       done;
>> +        struct iov_iter         iter;
>> +        struct kiocb            *iocb;
>> +        struct cifsFileInfo     *cfile;
>> +        struct page             **pages;
>                                    ^^^
> Nothing uses the above pointer.
>
>> +        struct bio_vec          *bv;
>> +        unsigned int            npages;
>> +        ssize_t                 rc;
>> +        unsigned int            len;
>> +        unsigned int            total_len;
>> +        bool                    should_dirty;
>> +};
>> +
>>  struct cifs_readdata;
>>
>>  /* asynchronous read support */
>> @@ -1118,6 +1135,7 @@ struct cifs_readdata {
>>          struct completion               done;
>>          struct cifsFileInfo             *cfile;
>>          struct address_space            *mapping;
>> +        struct cifs_aio_ctx             *ctx;
>>          __u64                           offset;
>>          unsigned int                    bytes;
>>          unsigned int                    got_bytes;
>> diff --git a/fs/cifs/file.c b/fs/cifs/file.c
>> index 98dc842..6ceeed2 100644
>> --- a/fs/cifs/file.c
>> +++ b/fs/cifs/file.c
>> @@ -33,6 +33,7 @@
>>  #include <linux/mount.h>
>>  #include <linux/slab.h>
>>  #include <linux/swap.h>
>> +#include <linux/vmalloc.h>
>>  #include <asm/div64.h>
>>  #include "cifsfs.h"
>>  #include "cifspdu.h"
>> @@ -2410,6 +2411,108 @@ int cifs_flush(struct file *file, fl_owner_t id)
>>          return rc;
>>  }
>>
>> +static inline struct cifs_aio_ctx *
>> +cifs_aio_ctx_alloc(void)
>> +{
>> +        struct cifs_aio_ctx *ctx;
>> +
>> +        ctx = kzalloc(sizeof(struct cifs_aio_ctx), GFP_KERNEL);
>> +        if (!ctx)
>> +                return NULL;
>> +
>> +        INIT_LIST_HEAD(&ctx->list);
>> +        mutex_init(&ctx->aio_mutex);
>> +        init_completion(&ctx->done);
>> +        kref_init(&ctx->refcount);
>> +        return ctx;
>> +}
>> +
>> +static inline void
>> +cifs_aio_ctx_release(struct kref *refcount)
>> +{
>> +        struct cifs_aio_ctx *ctx = container_of(refcount,
>> +                                        struct cifs_aio_ctx, refcount);
>> +
>> +        cifsFileInfo_put(ctx->cfile);
>> +        kvfree(ctx->bv);
>> +        kfree(ctx);
>> +}
>> +
>> +static int
>> +setup_aio_ctx_iter(struct cifs_aio_ctx *ctx, struct iov_iter *iter, int rw)
>> +{
>> +        ssize_t rc;
>> +        unsigned int cur_npages;
>> +        unsigned int npages = 0;
>> +        unsigned int i;
>> +        size_t len;
>> +        size_t count = iov_iter_count(iter);
>> +        unsigned int saved_len;
>> +        size_t start;
>> +        unsigned int max_pages = iov_iter_npages(iter, INT_MAX);
>> +        struct page **pages;
>> +        struct bio_vec *bv;
>> +
>> +        if (iter->type & ITER_KVEC) {
>> +                memcpy(&ctx->iter, iter, sizeof(struct iov_iter));
>> +                ctx->len = count;
>> +                iov_iter_advance(iter, count);
>> +                return 0;
>> +        }
>> +
>> +        bv = kmalloc_array(max_pages, sizeof(struct bio_vec), GFP_KERNEL);
>> +        if (!bv) {
>> +                bv = vmalloc(max_pages * sizeof(struct bio_vec));
>> +                if (!bv)
>> +                        return -ENOMEM;
>> +        }
>> +
>> +        saved_len = count;
>> +
>> +        while (count && npages < max_pages) {
>> +                rc = iov_iter_get_pages_alloc(iter, &pages, count, &start);
>
> It would be more efficient to allocate a page array of the max length,
> and then reuse that array in each call to iov_iter_get_pages --
> especially if contiguous memory is tight and you end up having to use
> vmalloc.

Do you mean allocating a fixed-size array (like 4096 pages) or an array of
"max_pages" size? The latter can potentially lead to a large allocation
and needs the fallback to vmalloc (as we have for bio_vec above). The
former should work with kmalloc, but the code will be stuck in the loop
for a while for big i/o sizes if we choose the fixed size too small.
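If it helps, the reuse being suggested could look roughly like the fragment
below inside setup_aio_ctx_iter() (a sketch only, with error handling
trimmed; it allocates one page array for the whole request and lets
iov_iter_get_pages() fill it on every pass around the loop):

        /*
         * One array for the whole request, reused on every iteration;
         * it would need the same vmalloc fallback as "bv" above.
         */
        pages = kmalloc_array(max_pages, sizeof(struct page *), GFP_KERNEL);
        if (!pages) {
                kvfree(bv);
                return -ENOMEM;
        }

        while (count && npages < max_pages) {
                rc = iov_iter_get_pages(iter, pages, count,
                                        max_pages - npages, &start);
                if (rc < 0)
                        break;
                /*
                 * ... turn the pages just grabbed into bio_vecs, as the
                 * current loop body already does ...
                 */
        }

        kfree(pages);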
>
> Also, just a heads up that I sent a patch a couple of months ago to
> rework the above function to generate longer page arrays when possible,
> but Al didn't care for it. He's supposedly going to rework the above
> function into something that grabs a pile of pages and issues a callback
> for each of them. Once that goes in, you may want to rework this code to
> use that instead.
>
>> +                if (rc < 0) {
>> +                        cifs_dbg(VFS, "couldn't get user pages (rc=%zd)\n", rc);
>> +                        break;
>> +                }
>> +
>
>> +                if (rc > count) {
>> +                        cifs_dbg(VFS, "get pages rc=%zd more than %zu\n", rc,
>> +                                 count);
>> +                        break;
>> +                }
>> +
>> +                iov_iter_advance(iter, rc);
>> +                count -= rc;
>> +                rc += start;
>> +                cur_npages = (rc + PAGE_SIZE - 1) / PAGE_SIZE;
>
>     cur_npages = DIV_ROUND_UP(rc, PAGE_SIZE);
>
>> +
>> +                if (npages + cur_npages > max_pages) {
>> +                        cifs_dbg(VFS, "out of vec array capacity (%u vs %u)\n",
>> +                                 npages + cur_npages, max_pages);
>> +                        break;
>> +                }
>> +
>> +                for (i = 0; i < cur_npages; i++) {
>> +                        len = rc > PAGE_SIZE ? PAGE_SIZE : rc;
>> +                        bv[npages + i].bv_page = pages[i];
>> +                        bv[npages + i].bv_offset = start;
>> +                        bv[npages + i].bv_len = len - start;
>> +                        rc -= len;
>> +                        start = 0;
>> +                }
>> +
>> +                npages += cur_npages;
>> +                kvfree(pages);
>> +        }
>> +
>> +        ctx->bv = bv;
>> +        ctx->len = saved_len - count;
>> +        ctx->npages = npages;
>> +        iov_iter_bvec(&ctx->iter, ITER_BVEC | rw, ctx->bv, npages, ctx->len);
>> +        return 0;
>> +}
>> +
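A quick worked example of the bv_offset/bv_len arithmetic above, with
arbitrarily chosen numbers: suppose iov_iter_get_pages_alloc() pins 6000
bytes (rc = 6000) starting at offset start = 1000 within the first page,
and PAGE_SIZE = 4096. After "rc += start" we have rc = 7000, so
cur_npages = 2. The first bio_vec then gets bv_offset = 1000 and
bv_len = 4096 - 1000 = 3096; the second gets bv_offset = 0 and
bv_len = 7000 - 4096 = 2904. The lengths add up to 3096 + 2904 = 6000,
i.e. exactly the bytes that were pinned.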
>>  static int
>>  cifs_write_allocate_pages(struct page **pages, unsigned long num_pages)
>>  {
>> @@ -2859,6 +2962,7 @@ cifs_uncached_readdata_release(struct kref *refcount)
>>                                          struct cifs_readdata, refcount);
>>          unsigned int i;
>>
>> +        kref_put(&rdata->ctx->refcount, cifs_aio_ctx_release);
>>          for (i = 0; i < rdata->nr_pages; i++) {
>>                  put_page(rdata->pages[i]);
>>                  rdata->pages[i] = NULL;
>> @@ -2900,6 +3004,8 @@ cifs_readdata_to_iov(struct cifs_readdata *rdata, struct iov_iter *iter)
>>          return remaining ? -EFAULT : 0;
>>  }
>>
>> +static void collect_uncached_read_data(struct cifs_aio_ctx *ctx);
>> +
>>  static void
>>  cifs_uncached_readv_complete(struct work_struct *work)
>>  {
>> @@ -2907,6 +3013,8 @@ cifs_uncached_readv_complete(struct work_struct *work)
>>                                          struct cifs_readdata, work);
>>
>>          complete(&rdata->done);
>> +        collect_uncached_read_data(rdata->ctx);
>> +        /* the below call can possibly free the last ref to aio ctx */
>>          kref_put(&rdata->refcount, cifs_uncached_readdata_release);
>>  }
>>
>> @@ -2973,7 +3081,8 @@ cifs_uncached_copy_into_pages(struct TCP_Server_Info *server,
>>
>>  static int
>>  cifs_send_async_read(loff_t offset, size_t len, struct cifsFileInfo *open_file,
>> -                     struct cifs_sb_info *cifs_sb, struct list_head *rdata_list)
>> +                     struct cifs_sb_info *cifs_sb, struct list_head *rdata_list,
>> +                     struct cifs_aio_ctx *ctx)
>>  {
>>          struct cifs_readdata *rdata;
>>          unsigned int npages, rsize, credits;
>> @@ -3020,6 +3129,8 @@ cifs_send_async_read(loff_t offset, size_t len, struct cifsFileInfo *open_file,
>>          rdata->read_into_pages = cifs_uncached_read_into_pages;
>>          rdata->copy_into_pages = cifs_uncached_copy_into_pages;
>>          rdata->credits = credits;
>> +        rdata->ctx = ctx;
>> +        kref_get(&ctx->refcount);
>>
>>          if (!rdata->cfile->invalidHandle ||
>>              !cifs_reopen_file(rdata->cfile, true))
>> @@ -3042,50 +3153,37 @@ cifs_send_async_read(loff_t offset, size_t len, struct cifsFileInfo *open_file,
>>          return rc;
>>  }
>>
>> -ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>> +static void
>> +collect_uncached_read_data(struct cifs_aio_ctx *ctx)
>>  {
>> -        struct file *file = iocb->ki_filp;
>> -        ssize_t rc;
>> -        size_t len;
>> -        ssize_t total_read = 0;
>> -        loff_t offset = iocb->ki_pos;
>> +        struct cifs_readdata *rdata, *tmp;
>> +        struct iov_iter *to = &ctx->iter;
>>          struct cifs_sb_info *cifs_sb;
>>          struct cifs_tcon *tcon;
>> -        struct cifsFileInfo *open_file;
>> -        struct cifs_readdata *rdata, *tmp;
>> -        struct list_head rdata_list;
>> -
>> -        len = iov_iter_count(to);
>> -        if (!len)
>> -                return 0;
>> -
>> -        INIT_LIST_HEAD(&rdata_list);
>> -        cifs_sb = CIFS_FILE_SB(file);
>> -        open_file = file->private_data;
>> -        tcon = tlink_tcon(open_file->tlink);
>> -
>> -        if (!tcon->ses->server->ops->async_readv)
>> -                return -ENOSYS;
>> +        unsigned int i;
>> +        int rc;
>>
>> -        if ((file->f_flags & O_ACCMODE) == O_WRONLY)
>> -                cifs_dbg(FYI, "attempting read on write only file instance\n");
>> +        tcon = tlink_tcon(ctx->cfile->tlink);
>> +        cifs_sb = CIFS_SB(ctx->cfile->dentry->d_sb);
>>
>> -        rc = cifs_send_async_read(offset, len, open_file, cifs_sb, &rdata_list);
>> +        mutex_lock(&ctx->aio_mutex);
>>
>> -        /* if at least one read request send succeeded, then reset rc */
>> -        if (!list_empty(&rdata_list))
>> -                rc = 0;
>> +        if (list_empty(&ctx->list)) {
>> +                mutex_unlock(&ctx->aio_mutex);
>> +                return;
>> +        }
>>
>> -        len = iov_iter_count(to);
>> +        rc = ctx->rc;
>>          /* the loop below should proceed in the order of increasing offsets */
>>  again:
>> -        list_for_each_entry_safe(rdata, tmp, &rdata_list, list) {
>> +        list_for_each_entry_safe(rdata, tmp, &ctx->list, list) {
>>                  if (!rc) {
>> -                        /* FIXME: freezable sleep too? */
>> -                        rc = wait_for_completion_killable(&rdata->done);
>> -                        if (rc)
>> -                                rc = -EINTR;
>> -                        else if (rdata->result == -EAGAIN) {
>> +                        if (!try_wait_for_completion(&rdata->done)) {
>> +                                mutex_unlock(&ctx->aio_mutex);
>> +                                return;
>> +                        }
>> +
>> +                        if (rdata->result == -EAGAIN) {
>>                                  /* resend call if it's a retryable error */
>>                                  struct list_head tmp_list;
>>                                  unsigned int got_bytes = rdata->got_bytes;
>> @@ -3111,9 +3209,9 @@ ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>>                                                  rdata->offset + got_bytes,
>>                                                  rdata->bytes - got_bytes,
>>                                                  rdata->cfile, cifs_sb,
>> -                                                &tmp_list);
>> +                                                &tmp_list, ctx);
>>
>> -                                list_splice(&tmp_list, &rdata_list);
>> +                                list_splice(&tmp_list, &ctx->list);
>>
>>                                  kref_put(&rdata->refcount,
>>                                           cifs_uncached_readdata_release);
>> @@ -3131,14 +3229,111 @@ ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>>                  kref_put(&rdata->refcount, cifs_uncached_readdata_release);
>>          }
>>
>> -        total_read = len - iov_iter_count(to);
>> +        for (i = 0; i < ctx->npages; i++) {
>> +                if (ctx->should_dirty)
>> +                        set_page_dirty(ctx->bv[i].bv_page);
>> +                put_page(ctx->bv[i].bv_page);
>> +        }
>> +
>> +        ctx->total_len = ctx->len - iov_iter_count(to);
>>
>> -        cifs_stats_bytes_read(tcon, total_read);
>> +        cifs_stats_bytes_read(tcon, ctx->total_len);
>>
>>          /* mask nodata case */
>>          if (rc == -ENODATA)
>>                  rc = 0;
>>
>> +        ctx->rc = (rc == 0) ? ctx->total_len : rc;
>> +
>> +        mutex_unlock(&ctx->aio_mutex);
>> +
>> +        if (ctx->iocb && ctx->iocb->ki_complete)
>> +                ctx->iocb->ki_complete(ctx->iocb, ctx->rc, 0);
>> +        else
>> +                complete(&ctx->done);
>> +}
>> +
>> +ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>> +{
>> +        struct file *file = iocb->ki_filp;
>> +        ssize_t rc;
>> +        size_t len;
>> +        ssize_t total_read = 0;
>> +        loff_t offset = iocb->ki_pos;
>> +        struct cifs_sb_info *cifs_sb;
>> +        struct cifs_tcon *tcon;
>> +        struct cifsFileInfo *cfile;
>> +        struct cifs_aio_ctx *ctx;
>> +
>> +        len = iov_iter_count(to);
>> +        if (!len)
>> +                return 0;
>> +
>> +        cifs_sb = CIFS_FILE_SB(file);
>> +        cfile = file->private_data;
>> +        tcon = tlink_tcon(cfile->tlink);
>> +
>> +        if (!tcon->ses->server->ops->async_readv)
>> +                return -ENOSYS;
>> +
>> +        if ((file->f_flags & O_ACCMODE) == O_WRONLY)
>> +                cifs_dbg(FYI, "attempting read on write only file instance\n");
>> +
>> +        ctx = cifs_aio_ctx_alloc();
>> +        if (!ctx)
>> +                return -ENOMEM;
>> +
>> +        ctx->cfile = cifsFileInfo_get(cfile);
>> +
>> +        if (!is_sync_kiocb(iocb))
>> +                ctx->iocb = iocb;
>> +
>> +        if (to->type & ITER_IOVEC)
>> +                ctx->should_dirty = true;
>> +
>> +        rc = setup_aio_ctx_iter(ctx, to, READ);
>> +        if (rc) {
>> +                kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +                return rc;
>> +        }
>> +
>> +        len = ctx->len;
>> +
>> +        /* grab a lock here due to read response handlers can access ctx */
>> +        mutex_lock(&ctx->aio_mutex);
>> +
>> +        rc = cifs_send_async_read(offset, len, cfile, cifs_sb, &ctx->list, ctx);
>> +
>> +        /* if at least one read request send succeeded, then reset rc */
>> +        if (!list_empty(&ctx->list))
>> +                rc = 0;
>> +
>> +        mutex_unlock(&ctx->aio_mutex);
>> +
>> +        if (rc) {
>> +                kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +                return rc;
>> +        }
>> +
>> +        if (!is_sync_kiocb(iocb)) {
>> +                kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +                return -EIOCBQUEUED;
>> +        }
>> +
>> +        /* FIXME: freezable sleep too? */
>
> I think you can remove the above comment, and this should probably be a
> TASK_INTERRUPTIBLE sleep.
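For reference, the interruptible variant being suggested would look roughly
like the fragment below (a sketch only, not code from the patch;
wait_for_completion_interruptible() returns -ERESTARTSYS when a signal is
pending):

        /* TASK_INTERRUPTIBLE sleep: any signal breaks the wait */
        rc = wait_for_completion_interruptible(&ctx->done);
        if (rc) {
                /* interrupted (-ERESTARTSYS); map it to -EINTR as below */
                mutex_lock(&ctx->aio_mutex);
                ctx->rc = rc = -EINTR;
                total_read = ctx->total_len;
                mutex_unlock(&ctx->aio_mutex);
        }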
>
>> +        rc = wait_for_completion_killable(&ctx->done);
>> +        if (rc) {
>> +                mutex_lock(&ctx->aio_mutex);
>> +                ctx->rc = rc = -EINTR;
>> +                total_read = ctx->total_len;
>> +                mutex_unlock(&ctx->aio_mutex);
>> +        } else {
>> +                rc = ctx->rc;
>> +                total_read = ctx->total_len;
>> +        }
>> +
>> +        kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +
>>          if (total_read) {
>>                  iocb->ki_pos += total_read;
>>                  return total_read;
>
> Other than the above, this looks pretty reasonable. Nice work!
> --
> Jeff Layton <jlayton@xxxxxxxxxx>

Thanks. I will move the new infrastructure into a separate patch and apply
your suggestions.

--
Best regards,
Pavel Shilovsky
--
To unsubscribe from this list: send the line "unsubscribe linux-cifs" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html