Re: [PATCH 2/2] ceph: aggregate ceph_sync_read requests

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Mon, Feb 25, 2019 at 8:30 PM <xxhdx1985126@xxxxxxxxx> wrote:
>
> From: Xuehan Xu <xuxuehan@xxxxxx>
>
> As for now, concurrent threads may issue concurrent file read reqs
> to MDSes, ignoring the fact that the requested file ranges of some
> reqs may be included by previous issued reqs. This commit make those
> reqs wait for the previous ones to finish, saving the overhead of
> issuing them.
>

I think this change will cause problem in following scenario

client 1                                          client 2
process a read
                                                      process b write,
then ask process c in client 1 to read
process c read



> Signed-off-by: Xuehan Xu <xuxuehan@xxxxxx>
> ---
>  fs/ceph/file.c  | 207 ++++++++++++++++++++++++++++++++++++++++++------
>  fs/ceph/inode.c |   3 +
>  fs/ceph/super.h |  29 ++++++-
>  3 files changed, 215 insertions(+), 24 deletions(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 189df668b6a0..718ee163dce1 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -557,6 +557,92 @@ enum {
>         READ_INLINE =  3,
>  };
>
> +int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op)
> +{
> +       long timeleft = wait_for_completion_killable_timeout(&op->comp, ceph_timeout_jiffies(op->timeout));
> +       if (timeleft > 0)
> +               return op->result;
> +       else
> +               return timeleft ? timeleft : -ETIMEDOUT;
> +}
> +
> +bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
> +                                     unsigned long start, unsigned long end,
> +                                     bool* repeated_low_endpoint,
> +                                     struct ceph_aggregated_read_op** ag_op)
> +{
> +       struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end);
> +       bool positive_found = false, negative_found = false;
> +       while (node_p) {
> +               if (node_p->start == start)
> +                       *repeated_low_endpoint = true;
> +               if (node_p->start <= start &&
> +                               node_p->last >= end) {
> +                       positive_found = true;
> +                       break;
> +               }
> +
> +               node_p = interval_tree_iter_next(node_p, start, end);
> +       }
> +
> +       dout("searched positive tree: found: %d\n", positive_found);
> +
> +       if (!positive_found) {
> +               node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment,
> +                               ULONG_MAX - end,
> +                               ULONG_MAX - start);
> +               while (node_p) {
> +                       if (node_p->start <= ULONG_MAX - end &&
> +                                       node_p->last >= ULONG_MAX - start) {
> +                               negative_found = true;
> +                               break;
> +                       }
> +                       node_p = interval_tree_iter_next(node_p,
> +                                       ULONG_MAX - end,
> +                                       ULONG_MAX - start);
> +               }
> +       }
> +
> +       dout("searched negative tree: found: %d\n", negative_found);
> +
> +       if (positive_found)
> +               *ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node);
> +       else if (negative_found)
> +               *ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node);
> +
> +       return positive_found || negative_found;
> +}
> +
> +void register_aggregated_read_op(struct ceph_inode_info* cinode,
> +                                struct ceph_aggregated_read_op* ag_op,
> +                                bool suppliment)
> +{
> +       if (suppliment) {
> +               interval_tree_insert(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment);
> +       } else
> +               interval_tree_insert(&ag_op->pos_node, &cinode->aggregated_read_ops);
> +}
> +
> +void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
> +                                  struct ceph_aggregated_read_op* ag_op,
> +                                  bool suppliment)
> +{
> +       if (suppliment)
> +               interval_tree_remove(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment);
> +       else
> +               interval_tree_remove(&ag_op->pos_node, &cinode->aggregated_read_ops);
> +}
> +
> +void ceph_put_aggregated_read_op(struct kref* kref)
> +{
> +       struct ceph_aggregated_read_op* ag_op = container_of(kref,
> +                       struct ceph_aggregated_read_op,
> +                       kref);
> +       if (ag_op->num_pages)
> +               ceph_release_page_vector(ag_op->pages, ag_op->num_pages);
> +       kfree(ag_op);
> +}
> +
>  /*
>   * Completely synchronous read and write methods.  Direct from __user
>   * buffer to osd, or directly to user pages (if O_DIRECT).
> @@ -575,9 +661,15 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>         struct ceph_inode_info *ci = ceph_inode(inode);
>         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>         struct ceph_osd_client *osdc = &fsc->client->osdc;
> -       ssize_t ret;
> -       u64 off = iocb->ki_pos;
>         u64 len = iov_iter_count(to);
> +       struct page **pages;
> +       u64 off = iocb->ki_pos;
> +       int num_pages;
> +       ssize_t ret;
> +       bool found_previous_req = false;
> +       bool repeated_low_endpoint = false;
> +       bool first_round = true;
> +       struct ceph_aggregated_read_op *ag_op = NULL;
>
>         dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
>              (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
> @@ -595,24 +687,46 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>                 return ret;
>
>         ret = 0;
> -       while ((len = iov_iter_count(to)) > 0) {
> +       if (likely(!iov_iter_is_pipe(to)) && (off + len) < (ULONG_MAX / 2)) {
> +               mutex_lock(&ci->aggregated_ops_lock);
> +               dout("ceph_sync_read: trying to find previous aggregated read op, off: %lld, len: %lld.\n", off, len);
> +               found_previous_req = find_previous_aggregated_read_op(ci, off,
> +                               off + len, &repeated_low_endpoint, &ag_op);
> +               if (found_previous_req) {
> +                       dout("ceph_sync_read: found previous aggregated read op, off: %lld, len: %lld.\n", off, len);
> +                       kref_get(&ag_op->kref);
> +                       mutex_unlock(&ci->aggregated_ops_lock);
> +                       ret = ceph_wait_for_aggregated_read_op(ag_op);
> +                       dout("ceph_sync_read: waited aggregated read op, off: %lld, len: %lld.\n", off, len);
> +               } else {
> +                       ag_op = kzalloc(sizeof(struct ceph_aggregated_read_op), GFP_KERNEL);
> +                       kref_init(&ag_op->kref);
> +                       ag_op->pos_node.start = off;
> +                       ag_op->pos_node.last = off + len;
> +                       ag_op->neg_node.start = ULONG_MAX - off - len;
> +                       ag_op->neg_node.last = ULONG_MAX - off;
> +                       init_completion(&ag_op->comp);
> +                       register_aggregated_read_op(ci, ag_op, repeated_low_endpoint);
> +                       dout("ceph_sync_read: register new aggregated read op, off: %lld, len: %lld.\n", off, len);
> +                       mutex_unlock(&ci->aggregated_ops_lock);
> +               }
> +       }
> +
> +       while (!found_previous_req && (len = iov_iter_count(to)) > 0) {
>                 struct ceph_osd_request *req;
> -               struct page **pages;
> -               int num_pages;
>                 size_t page_off;
>                 u64 i_size;
>                 bool more;
>
>                 req = ceph_osdc_new_request(osdc, &ci->i_layout,
> -                                       ci->i_vino, off, &len, 0, 1,
> -                                       CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
> -                                       NULL, ci->i_truncate_seq,
> -                                       ci->i_truncate_size, false);
> +                               ci->i_vino, off, &len, 0, 1,
> +                               CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
> +                               NULL, ci->i_truncate_seq,
> +                               ci->i_truncate_size, false);
>                 if (IS_ERR(req)) {
>                         ret = PTR_ERR(req);
>                         break;
>                 }
> -
>                 more = len < iov_iter_count(to);
>
>                 if (unlikely(iov_iter_is_pipe(to))) {
> @@ -630,34 +744,51 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>                                 more = false;
>                         }
>                 } else {
> +                       int last_num_pages = num_pages;
>                         num_pages = calc_pages_for(off, len);
>                         page_off = off & ~PAGE_MASK;
> -                       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
> -                       if (IS_ERR(pages)) {
> -                               ceph_osdc_put_request(req);
> -                               ret = PTR_ERR(pages);
> -                               break;
> +
> +                       if (first_round) {
> +                               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
> +                               if (IS_ERR(pages)) {
> +                                       ceph_osdc_put_request(req);
> +                                       ret = PTR_ERR(pages);
> +                                       dout("ceph_sync_read: aggregated read op got err, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
> +                                       ag_op->result = ret;
> +                                       complete_all(&ag_op->comp);
> +                                       mutex_lock(&ci->aggregated_ops_lock);
> +                                       unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint);
> +                                       mutex_unlock(&ci->aggregated_ops_lock);
> +                                       dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
> +                                       kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
> +                                       break;
> +                               }
> +                               ag_op->pages = pages;
> +                               ag_op->num_pages = num_pages;
> +                       } else {
> +                               pages += last_num_pages;
> +                               if (page_off)
> +                                       pages--;
>                         }
>                 }
> -
>                 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
> -                                                false, false);
> +                               false, false);
>                 ret = ceph_osdc_start_request(osdc, req, false);
>                 if (!ret)
>                         ret = ceph_osdc_wait_request(osdc, req);
>                 ceph_osdc_put_request(req);
> -
> +
>                 i_size = i_size_read(inode);
>                 dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
> -                    off, len, ret, i_size, (more ? " MORE" : ""));
> -
> +                               off, len, ret, i_size, (more ? " MORE" : ""));
> +
>                 if (ret == -ENOENT)
>                         ret = 0;
>                 if (ret >= 0 && ret < len && (off + ret < i_size)) {
>                         int zlen = min(len - ret, i_size - off - ret);
>                         int zoff = page_off + ret;
>                         dout("sync_read zero gap %llu~%llu\n",
> -                             off + ret, off + ret + zlen);
> +                                       off + ret, off + ret + zlen);
>                         ceph_zero_page_vector_range(zoff, zlen, pages);
>                         ret += zlen;
>                 }
> @@ -686,12 +817,42 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>                                         break;
>                                 }
>                         }
> -                       ceph_release_page_vector(pages, num_pages);
>                 }
>
> -               if (ret <= 0 || off >= i_size || !more)
> +               if (ret <= 0 || off >= i_size || !more) {
> +                       dout("ceph_sync_read: aggregated read op striped_readed, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
> +                       ag_op->result = ret;
> +                       complete_all(&ag_op->comp);
> +                       mutex_lock(&ci->aggregated_ops_lock);
> +                       unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint);
> +                       mutex_unlock(&ci->aggregated_ops_lock);
> +                       dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
>                         break;
> +               }
> +               first_round = false;
> +       }
> +
> +       if (found_previous_req) {
> +               int idx = (off >> PAGE_SHIFT) - (ag_op->pos_node.start >> PAGE_SHIFT);
> +               size_t left = ret > 0 ? ret : 0, page_off;
> +               while (left > 0) {
> +                       size_t len, copied;
> +                       page_off = off & ~PAGE_MASK;
> +                       len = min_t(size_t, left, PAGE_SIZE - page_off);
> +                       dout("%s: copy pages after waiting, idx: %d, off: %lld, len: %ld,"
> +                                       " ag_op: %p, ag_op->pages: %p, ag_op->num_pages: %d\n",
> +                                       __func__, idx, off, len, ag_op, ag_op->pages, ag_op->num_pages);
> +                       copied = copy_page_to_iter(ag_op->pages[idx++],
> +                                       page_off, len, to);
> +                       off += copied;
> +                       left -= copied;
> +                       if (copied < len) {
> +                               ret = -EFAULT;
> +                               break;
> +                       }
> +               }
>         }
> +       kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
>
>         if (off > iocb->ki_pos) {
>                 if (ret >= 0 &&
> diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
> index 8705f0645a24..1dfc0afa1dde 100644
> --- a/fs/ceph/inode.c
> +++ b/fs/ceph/inode.c
> @@ -432,6 +432,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
>         spin_lock_init(&ci->i_ceph_lock);
>         mutex_init(&ci->getattrs_inflight_lock);
>         mutex_init(&ci->lookups_inflight_lock);
> +       mutex_init(&ci->aggregated_ops_lock);
>
>         ci->i_version = 0;
>         ci->i_inline_version = 0;
> @@ -465,6 +466,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
>         ci->i_caps = RB_ROOT;
>         ci->getattrs_inflight = RB_ROOT;
>         ci->lookups_inflight = RB_ROOT;
> +       ci->aggregated_read_ops = RB_ROOT_CACHED;
> +       ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED;
>         ci->i_auth_cap = NULL;
>         ci->i_dirty_caps = 0;
>         ci->i_flushing_caps = 0;
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index abf761f2a122..a7ed8deaf836 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -16,6 +16,7 @@
>  #include <linux/slab.h>
>  #include <linux/posix_acl.h>
>  #include <linux/refcount.h>
> +#include <linux/interval_tree.h>
>
>  #include <linux/ceph/libceph.h>
>
> @@ -289,6 +290,31 @@ struct ceph_inode_xattrs_info {
>         u64 version, index_version;
>  };
>
> +struct ceph_aggregated_read_op {
> +       struct kref kref;
> +       struct page** pages;
> +       int num_pages;
> +       unsigned long timeout;
> +       int result;
> +       struct interval_tree_node pos_node, neg_node;
> +       struct completion comp;
> +};
> +
> +extern void ceph_put_aggregated_read_op(struct kref* kref);
> +
> +extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
> +                                            unsigned long start, unsigned long end,
> +                                            bool* repeated_low_endpoint,
> +                                            struct ceph_aggregated_read_op** ag_op);
> +
> +extern void register_aggregated_read_op(struct ceph_inode_info* cinode,
> +                                       struct ceph_aggregated_read_op* ag_op,
> +                                       bool suppliment);
> +
> +extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
> +                                         struct ceph_aggregated_read_op* ag_op,
> +                                         bool suppliment);
> +
>  /*
>   * Ceph inode.
>   */
> @@ -296,8 +322,9 @@ struct ceph_inode_info {
>         struct ceph_vino i_vino;   /* ceph ino + snap */
>
>         spinlock_t i_ceph_lock;
> -       struct mutex getattrs_inflight_lock, lookups_inflight_lock;
> +       struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock;
>         struct rb_root getattrs_inflight, lookups_inflight;
> +       struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment;
>
>         u64 i_version;
>         u64 i_inline_version;
> --
> 2.19.1
>



[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux