Re: a patch to improve cephfs direct io performance

On Wed, Sep 30, 2015 at 5:40 PM, zhucaifeng <zhucaifeng@xxxxxxxxxxxxxxx> wrote:
> Hi, Yan
>
> The iov_iter APIs seem unsuitable for the direct I/O manipulation below.
> The iov_iter APIs hide how to iterate over the elements, whereas the
> dio_xxx helpers below need explicit control over the iteration. The two
> approaches conflict in principle.
>
> The patch against the latest kernel branch is below.
>
> Best Regards
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 8b79d87..3938ac9 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
>
> @@ -34,6 +34,115 @@
>   * need to wait for MDS acknowledgement.
>   */
>
> +/*
> + * Calculate the length sum of direct io vectors that can
> + * be combined into one page vector.
> + */
> +static int
> +dio_get_pagevlen(const struct iov_iter *it)
> +{
> +    const struct iovec *iov = it->iov;
> +    const struct iovec *iovend = iov + it->nr_segs;
> +    int pagevlen;
> +
> +    pagevlen = iov->iov_len - it->iov_offset;
> +    /*
> +     * An iov can be page vectored when both the current tail
> +     * and the next base are page aligned.
> +     */
> +    while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
> +           (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
> +        pagevlen += iov->iov_len;
> +    }
> +    dout("dio_get_pagevlen len = %d\n", pagevlen);
> +    return pagevlen;
> +}
> +
> +/*
> + * Grab @num_pages from the process vm space. These pages are
> + * continuous and start from @data.
> + */
> +static int
> +dio_grab_pages(const void *data, int num_pages, bool write_page,
> +    struct page **pages)
> +{
> +    int got = 0;
> +    int rc = 0;
> +
> +    down_read(&current->mm->mmap_sem);
> +    while (got < num_pages) {
> +        rc = get_user_pages(current, current->mm,
> +            (unsigned long)data + ((unsigned long)got * PAGE_SIZE),
> +            num_pages - got, write_page, 0, pages + got, NULL);
> +        if (rc < 0)
> +            break;
> +        BUG_ON(rc == 0);
> +        got += rc;
> +    }
> +    up_read(&current->mm->mmap_sem);
> +    return got;
> +}
> +
> +static void
> +dio_free_pagev(struct page **pages, int num_pages, bool dirty)
> +{
> +    int i;
> +
> +    for (i = 0; i < num_pages; i++) {
> +        if (dirty)
> +            set_page_dirty_lock(pages[i]);
> +        put_page(pages[i]);
> +    }
> +    kfree(pages);
> +}
> +
> +/*
> + * Allocate a page vector based on (@it, @pagevlen).
> + * The return value is the tuple describing a page vector,
> + * that is (@pages, @pagevlen, @page_align, @num_pages).
> + */
> +static struct page **
> +dio_alloc_pagev(const struct iov_iter *it, int pagevlen, bool write_page,
> +    size_t *page_align, int *num_pages)
> +{
> +    const struct iovec *iov = it->iov;
> +    struct page **pages;
> +    int n, m, k, npages;
> +    int align;
> +    int len;
> +    void *data;
> +
> +    data = iov->iov_base + it->iov_offset;
> +    len = iov->iov_len - it->iov_offset;
> +    align = ((ulong)data) & ~PAGE_MASK;
> +    npages = calc_pages_for((ulong)data, pagevlen);
> +    pages = kmalloc(sizeof(*pages) * npages, GFP_NOFS);
> +    if (!pages)
> +        return ERR_PTR(-ENOMEM);
> +    for (n = 0; n < npages; n += m) {
> +        m = calc_pages_for((ulong)data, len);
> +        if (n + m > npages)
> +            m = npages - n;
> +        k = dio_grab_pages(data, m, write_page, pages + n);
> +        if (k < m) {
> +            n += k;
> +            goto failed;
> +        }
> +
> +        iov++;
> +        data = iov->iov_base;
> +        len = iov->iov_len;
> +    }
> +    *num_pages = npages;
> +    *page_align = align;
> +    dout("dio_alloc_pagev: alloc pages pages[0:%d], page align %d\n",
> +        npages, align);
> +    return pages;
> +
> +failed:
> +    dio_free_pagev(pages, n, false);
> +    return ERR_PTR(-ENOMEM);
> +}
>
>  /*
>   * Prepare an open request.  Preallocate ceph_cap to avoid an
> @@ -462,17 +571,17 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
>              size_t start;
>              ssize_t n;
>
> -            n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start);
> -            if (n < 0)
> -                return n;
> -
> -            num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
> +            n = dio_get_pagevlen(i);
> +            pages = dio_alloc_pagev(i, n, true, &start,
> +                &num_pages);
> +            if (IS_ERR(pages))
> +                return PTR_ERR(pages);
>
>              ret = striped_read(inode, off, n,
>                         pages, num_pages, checkeof,
>                         1, start);
>
> -            ceph_put_page_vector(pages, num_pages, true);
> +            dio_free_pagev(pages, num_pages, true);
>
>              if (ret <= 0)
>                  break;
> @@ -596,7 +705,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
>          CEPH_OSD_FLAG_WRITE;
>
>      while (iov_iter_count(from) > 0) {
> -        u64 len = iov_iter_single_seg_count(from);
> +        u64 len = dio_get_pagevlen(from);
>          size_t start;
>          ssize_t n;
>
> @@ -615,14 +724,15 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
>
>          osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
>
> -        n = iov_iter_get_pages_alloc(from, &pages, len, &start);
> -        if (unlikely(n < 0)) {
> -            ret = n;
> +        n = len;
> +        pages = dio_alloc_pagev(from, len, false, &start,
> +            &num_pages);
> +        if (IS_ERR(pages)) {
>              ceph_osdc_put_request(req);
> +            ret = PTR_ERR(pages);
>              break;
>          }
>
> -        num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
>          /*
>           * throw out any page cache pages in this range. this
>           * may block.
> @@ -639,8 +749,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
>          if (!ret)
>              ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>
> -        ceph_put_page_vector(pages, num_pages, false);
> -
> +        dio_free_pagev(pages, num_pages, false);
>          ceph_osdc_put_request(req);
>          if (ret)
>              break;
>
>
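
For reference, the num_pages / page_align pair that dio_alloc_pagev() reports
comes from the usual page-spanning arithmetic (essentially what libceph's
calc_pages_for() computes). A quick user-space sanity check; PG and pages_for
are illustrative names and a 4 KiB page size is assumed:

#include <stdio.h>

#define PG 4096UL

/* pages spanned by the byte range [off, off + len) */
static unsigned long pages_for(unsigned long off, unsigned long len)
{
    return (off + len + PG - 1) / PG - off / PG;
}

int main(void)
{
    unsigned long data = 0x10000 + 100;  /* buffer starting 100 bytes into a page */
    unsigned long len  = 2 * PG;         /* 8 KiB combined page-vector length */

    /* a 100-byte head plus two full pages spills into a third page */
    printf("page_align=%lu num_pages=%lu\n",
           data & (PG - 1), pages_for(data, len));   /* prints 100 and 3 */
    return 0;
}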

applied (with a few modifications), thanks

Yan, Zheng


