Re: [PATCH 1/2] CIFS: Add asynchronous read support through kernel AIO

2017-03-25 8:12 GMT-07:00 Jeff Layton <jlayton@xxxxxxxxxx>:
> On Fri, 2017-02-10 at 15:17 -0800, Pavel Shilovsky wrote:
>> Currently the code doesn't recognize asynchronous calls passed
>> by io_submit() and processes all calls synchronously. This is not
>> what kernel AIO expects. This patch improves the situation for
>> reading by introducing a new async context that keeps track of all
>> issued i/o requests and by moving the response collecting procedure
>> to a separate thread. This allows us to return to the caller
>> immediately for asynchronous calls and to call iocb->ki_complete()
>> once all requests are completed. For synchronous calls the current
>> thread simply waits until all requests are completed.
>>
>> This improves the read performance of single-threaded applications
>> as the i/o queue depth increases.
>>
>> Signed-off-by: Pavel Shilovsky <pshilov@xxxxxxxxxxxxx>
>
> I would really, really appreciate seeing this patch broken up into
> smaller, logical changes. It's very difficult to review as one
> large patch.
>
>> ---
>>  fs/cifs/cifsglob.h |  18 ++++
>>  fs/cifs/file.c     | 273 +++++++++++++++++++++++++++++++++++++++++++++--------
>>  2 files changed, 252 insertions(+), 39 deletions(-)
>>
>> diff --git a/fs/cifs/cifsglob.h b/fs/cifs/cifsglob.h
>> index f9a9a12..80771ca 100644
>> --- a/fs/cifs/cifsglob.h
>> +++ b/fs/cifs/cifsglob.h
>> @@ -1109,6 +1109,23 @@ struct cifs_io_parms {
>>       struct cifs_tcon *tcon;
>>  };
>>
>> +struct cifs_aio_ctx {
>> +     struct kref             refcount;
>> +     struct list_head        list;
>> +     struct mutex            aio_mutex;
>> +     struct completion       done;
>> +     struct iov_iter         iter;
>> +     struct kiocb            *iocb;
>> +     struct cifsFileInfo     *cfile;
>> +     struct page             **pages;
> ^^^
> Nothing uses the above pointer.
>
>> +     struct bio_vec          *bv;
>> +     unsigned int            npages;
>> +     ssize_t                 rc;
>> +     unsigned int            len;
>> +     unsigned int            total_len;
>> +     bool                    should_dirty;
>> +};
>> +
>>  struct cifs_readdata;
>>
>>  /* asynchronous read support */
>> @@ -1118,6 +1135,7 @@ struct cifs_readdata {
>>       struct completion               done;
>>       struct cifsFileInfo             *cfile;
>>       struct address_space            *mapping;
>> +     struct cifs_aio_ctx             *ctx;
>>       __u64                           offset;
>>       unsigned int                    bytes;
>>       unsigned int                    got_bytes;
>> diff --git a/fs/cifs/file.c b/fs/cifs/file.c
>> index 98dc842..6ceeed2 100644
>> --- a/fs/cifs/file.c
>> +++ b/fs/cifs/file.c
>> @@ -33,6 +33,7 @@
>>  #include <linux/mount.h>
>>  #include <linux/slab.h>
>>  #include <linux/swap.h>
>> +#include <linux/vmalloc.h>
>>  #include <asm/div64.h>
>>  #include "cifsfs.h"
>>  #include "cifspdu.h"
>> @@ -2410,6 +2411,108 @@ int cifs_flush(struct file *file, fl_owner_t id)
>>       return rc;
>>  }
>>
>> +static inline struct cifs_aio_ctx *
>> +cifs_aio_ctx_alloc(void)
>> +{
>> +     struct cifs_aio_ctx *ctx;
>> +
>> +     ctx = kzalloc(sizeof(struct cifs_aio_ctx), GFP_KERNEL);
>> +     if (!ctx)
>> +             return NULL;
>> +
>> +     INIT_LIST_HEAD(&ctx->list);
>> +     mutex_init(&ctx->aio_mutex);
>> +     init_completion(&ctx->done);
>> +     kref_init(&ctx->refcount);
>> +     return ctx;
>> +}
>> +
>> +static inline void
>> +cifs_aio_ctx_release(struct kref *refcount)
>> +{
>> +     struct cifs_aio_ctx *ctx = container_of(refcount,
>> +                                     struct cifs_aio_ctx, refcount);
>> +
>> +     cifsFileInfo_put(ctx->cfile);
>> +     kvfree(ctx->bv);
>> +     kfree(ctx);
>> +}
>> +
>> +static int
>> +setup_aio_ctx_iter(struct cifs_aio_ctx *ctx, struct iov_iter *iter, int rw)
>> +{
>> +     ssize_t rc;
>> +     unsigned int cur_npages;
>> +     unsigned int npages = 0;
>> +     unsigned int i;
>> +     size_t len;
>> +     size_t count = iov_iter_count(iter);
>> +     unsigned int saved_len;
>> +     size_t start;
>> +     unsigned int max_pages = iov_iter_npages(iter, INT_MAX);
>> +     struct page **pages;
>> +     struct bio_vec *bv;
>> +
>> +     if (iter->type & ITER_KVEC) {
>> +             memcpy(&ctx->iter, iter, sizeof(struct iov_iter));
>> +             ctx->len = count;
>> +             iov_iter_advance(iter, count);
>> +             return 0;
>> +     }
>> +
>> +     bv = kmalloc_array(max_pages, sizeof(struct bio_vec), GFP_KERNEL);
>> +     if (!bv) {
>> +             bv = vmalloc(max_pages * sizeof(struct bio_vec));
>> +             if (!bv)
>> +                     return -ENOMEM;
>> +     }
>> +
>> +     saved_len = count;
>> +
>> +     while (count && npages < max_pages) {
>> +             rc = iov_iter_get_pages_alloc(iter, &pages, count, &start);
>
> It would be more efficient to allocate a page array of the max length,
> and then reuse that array in each call to iov_iter_get_pages --
> especially if contiguous memory is tight and you end up having to use
> vmalloc.

Do you mean allocating a fixed-size array (like 4096 pages) or an
array of "max_pages" size? The latter can potentially lead to a large
allocation and require the fallback to vmalloc (as we have for the
bio_vec above). The former should work with kmalloc, but if we choose
the fixed size rather small, the code will be stuck in the loop for a
while for big i/o sizes.
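
For reference, here is a rough, untested sketch of the reuse approach
as I understand it, assuming the non-allocating iov_iter_get_pages()
variant that fills a caller-supplied array (the surrounding logic of
setup_aio_ctx_iter stays as in the patch):

	pages = kmalloc_array(max_pages, sizeof(struct page *),
			      GFP_KERNEL);
	if (!pages) {
		pages = vmalloc(max_pages * sizeof(struct page *));
		if (!pages)
			return -ENOMEM;
	}

	while (count && npages < max_pages) {
		/* fills the preallocated array, no per-call alloc */
		rc = iov_iter_get_pages(iter, pages, count,
					max_pages - npages, &start);
		if (rc < 0)
			break;
		/* ... copy pages[] into bv[npages..] as before ... */
	}

	kvfree(pages);

That would avoid one array allocation per loop iteration at the cost
of a single up-front array whose size still depends on max_pages.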

>
> Also, just a heads up that I sent a patch a couple of months ago to
> rework the above function to generate longer page arrays when possible,
> but Al didn't care for it. He's supposedly going to rework the above
> function into something that grabs a pile of pages and issues a callback
> to each of them. Once that goes in, you may want to rework this code to
> use that instead.
>
>> +             if (rc < 0) {
>> +                     cifs_dbg(VFS, "couldn't get user pages (rc=%zd)\n", rc);
>> +                     break;
>> +             }
>> +
>
>> +             if (rc > count) {
>> +                     cifs_dbg(VFS, "get pages rc=%zd more than %zu\n", rc,
>> +                              count);
>> +                     break;
>> +             }
>> +
>> +             iov_iter_advance(iter, rc);
>> +             count -= rc;
>> +             rc += start;
>> +             cur_npages = (rc + PAGE_SIZE - 1) / PAGE_SIZE;
>
> cur_npages = DIV_ROUND_UP(rc, PAGE_SIZE);
>
>> +
>> +             if (npages + cur_npages > max_pages) {
>> +                     cifs_dbg(VFS, "out of vec array capacity (%u vs %u)\n",
>> +                              npages + cur_npages, max_pages);
>> +                     break;
>> +             }
>> +
>> +             for (i = 0; i < cur_npages; i++) {
>> +                     len = rc > PAGE_SIZE ? PAGE_SIZE : rc;
>> +                     bv[npages + i].bv_page = pages[i];
>> +                     bv[npages + i].bv_offset = start;
>> +                     bv[npages + i].bv_len = len - start;
>> +                     rc -= len;
>> +                     start = 0;
>> +             }
>> +
>> +             npages += cur_npages;
>> +             kvfree(pages);
>> +     }
>> +
>> +     ctx->bv = bv;
>> +     ctx->len = saved_len - count;
>> +     ctx->npages = npages;
>> +     iov_iter_bvec(&ctx->iter, ITER_BVEC | rw, ctx->bv, npages, ctx->len);
>> +     return 0;
>> +}
>> +
>>  static int
>>  cifs_write_allocate_pages(struct page **pages, unsigned long num_pages)
>>  {
>> @@ -2859,6 +2962,7 @@ cifs_uncached_readdata_release(struct kref *refcount)
>>                                       struct cifs_readdata, refcount);
>>       unsigned int i;
>>
>> +     kref_put(&rdata->ctx->refcount, cifs_aio_ctx_release);
>>       for (i = 0; i < rdata->nr_pages; i++) {
>>               put_page(rdata->pages[i]);
>>               rdata->pages[i] = NULL;
>> @@ -2900,6 +3004,8 @@ cifs_readdata_to_iov(struct cifs_readdata *rdata, struct iov_iter *iter)
>>       return remaining ? -EFAULT : 0;
>>  }
>>
>> +static void collect_uncached_read_data(struct cifs_aio_ctx *ctx);
>> +
>>  static void
>>  cifs_uncached_readv_complete(struct work_struct *work)
>>  {
>> @@ -2907,6 +3013,8 @@ cifs_uncached_readv_complete(struct work_struct *work)
>>                                               struct cifs_readdata, work);
>>
>>       complete(&rdata->done);
>> +     collect_uncached_read_data(rdata->ctx);
>> +     /* the below call can possibly free the last ref to aio ctx */
>>       kref_put(&rdata->refcount, cifs_uncached_readdata_release);
>>  }
>>
>> @@ -2973,7 +3081,8 @@ cifs_uncached_copy_into_pages(struct TCP_Server_Info *server,
>>
>>  static int
>>  cifs_send_async_read(loff_t offset, size_t len, struct cifsFileInfo *open_file,
>> -                  struct cifs_sb_info *cifs_sb, struct list_head *rdata_list)
>> +                  struct cifs_sb_info *cifs_sb, struct list_head *rdata_list,
>> +                  struct cifs_aio_ctx *ctx)
>>  {
>>       struct cifs_readdata *rdata;
>>       unsigned int npages, rsize, credits;
>> @@ -3020,6 +3129,8 @@ cifs_send_async_read(loff_t offset, size_t len, struct cifsFileInfo *open_file,
>>               rdata->read_into_pages = cifs_uncached_read_into_pages;
>>               rdata->copy_into_pages = cifs_uncached_copy_into_pages;
>>               rdata->credits = credits;
>> +             rdata->ctx = ctx;
>> +             kref_get(&ctx->refcount);
>>
>>               if (!rdata->cfile->invalidHandle ||
>>                   !cifs_reopen_file(rdata->cfile, true))
>> @@ -3042,50 +3153,37 @@ cifs_send_async_read(loff_t offset, size_t len, struct cifsFileInfo *open_file,
>>       return rc;
>>  }
>>
>> -ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>> +static void
>> +collect_uncached_read_data(struct cifs_aio_ctx *ctx)
>>  {
>> -     struct file *file = iocb->ki_filp;
>> -     ssize_t rc;
>> -     size_t len;
>> -     ssize_t total_read = 0;
>> -     loff_t offset = iocb->ki_pos;
>> +     struct cifs_readdata *rdata, *tmp;
>> +     struct iov_iter *to = &ctx->iter;
>>       struct cifs_sb_info *cifs_sb;
>>       struct cifs_tcon *tcon;
>> -     struct cifsFileInfo *open_file;
>> -     struct cifs_readdata *rdata, *tmp;
>> -     struct list_head rdata_list;
>> -
>> -     len = iov_iter_count(to);
>> -     if (!len)
>> -             return 0;
>> -
>> -     INIT_LIST_HEAD(&rdata_list);
>> -     cifs_sb = CIFS_FILE_SB(file);
>> -     open_file = file->private_data;
>> -     tcon = tlink_tcon(open_file->tlink);
>> -
>> -     if (!tcon->ses->server->ops->async_readv)
>> -             return -ENOSYS;
>> +     unsigned int i;
>> +     int rc;
>>
>> -     if ((file->f_flags & O_ACCMODE) == O_WRONLY)
>> -             cifs_dbg(FYI, "attempting read on write only file instance\n");
>> +     tcon = tlink_tcon(ctx->cfile->tlink);
>> +     cifs_sb = CIFS_SB(ctx->cfile->dentry->d_sb);
>>
>> -     rc = cifs_send_async_read(offset, len, open_file, cifs_sb, &rdata_list);
>> +     mutex_lock(&ctx->aio_mutex);
>>
>> -     /* if at least one read request send succeeded, then reset rc */
>> -     if (!list_empty(&rdata_list))
>> -             rc = 0;
>> +     if (list_empty(&ctx->list)) {
>> +             mutex_unlock(&ctx->aio_mutex);
>> +             return;
>> +     }
>>
>> -     len = iov_iter_count(to);
>> +     rc = ctx->rc;
>>       /* the loop below should proceed in the order of increasing offsets */
>>  again:
>> -     list_for_each_entry_safe(rdata, tmp, &rdata_list, list) {
>> +     list_for_each_entry_safe(rdata, tmp, &ctx->list, list) {
>>               if (!rc) {
>> -                     /* FIXME: freezable sleep too? */
>> -                     rc = wait_for_completion_killable(&rdata->done);
>> -                     if (rc)
>> -                             rc = -EINTR;
>> -                     else if (rdata->result == -EAGAIN) {
>> +                     if (!try_wait_for_completion(&rdata->done)) {
>> +                             mutex_unlock(&ctx->aio_mutex);
>> +                             return;
>> +                     }
>> +
>> +                     if (rdata->result == -EAGAIN) {
>>                               /* resend call if it's a retryable error */
>>                               struct list_head tmp_list;
>>                               unsigned int got_bytes = rdata->got_bytes;
>> @@ -3111,9 +3209,9 @@ ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>>                                               rdata->offset + got_bytes,
>>                                               rdata->bytes - got_bytes,
>>                                               rdata->cfile, cifs_sb,
>> -                                             &tmp_list);
>> +                                             &tmp_list, ctx);
>>
>> -                             list_splice(&tmp_list, &rdata_list);
>> +                             list_splice(&tmp_list, &ctx->list);
>>
>>                               kref_put(&rdata->refcount,
>>                                        cifs_uncached_readdata_release);
>> @@ -3131,14 +3229,111 @@ ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>>               kref_put(&rdata->refcount, cifs_uncached_readdata_release);
>>       }
>>
>> -     total_read = len - iov_iter_count(to);
>> +     for (i = 0; i < ctx->npages; i++) {
>> +             if (ctx->should_dirty)
>> +                     set_page_dirty(ctx->bv[i].bv_page);
>> +             put_page(ctx->bv[i].bv_page);
>> +     }
>> +
>> +     ctx->total_len = ctx->len - iov_iter_count(to);
>>
>> -     cifs_stats_bytes_read(tcon, total_read);
>> +     cifs_stats_bytes_read(tcon, ctx->total_len);
>>
>>       /* mask nodata case */
>>       if (rc == -ENODATA)
>>               rc = 0;
>>
>> +     ctx->rc = (rc == 0) ? ctx->total_len : rc;
>> +
>> +     mutex_unlock(&ctx->aio_mutex);
>> +
>> +     if (ctx->iocb && ctx->iocb->ki_complete)
>> +             ctx->iocb->ki_complete(ctx->iocb, ctx->rc, 0);
>> +     else
>> +             complete(&ctx->done);
>> +}
>> +
>> +ssize_t cifs_user_readv(struct kiocb *iocb, struct iov_iter *to)
>> +{
>> +     struct file *file = iocb->ki_filp;
>> +     ssize_t rc;
>> +     size_t len;
>> +     ssize_t total_read = 0;
>> +     loff_t offset = iocb->ki_pos;
>> +     struct cifs_sb_info *cifs_sb;
>> +     struct cifs_tcon *tcon;
>> +     struct cifsFileInfo *cfile;
>> +     struct cifs_aio_ctx *ctx;
>> +
>> +     len = iov_iter_count(to);
>> +     if (!len)
>> +             return 0;
>> +
>> +     cifs_sb = CIFS_FILE_SB(file);
>> +     cfile = file->private_data;
>> +     tcon = tlink_tcon(cfile->tlink);
>> +
>> +     if (!tcon->ses->server->ops->async_readv)
>> +             return -ENOSYS;
>> +
>> +     if ((file->f_flags & O_ACCMODE) == O_WRONLY)
>> +             cifs_dbg(FYI, "attempting read on write only file instance\n");
>> +
>> +     ctx = cifs_aio_ctx_alloc();
>> +     if (!ctx)
>> +             return -ENOMEM;
>> +
>> +     ctx->cfile = cifsFileInfo_get(cfile);
>> +
>> +     if (!is_sync_kiocb(iocb))
>> +             ctx->iocb = iocb;
>> +
>> +     if (to->type & ITER_IOVEC)
>> +             ctx->should_dirty = true;
>> +
>> +     rc = setup_aio_ctx_iter(ctx, to, READ);
>> +     if (rc) {
>> +             kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +             return rc;
>> +     }
>> +
>> +     len = ctx->len;
>> +
>> +     /* grab a lock here because read response handlers can access ctx */
>> +     mutex_lock(&ctx->aio_mutex);
>> +
>> +     rc = cifs_send_async_read(offset, len, cfile, cifs_sb, &ctx->list, ctx);
>> +
>> +     /* if at least one read request send succeeded, then reset rc */
>> +     if (!list_empty(&ctx->list))
>> +             rc = 0;
>> +
>> +     mutex_unlock(&ctx->aio_mutex);
>> +
>> +     if (rc) {
>> +             kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +             return rc;
>> +     }
>> +
>> +     if (!is_sync_kiocb(iocb)) {
>> +             kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +             return -EIOCBQUEUED;
>> +     }
>> +
>> +     /* FIXME: freezable sleep too? */
>
> I think you can remove the above comment, and this should probably be a
> TASK_INTERRUPTIBLE sleep.
>
>> +     rc = wait_for_completion_killable(&ctx->done);
>> +     if (rc) {
>> +             mutex_lock(&ctx->aio_mutex);
>> +             ctx->rc = rc = -EINTR;
>> +             total_read = ctx->total_len;
>> +             mutex_unlock(&ctx->aio_mutex);
>> +     } else {
>> +             rc = ctx->rc;
>> +             total_read = ctx->total_len;
>> +     }
>> +
>> +     kref_put(&ctx->refcount, cifs_aio_ctx_release);
>> +
>>       if (total_read) {
>>               iocb->ki_pos += total_read;
>>               return total_read;
>
> Other than the above, this looks pretty reasonable. Nice work!
> --
> Jeff Layton <jlayton@xxxxxxxxxx>

Thanks. I will split the new infrastructure out into a separate patch
and incorporate your suggestions.
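
Regarding the sleep: a minimal, untested sketch of what I understand
you to suggest, replacing the killable wait with an interruptible
(TASK_INTERRUPTIBLE) one:

	rc = wait_for_completion_interruptible(&ctx->done);
	if (rc) {
		/* interrupted by a signal: -ERESTARTSYS */
		mutex_lock(&ctx->aio_mutex);
		ctx->rc = rc = -EINTR;
		total_read = ctx->total_len;
		mutex_unlock(&ctx->aio_mutex);
	} else {
		rc = ctx->rc;
		total_read = ctx->total_len;
	}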

--
Best regards,
Pavel Shilovsky