On Tue, Aug 21, 2018 at 12:24 AM Luis Henriques <lhenriques@xxxxxxxx> wrote: > > This commit implements support for the copy_file_range syscall in cephfs. > It is implemented using the RADOS 'copy-from' operation, which allows to > do a remote object copy, without the need to download/upload data from/to > the OSDs. > > Some manual copy may however be required if the source/destination file > offsets aren't object aligned or if the copy length is smaller than the > object size. > > Signed-off-by: Luis Henriques <lhenriques@xxxxxxxx> > --- > fs/ceph/file.c | 182 +++++++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 182 insertions(+) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index ad0bed99b1d5..a1ef060b5a0a 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -1,5 +1,6 @@ > // SPDX-License-Identifier: GPL-2.0 > #include <linux/ceph/ceph_debug.h> > +#include <linux/ceph/striper.h> > > #include <linux/module.h> > #include <linux/sched.h> > @@ -1820,6 +1821,186 @@ static long ceph_fallocate(struct file *file, int mode, > return ret; > } > > +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, > + struct file *dst_file, loff_t dst_off, > + size_t len, unsigned int flags) > +{ > + struct inode *src_inode = file_inode(src_file); > + struct inode *dst_inode = file_inode(dst_file); > + struct ceph_inode_info *src_ci = ceph_inode(src_inode); > + struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); > + struct ceph_osd_client *osdc = > + &ceph_inode_to_client(src_inode)->client->osdc; > + struct ceph_cap_flush *prealloc_cf; > + struct ceph_object_locator src_oloc, dst_oloc; > + loff_t endoff = 0; > + loff_t size; > + ssize_t ret = -EIO; > + int got = 0; > + > + if (src_inode == dst_inode) > + return -EINVAL; > + if (ceph_snap(dst_inode) != CEPH_NOSNAP) > + return -EROFS; > + > + prealloc_cf = ceph_alloc_cap_flush(); > + if (!prealloc_cf) > + return -ENOMEM; > + > + /* Start by sync'ing the source file */ > + ret = 
file_write_and_wait_range(src_file, src_off, (src_off + len)); > + if (ret < 0) > + return ret; > + > + size = i_size_read(src_inode); > + /* Don't copy beyond source file EOF */ > + if (src_off + len > size) > + len = (size - src_off); > + if (!len) { > + ret = 0; > + goto out; > + } > + size = i_size_read(dst_inode); > + endoff = dst_off + len; > + ret = inode_newsize_ok(dst_inode, endoff); > + if (ret) > + goto out; > + > + if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) { > + ret = -EDQUOT; > + goto out; > + } > + > + ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, > + CEPH_CAP_FILE_BUFFER, > + endoff, &got, NULL); > + if (ret < 0) > + goto out; > + What about getting FILE_RD caps for the src inode? I think adding another ceph_get_caps() call here may cause a deadlock (we shouldn't hold any cap while waiting). > + /* Drop dst file cached pages */ > + ret = invalidate_inode_pages2_range(dst_inode->i_mapping, > + dst_off >> PAGE_SHIFT, > + endoff >> PAGE_SHIFT); > + if (ret < 0) { > + printk("Failed to invalidate inode pages (%ld)\n", ret); > + ret = 0; /* XXX */ > + } > + src_oloc.pool = src_ci->i_layout.pool_id; > + src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); > + dst_oloc.pool = dst_ci->i_layout.pool_id; > + dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); > + /* > + * TODO: should file_start_write/file_end_write be used for the whole > + * loop? Or any other locking? 
> + */ > + while (len > 0) { > + struct ceph_object_id src_oid, dst_oid; > + u64 objnum, objoff; > + u32 objlen; > + size_t copy_len = min_t(size_t, src_ci->i_layout.object_size, len); > + int err = 0; > + > + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, > + copy_len, &objnum, &objoff, > + &objlen); > + ceph_oid_init(&src_oid); > + ceph_oid_printf(&src_oid, "%llx.%08llx", > + src_ci->i_vino.ino, objnum); > + > + /* Do manual copy if: > + * - source file offset isn't object aligned, or > + * - copy length is smaller than object size > + */ > + if (objoff || (copy_len < src_ci->i_layout.object_size)) { > + /* Do not copy beyond this object */ > + if (copy_len > objlen) > + copy_len = objlen; > + err = do_splice_direct(src_file, &src_off, dst_file, > + &dst_off, copy_len, flags); > + if (err < 0) { > + ret = err; > + goto out_caps; > + } > + len -= copy_len; > + ret += copy_len; > + continue; > + } > + > + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, > + copy_len, &objnum, &objoff, > + &objlen); > + ceph_oid_init(&dst_oid); > + ceph_oid_printf(&dst_oid, "%llx.%08llx", > + dst_ci->i_vino.ino, objnum); > + /* Again... do a manual copy if: > + * - destination file offset isn't object aligned, or > + * - copy length is smaller than object size > + * (although the object size should be the same for different > + * files in the same filesystem...) > + */ > + if (objoff || (copy_len < dst_ci->i_layout.object_size)) { > + if (copy_len > objlen) > + copy_len = objlen; > + err = do_splice_direct(src_file, &src_off, dst_file, > + &dst_off, copy_len, flags); > + if (err < 0) { > + ret = err; > + goto out_caps; > + } > + len -= copy_len; > + ceph_oid_destroy(&src_oid); > + ret += copy_len; > + continue; > + } > + /* Finally... do an object remote copy */ > + err = ceph_osdc_copy_from(osdc, src_ci->i_vino, > + 0, /* XXX src_ci->i_version ? 
*/ > + &src_oid, &src_oloc, > + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED, > + dst_ci->i_vino, &dst_oid, > + &dst_oloc, > + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); > + if (err) { > + printk("copy_from returned an error: %d\n", err); /* XXX */ > + ret = err; > + goto out_caps; > + } > + len -= copy_len; > + src_off += copy_len; > + dst_off += copy_len; > + ret += copy_len; > + ceph_oid_destroy(&src_oid); > + ceph_oid_destroy(&dst_oid); > + } > + /* Let the MDS know about destination object size change */ > + if (endoff > size) { > + int dirty; > + int caps_flags = CHECK_CAPS_AUTHONLY; > + > + if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) > + caps_flags |= CHECK_CAPS_NODELAY; > + if (ceph_inode_set_size(dst_inode, endoff)) > + caps_flags |= CHECK_CAPS_AUTHONLY; > + if (caps_flags) > + ceph_check_caps(dst_ci, caps_flags, NULL); > + spin_lock(&dst_ci->i_ceph_lock); > + dst_ci->i_inline_version = CEPH_INLINE_NONE; > + dirty = __ceph_mark_dirty_caps( > + dst_ci, > + CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER, > + &prealloc_cf); > + spin_unlock(&dst_ci->i_ceph_lock); > + if (dirty) > + __mark_inode_dirty(dst_inode, dirty); > + } > +out_caps: > + ceph_put_cap_refs(dst_ci, got); > +out: > + ceph_free_cap_flush(prealloc_cf); > + > + return ret; > +} There is no i_size check in the above function. I think some corner cases may not be handled properly. > + > const struct file_operations ceph_file_fops = { > .open = ceph_open, > .release = ceph_release, > @@ -1835,5 +2016,6 @@ const struct file_operations ceph_file_fops = { > .unlocked_ioctl = ceph_ioctl, > .compat_ioctl = ceph_ioctl, > .fallocate = ceph_fallocate, > + .copy_file_range = ceph_copy_file_range, > }; >