On Mon, Feb 25, 2019 at 8:30 PM <xxhdx1985126@xxxxxxxxx> wrote:
>
> From: Xuehan Xu <xuxuehan@xxxxxx>
>
> As of now, concurrent threads may issue concurrent file read reqs to
> the OSDs, ignoring the fact that the requested file ranges of some
> reqs may be covered by previously issued reqs. This commit makes
> those reqs wait for the previous ones to finish, saving the overhead
> of issuing them.
>

I think this change will cause problems in the following scenario:

client 1                           client 2
process a read
                                   process b write, then ask process c
                                   in client 1 to read
process c read

Process c's read can get aggregated into process a's still in-flight
read, which was issued before process b's write completed, so process c
may return pre-write data even though it did not start reading until
after the write finished.

> Signed-off-by: Xuehan Xu <xuxuehan@xxxxxx>
> ---
>  fs/ceph/file.c  | 207 ++++++++++++++++++++++++++++++++++++++++++------
>  fs/ceph/inode.c |   3 +
>  fs/ceph/super.h |  29 ++++++-
>  3 files changed, 215 insertions(+), 24 deletions(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 189df668b6a0..718ee163dce1 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -557,6 +557,92 @@ enum {
>  	READ_INLINE = 3,
>  };
>
> +int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op)
> +{
> +	long timeleft = wait_for_completion_killable_timeout(&op->comp, ceph_timeout_jiffies(op->timeout));
> +	if (timeleft > 0)
> +		return op->result;
> +	else
> +		return timeleft ? timeleft : -ETIMEDOUT;
> +}
> +
> +bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
> +				      unsigned long start, unsigned long end,
> +				      bool* repeated_low_endpoint,
> +				      struct ceph_aggregated_read_op** ag_op)
> +{
> +	struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end);
> +	bool positive_found = false, negative_found = false;
> +	while (node_p) {
> +		if (node_p->start == start)
> +			*repeated_low_endpoint = true;
> +		if (node_p->start <= start &&
> +		    node_p->last >= end) {
> +			positive_found = true;
> +			break;
> +		}
> +
> +		node_p = interval_tree_iter_next(node_p, start, end);
> +	}
> +
> +	dout("searched positive tree: found: %d\n", positive_found);
> +
> +	if (!positive_found) {
> +		node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment,
> +						  ULONG_MAX - end,
> +						  ULONG_MAX - start);
> +		while (node_p) {
> +			if (node_p->start <= ULONG_MAX - end &&
> +			    node_p->last >= ULONG_MAX - start) {
> +				negative_found = true;
> +				break;
> +			}
> +			node_p = interval_tree_iter_next(node_p,
> +							 ULONG_MAX - end,
> +							 ULONG_MAX - start);
> +		}
> +	}
> +
> +	dout("searched negative tree: found: %d\n", negative_found);
> +
> +	if (positive_found)
> +		*ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node);
> +	else if (negative_found)
> +		*ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node);
> +
> +	return positive_found || negative_found;
> +}
> +
> +void register_aggregated_read_op(struct ceph_inode_info* cinode,
> +				 struct ceph_aggregated_read_op* ag_op,
> +				 bool suppliment)
> +{
> +	if (suppliment) {
> +		interval_tree_insert(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment);
> +	} else
> +		interval_tree_insert(&ag_op->pos_node, &cinode->aggregated_read_ops);
> +}
> +
> +void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
> +				   struct ceph_aggregated_read_op* ag_op,
> +				   bool suppliment)
> +{
> +	if (suppliment)
> +		interval_tree_remove(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment);
> +	else
> +		interval_tree_remove(&ag_op->pos_node, &cinode->aggregated_read_ops);
> +}
> +
> +void ceph_put_aggregated_read_op(struct kref* kref)
> +{
> +	struct ceph_aggregated_read_op* ag_op = container_of(kref,
> +							     struct ceph_aggregated_read_op,
> +							     kref);
> +	if (ag_op->num_pages)
> +		ceph_release_page_vector(ag_op->pages, ag_op->num_pages);
> +	kfree(ag_op);
> +}
> +
>  /*
>   * Completely synchronous read and write methods.  Direct from __user
>   * buffer to osd, or directly to user pages (if O_DIRECT).
> @@ -575,9 +661,15 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  	struct ceph_inode_info *ci = ceph_inode(inode);
>  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>  	struct ceph_osd_client *osdc = &fsc->client->osdc;
> -	ssize_t ret;
> -	u64 off = iocb->ki_pos;
>  	u64 len = iov_iter_count(to);
> +	struct page **pages;
> +	u64 off = iocb->ki_pos;
> +	int num_pages;
> +	ssize_t ret;
> +	bool found_previous_req = false;
> +	bool repeated_low_endpoint = false;
> +	bool first_round = true;
> +	struct ceph_aggregated_read_op *ag_op = NULL;
>
>  	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
>  	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
> @@ -595,24 +687,46 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  		return ret;
>
>  	ret = 0;
> -	while ((len = iov_iter_count(to)) > 0) {
> +	if (likely(!iov_iter_is_pipe(to)) && (off + len) < (ULONG_MAX / 2)) {
> +		mutex_lock(&ci->aggregated_ops_lock);
> +		dout("ceph_sync_read: trying to find previous aggregated read op, off: %lld, len: %lld.\n", off, len);
> +		found_previous_req = find_previous_aggregated_read_op(ci, off,
> +				off + len, &repeated_low_endpoint, &ag_op);
> +		if (found_previous_req) {
> +			dout("ceph_sync_read: found previous aggregated read op, off: %lld, len: %lld.\n", off, len);
> +			kref_get(&ag_op->kref);
> +			mutex_unlock(&ci->aggregated_ops_lock);
> +			ret = ceph_wait_for_aggregated_read_op(ag_op);
> +			dout("ceph_sync_read: waited aggregated read op, off: %lld, len: %lld.\n", off, len);
> +		} else {
> +			ag_op = kzalloc(sizeof(struct ceph_aggregated_read_op), GFP_KERNEL);
> +			kref_init(&ag_op->kref);
> +			ag_op->pos_node.start = off;
> +			ag_op->pos_node.last = off + len;
> +			ag_op->neg_node.start = ULONG_MAX - off - len;
> +			ag_op->neg_node.last = ULONG_MAX - off;
> +			init_completion(&ag_op->comp);
> +			register_aggregated_read_op(ci, ag_op, repeated_low_endpoint);
> +			dout("ceph_sync_read: register new aggregated read op, off: %lld, len: %lld.\n", off, len);
> +			mutex_unlock(&ci->aggregated_ops_lock);
> +		}
> +	}
> +
> +	while (!found_previous_req && (len = iov_iter_count(to)) > 0) {
>  		struct ceph_osd_request *req;
> -		struct page **pages;
> -		int num_pages;
>  		size_t page_off;
>  		u64 i_size;
>  		bool more;
>
>  		req = ceph_osdc_new_request(osdc, &ci->i_layout,
> -					ci->i_vino, off, &len, 0, 1,
> -					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
> -					NULL, ci->i_truncate_seq,
> -					ci->i_truncate_size, false);
> +					    ci->i_vino, off, &len, 0, 1,
> +					    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
> +					    NULL, ci->i_truncate_seq,
> +					    ci->i_truncate_size, false);
>  		if (IS_ERR(req)) {
>  			ret = PTR_ERR(req);
>  			break;
>  		}
> -
>  		more = len < iov_iter_count(to);
>
>  		if (unlikely(iov_iter_is_pipe(to))) {
> @@ -630,34 +744,51 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  				more = false;
>  			}
>  		} else {
> +			int last_num_pages = num_pages;
>  			num_pages = calc_pages_for(off, len);
>  			page_off = off & ~PAGE_MASK;
> -			pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
> -			if (IS_ERR(pages)) {
> -				ceph_osdc_put_request(req);
> -				ret = PTR_ERR(pages);
> -				break;
> +
> +			if (first_round) {
> +				pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
> +				if (IS_ERR(pages)) {
> +					ceph_osdc_put_request(req);
> +					ret = PTR_ERR(pages);
> +					dout("ceph_sync_read: aggregated read op got err, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
> +					ag_op->result = ret;
> +					complete_all(&ag_op->comp);
> +					mutex_lock(&ci->aggregated_ops_lock);
> +					unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint);
> +					mutex_unlock(&ci->aggregated_ops_lock);
> +					dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
> +					kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
> +					break;
> +				}
> +				ag_op->pages = pages;
> +				ag_op->num_pages = num_pages;
> +			} else {
> +				pages += last_num_pages;
> +				if (page_off)
> +					pages--;
> +			}
>  		}
> -
>  		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
> -						false, false);
> +						 false, false);
>  		ret = ceph_osdc_start_request(osdc, req, false);
>  		if (!ret)
>  			ret = ceph_osdc_wait_request(osdc, req);
>  		ceph_osdc_put_request(req);
> -
> +
>  		i_size = i_size_read(inode);
>  		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
> -		     off, len, ret, i_size, (more ? " MORE" : ""));
> -
> +		     off, len, ret, i_size, (more ? " MORE" : ""));
> +
>  		if (ret == -ENOENT)
>  			ret = 0;
>  		if (ret >= 0 && ret < len && (off + ret < i_size)) {
>  			int zlen = min(len - ret, i_size - off - ret);
>  			int zoff = page_off + ret;
>  			dout("sync_read zero gap %llu~%llu\n",
> -			     off + ret, off + ret + zlen);
> +			     off + ret, off + ret + zlen);
>  			ceph_zero_page_vector_range(zoff, zlen, pages);
>  			ret += zlen;
>  		}
> @@ -686,12 +817,42 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
>  					break;
>  				}
>  			}
> -			ceph_release_page_vector(pages, num_pages);
>  		}
>
> -		if (ret <= 0 || off >= i_size || !more)
> +		if (ret <= 0 || off >= i_size || !more) {
> +			dout("ceph_sync_read: aggregated read op striped_readed, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
> +			ag_op->result = ret;
> +			complete_all(&ag_op->comp);
> +			mutex_lock(&ci->aggregated_ops_lock);
> +			unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint);
> +			mutex_unlock(&ci->aggregated_ops_lock);
> +			dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret);
>  			break;
> +		}
> +		first_round = false;
> +	}
> +
> +	if (found_previous_req) {
> +		int idx = (off >> PAGE_SHIFT) - (ag_op->pos_node.start >> PAGE_SHIFT);
> +		size_t left = ret > 0 ? ret : 0, page_off;
> +		while (left > 0) {
> +			size_t len, copied;
> +			page_off = off & ~PAGE_MASK;
> +			len = min_t(size_t, left, PAGE_SIZE - page_off);
> +			dout("%s: copy pages after waiting, idx: %d, off: %lld, len: %ld,"
> +			     " ag_op: %p, ag_op->pages: %p, ag_op->num_pages: %d\n",
> +			     __func__, idx, off, len, ag_op, ag_op->pages, ag_op->num_pages);
> +			copied = copy_page_to_iter(ag_op->pages[idx++],
> +						   page_off, len, to);
> +			off += copied;
> +			left -= copied;
> +			if (copied < len) {
> +				ret = -EFAULT;
> +				break;
> +			}
> +		}
>  	}
> +	kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
>
>  	if (off > iocb->ki_pos) {
>  		if (ret >= 0 &&
> diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
> index 8705f0645a24..1dfc0afa1dde 100644
> --- a/fs/ceph/inode.c
> +++ b/fs/ceph/inode.c
> @@ -432,6 +432,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
>  	spin_lock_init(&ci->i_ceph_lock);
>  	mutex_init(&ci->getattrs_inflight_lock);
>  	mutex_init(&ci->lookups_inflight_lock);
> +	mutex_init(&ci->aggregated_ops_lock);
>
>  	ci->i_version = 0;
>  	ci->i_inline_version = 0;
> @@ -465,6 +466,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
>  	ci->i_caps = RB_ROOT;
>  	ci->getattrs_inflight = RB_ROOT;
>  	ci->lookups_inflight = RB_ROOT;
> +	ci->aggregated_read_ops = RB_ROOT_CACHED;
> +	ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED;
>  	ci->i_auth_cap = NULL;
>  	ci->i_dirty_caps = 0;
>  	ci->i_flushing_caps = 0;
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index abf761f2a122..a7ed8deaf836 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -16,6 +16,7 @@
>  #include <linux/slab.h>
>  #include <linux/posix_acl.h>
>  #include <linux/refcount.h>
> +#include <linux/interval_tree.h>
>
>  #include <linux/ceph/libceph.h>
>
> @@ -289,6 +290,31 @@ struct ceph_inode_xattrs_info {
>  	u64 version, index_version;
>  };
>
> +struct ceph_aggregated_read_op {
> +	struct kref kref;
> +	struct page** pages;
> +	int num_pages;
> +	unsigned long timeout;
> +	int result;
> +	struct interval_tree_node pos_node, neg_node;
> +	struct completion comp;
> +};
> +
> +extern void ceph_put_aggregated_read_op(struct kref* kref);
> +
> +extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
> +					     unsigned long start, unsigned long end,
> +					     bool* repeated_low_endpoint,
> +					     struct ceph_aggregated_read_op** ag_op);
> +
> +extern void register_aggregated_read_op(struct ceph_inode_info* cinode,
> +					struct ceph_aggregated_read_op* ag_op,
> +					bool suppliment);
> +
> +extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
> +					  struct ceph_aggregated_read_op* ag_op,
> +					  bool suppliment);
> +
>  /*
>   * Ceph inode.
>   */
> @@ -296,8 +322,9 @@ struct ceph_inode_info {
>  	struct ceph_vino i_vino;	/* ceph ino + snap */
>
>  	spinlock_t i_ceph_lock;
> -	struct mutex getattrs_inflight_lock, lookups_inflight_lock;
> +	struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock;
>  	struct rb_root getattrs_inflight, lookups_inflight;
> +	struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment;
>
>  	u64 i_version;
>  	u64 i_inline_version;
> --
> 2.19.1
>
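To make the objection concrete, below is a minimal userspace sketch of
the aggregation scheme this patch implements. It is my own illustration,
not fs/ceph code: every name in it is hypothetical, a pthread condvar
stands in for the kernel completion, and a linked list stands in for the
interval trees. Run as-is, "process c" issues its read after the write
has completed, gets hooked onto "process a"'s still-in-flight op, and
prints the pre-write data:

/*
 * agg_read_sketch.c - hypothetical userspace sketch of the read
 * aggregation in this patch.  Build: cc -pthread agg_read_sketch.c
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static char backing[64] = "old data";		/* simulated OSD object */

/* Simulated OSD read: data is sampled at issue time, reply is slow. */
static long backend_read(size_t off, size_t len, char *buf)
{
	memcpy(buf, backing + off, len);	/* snapshot at issue time */
	usleep(100 * 1000);			/* 100ms "network" latency */
	return (long)len;
}

struct agg_op {
	size_t start, end;			/* covered range [start, end) */
	char *buf;				/* valid once done != 0 */
	long result;
	int done;
	struct agg_op *next;			/* stand-in for interval tree */
};

static pthread_mutex_t ops_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ops_cv = PTHREAD_COND_INITIALIZER;
static struct agg_op *inflight;

static long agg_read(size_t off, size_t len, char *dst)
{
	struct agg_op *op, **pp;
	long ret;

	pthread_mutex_lock(&ops_lock);
	for (op = inflight; op; op = op->next)
		if (op->start <= off && off + len <= op->end)
			break;
	if (op) {
		/*
		 * A covering read is already in flight: wait for it and
		 * copy from its buffer instead of going to the backend.
		 * This is where the scenario above breaks: "op" may have
		 * been issued before a remote write completed, so dst can
		 * observe pre-write data even though *this* read started
		 * after the write finished.
		 */
		while (!op->done)
			pthread_cond_wait(&ops_cv, &ops_lock);
		ret = op->result;
		if (ret > 0)
			memcpy(dst, op->buf + (off - op->start), len);
		pthread_mutex_unlock(&ops_lock);
		return ret < 0 ? ret : (long)len;
	}

	/* No covering op: register one, then do the real read. */
	op = calloc(1, sizeof(*op));
	op->start = off;
	op->end = off + len;
	op->buf = malloc(len);
	op->next = inflight;
	inflight = op;
	pthread_mutex_unlock(&ops_lock);

	ret = backend_read(off, len, op->buf);

	pthread_mutex_lock(&ops_lock);
	op->result = ret;
	op->done = 1;
	for (pp = &inflight; *pp != op; pp = &(*pp)->next)
		;
	*pp = op->next;				/* unregister */
	pthread_cond_broadcast(&ops_cv);
	if (ret > 0)
		memcpy(dst, op->buf, len);
	pthread_mutex_unlock(&ops_lock);
	/* op and op->buf intentionally leaked; the patch uses a kref. */
	return ret;
}

static void *reader(void *name)
{
	char out[16] = { 0 };

	agg_read(0, 8, out);
	printf("%s read \"%s\"\n", (char *)name, out);
	return NULL;
}

int main(void)
{
	pthread_t a, c;

	pthread_create(&a, NULL, reader, "process a");
	usleep(10 * 1000);
	strcpy(backing, "new data");		/* "client 2" writes */
	pthread_create(&c, NULL, reader, "process c");
	pthread_join(a, NULL);
	pthread_join(c, NULL);			/* c prints stale "old data" */
	return 0;
}

On a separate note, if I read find_previous_aggregated_read_op() right,
the "suppliment" tree stores each range reflected to
[ULONG_MAX - end, ULONG_MAX - start]. Containment does survive that
reflection (a <= s && e <= b is equivalent to
ULONG_MAX - b <= ULONG_MAX - e && ULONG_MAX - s <= ULONG_MAX - a), but
it is not obvious why ranges that merely share a start offset need to
live in a second tree; that deserves a comment in the code.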