From: Xuehan Xu <xuxuehan@xxxxxx> As for now, concurrent threads may issue concurrent file read reqs to MDSes, ignoring the fact that the requested file ranges of some reqs may be included by previous issued reqs. This commit make those reqs wait for the previous ones to finish, saving the overhead of issuing them. Signed-off-by: Xuehan Xu <xuxuehan@xxxxxx> --- fs/ceph/file.c | 207 ++++++++++++++++++++++++++++++++++++++++++------ fs/ceph/inode.c | 3 + fs/ceph/super.h | 29 ++++++- 3 files changed, 215 insertions(+), 24 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 189df668b6a0..718ee163dce1 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -557,6 +557,92 @@ enum { READ_INLINE = 3, }; +int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op) +{ + long timeleft = wait_for_completion_killable_timeout(&op->comp, ceph_timeout_jiffies(op->timeout)); + if (timeleft > 0) + return op->result; + else + return timeleft ? timeleft : -ETIMEDOUT; +} + +bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, + unsigned long start, unsigned long end, + bool* repeated_low_endpoint, + struct ceph_aggregated_read_op** ag_op) +{ + struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end); + bool positive_found = false, negative_found = false; + while (node_p) { + if (node_p->start == start) + *repeated_low_endpoint = true; + if (node_p->start <= start && + node_p->last >= end) { + positive_found = true; + break; + } + + node_p = interval_tree_iter_next(node_p, start, end); + } + + dout("searched positive tree: found: %d\n", positive_found); + + if (!positive_found) { + node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment, + ULONG_MAX - end, + ULONG_MAX - start); + while (node_p) { + if (node_p->start <= ULONG_MAX - end && + node_p->last >= ULONG_MAX - start) { + negative_found = true; + break; + } + node_p = interval_tree_iter_next(node_p, + ULONG_MAX - end, + ULONG_MAX - start); + } + } + + dout("searched negative tree: found: %d\n", negative_found); + + if (positive_found) + *ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node); + else if (negative_found) + *ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node); + + return positive_found || negative_found; +} + +void register_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment) +{ + if (suppliment) { + interval_tree_insert(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment); + } else + interval_tree_insert(&ag_op->pos_node, &cinode->aggregated_read_ops); +} + +void unregister_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment) +{ + if (suppliment) + interval_tree_remove(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment); + else + interval_tree_remove(&ag_op->pos_node, &cinode->aggregated_read_ops); +} + +void ceph_put_aggregated_read_op(struct kref* kref) +{ + struct ceph_aggregated_read_op* ag_op = container_of(kref, + struct ceph_aggregated_read_op, + kref); + if (ag_op->num_pages) + ceph_release_page_vector(ag_op->pages, ag_op->num_pages); + kfree(ag_op); +} + /* * Completely synchronous read and write methods. Direct from __user * buffer to osd, or directly to user pages (if O_DIRECT). @@ -575,9 +661,15 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_osd_client *osdc = &fsc->client->osdc; - ssize_t ret; - u64 off = iocb->ki_pos; u64 len = iov_iter_count(to); + struct page **pages; + u64 off = iocb->ki_pos; + int num_pages; + ssize_t ret; + bool found_previous_req = false; + bool repeated_low_endpoint = false; + bool first_round = true; + struct ceph_aggregated_read_op *ag_op = NULL; dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); @@ -595,24 +687,46 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, return ret; ret = 0; - while ((len = iov_iter_count(to)) > 0) { + if (likely(!iov_iter_is_pipe(to)) && (off + len) < (ULONG_MAX / 2)) { + mutex_lock(&ci->aggregated_ops_lock); + dout("ceph_sync_read: trying to find previous aggregated read op, off: %lld, len: %lld.\n", off, len); + found_previous_req = find_previous_aggregated_read_op(ci, off, + off + len, &repeated_low_endpoint, &ag_op); + if (found_previous_req) { + dout("ceph_sync_read: found previous aggregated read op, off: %lld, len: %lld.\n", off, len); + kref_get(&ag_op->kref); + mutex_unlock(&ci->aggregated_ops_lock); + ret = ceph_wait_for_aggregated_read_op(ag_op); + dout("ceph_sync_read: waited aggregated read op, off: %lld, len: %lld.\n", off, len); + } else { + ag_op = kzalloc(sizeof(struct ceph_aggregated_read_op), GFP_KERNEL); + kref_init(&ag_op->kref); + ag_op->pos_node.start = off; + ag_op->pos_node.last = off + len; + ag_op->neg_node.start = ULONG_MAX - off - len; + ag_op->neg_node.last = ULONG_MAX - off; + init_completion(&ag_op->comp); + register_aggregated_read_op(ci, ag_op, repeated_low_endpoint); + dout("ceph_sync_read: register new aggregated read op, off: %lld, len: %lld.\n", off, len); + mutex_unlock(&ci->aggregated_ops_lock); + } + } + + while (!found_previous_req && (len = iov_iter_count(to)) > 0) { struct ceph_osd_request *req; - struct page **pages; - int num_pages; size_t page_off; u64 i_size; bool more; req = ceph_osdc_new_request(osdc, &ci->i_layout, - ci->i_vino, off, &len, 0, 1, - CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, - NULL, ci->i_truncate_seq, - ci->i_truncate_size, false); + ci->i_vino, off, &len, 0, 1, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); if (IS_ERR(req)) { ret = PTR_ERR(req); break; } - more = len < iov_iter_count(to); if (unlikely(iov_iter_is_pipe(to))) { @@ -630,34 +744,51 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, more = false; } } else { + int last_num_pages = num_pages; num_pages = calc_pages_for(off, len); page_off = off & ~PAGE_MASK; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) { - ceph_osdc_put_request(req); - ret = PTR_ERR(pages); - break; + + if (first_round) { + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ceph_osdc_put_request(req); + ret = PTR_ERR(pages); + dout("ceph_sync_read: aggregated read op got err, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); + ag_op->result = ret; + complete_all(&ag_op->comp); + mutex_lock(&ci->aggregated_ops_lock); + unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint); + mutex_unlock(&ci->aggregated_ops_lock); + dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); + kref_put(&ag_op->kref, ceph_put_aggregated_read_op); + break; + } + ag_op->pages = pages; + ag_op->num_pages = num_pages; + } else { + pages += last_num_pages; + if (page_off) + pages--; } } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, - false, false); + false, false); ret = ceph_osdc_start_request(osdc, req, false); if (!ret) ret = ceph_osdc_wait_request(osdc, req); ceph_osdc_put_request(req); - + i_size = i_size_read(inode); dout("sync_read %llu~%llu got %zd i_size %llu%s\n", - off, len, ret, i_size, (more ? " MORE" : "")); - + off, len, ret, i_size, (more ? " MORE" : "")); + if (ret == -ENOENT) ret = 0; if (ret >= 0 && ret < len && (off + ret < i_size)) { int zlen = min(len - ret, i_size - off - ret); int zoff = page_off + ret; dout("sync_read zero gap %llu~%llu\n", - off + ret, off + ret + zlen); + off + ret, off + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); ret += zlen; } @@ -686,12 +817,42 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, break; } } - ceph_release_page_vector(pages, num_pages); } - if (ret <= 0 || off >= i_size || !more) + if (ret <= 0 || off >= i_size || !more) { + dout("ceph_sync_read: aggregated read op striped_readed, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); + ag_op->result = ret; + complete_all(&ag_op->comp); + mutex_lock(&ci->aggregated_ops_lock); + unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint); + mutex_unlock(&ci->aggregated_ops_lock); + dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); break; + } + first_round = false; + } + + if (found_previous_req) { + int idx = (off >> PAGE_SHIFT) - (ag_op->pos_node.start >> PAGE_SHIFT); + size_t left = ret > 0 ? ret : 0, page_off; + while (left > 0) { + size_t len, copied; + page_off = off & ~PAGE_MASK; + len = min_t(size_t, left, PAGE_SIZE - page_off); + dout("%s: copy pages after waiting, idx: %d, off: %lld, len: %ld," + " ag_op: %p, ag_op->pages: %p, ag_op->num_pages: %d\n", + __func__, idx, off, len, ag_op, ag_op->pages, ag_op->num_pages); + copied = copy_page_to_iter(ag_op->pages[idx++], + page_off, len, to); + off += copied; + left -= copied; + if (copied < len) { + ret = -EFAULT; + break; + } + } } + kref_put(&ag_op->kref, ceph_put_aggregated_read_op); if (off > iocb->ki_pos) { if (ret >= 0 && diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 8705f0645a24..1dfc0afa1dde 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -432,6 +432,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) spin_lock_init(&ci->i_ceph_lock); mutex_init(&ci->getattrs_inflight_lock); mutex_init(&ci->lookups_inflight_lock); + mutex_init(&ci->aggregated_ops_lock); ci->i_version = 0; ci->i_inline_version = 0; @@ -465,6 +466,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_caps = RB_ROOT; ci->getattrs_inflight = RB_ROOT; ci->lookups_inflight = RB_ROOT; + ci->aggregated_read_ops = RB_ROOT_CACHED; + ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED; ci->i_auth_cap = NULL; ci->i_dirty_caps = 0; ci->i_flushing_caps = 0; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index abf761f2a122..a7ed8deaf836 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -16,6 +16,7 @@ #include <linux/slab.h> #include <linux/posix_acl.h> #include <linux/refcount.h> +#include <linux/interval_tree.h> #include <linux/ceph/libceph.h> @@ -289,6 +290,31 @@ struct ceph_inode_xattrs_info { u64 version, index_version; }; +struct ceph_aggregated_read_op { + struct kref kref; + struct page** pages; + int num_pages; + unsigned long timeout; + int result; + struct interval_tree_node pos_node, neg_node; + struct completion comp; +}; + +extern void ceph_put_aggregated_read_op(struct kref* kref); + +extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, + unsigned long start, unsigned long end, + bool* repeated_low_endpoint, + struct ceph_aggregated_read_op** ag_op); + +extern void register_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment); + +extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment); + /* * Ceph inode. */ @@ -296,8 +322,9 @@ struct ceph_inode_info { struct ceph_vino i_vino; /* ceph ino + snap */ spinlock_t i_ceph_lock; - struct mutex getattrs_inflight_lock, lookups_inflight_lock; + struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock; struct rb_root getattrs_inflight, lookups_inflight; + struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment; u64 i_version; u64 i_inline_version; -- 2.19.1