Re: [RFC PATCH 2/2] ceph: support copy_file_range file operation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Aug 21, 2018 at 12:24 AM Luis Henriques <lhenriques@xxxxxxxx> wrote:
>
> This commit implements support for the copy_file_range syscall in cephfs.
> It is implemented using the RADOS 'copy-from' operation, which allows to
> do a remote object copy, without the need to download/upload data from/to
> the OSDs.
>
> Some manual copy may however be required if the source/destination file
> offsets aren't object aligned or if the copy lenght is smaller than the
> object size.
>
> Signed-off-by: Luis Henriques <lhenriques@xxxxxxxx>
> ---
>  fs/ceph/file.c | 182 +++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 182 insertions(+)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index ad0bed99b1d5..a1ef060b5a0a 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -1,5 +1,6 @@
>  // SPDX-License-Identifier: GPL-2.0
>  #include <linux/ceph/ceph_debug.h>
> +#include <linux/ceph/striper.h>
>
>  #include <linux/module.h>
>  #include <linux/sched.h>
> @@ -1820,6 +1821,186 @@ static long ceph_fallocate(struct file *file, int mode,
>         return ret;
>  }
>
> +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> +                                   struct file *dst_file, loff_t dst_off,
> +                                   size_t len, unsigned int flags)
> +{
> +       struct inode *src_inode = file_inode(src_file);
> +       struct inode *dst_inode = file_inode(dst_file);
> +       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> +       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> +       struct ceph_osd_client *osdc =
> +               &ceph_inode_to_client(src_inode)->client->osdc;
> +       struct ceph_cap_flush *prealloc_cf;
> +       struct ceph_object_locator src_oloc, dst_oloc;
> +       loff_t endoff = 0;
> +       loff_t size;
> +       ssize_t ret = -EIO;
> +       int got = 0;
> +
> +       if (src_inode == dst_inode)
> +               return -EINVAL;
> +       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> +               return -EROFS;
> +
> +       prealloc_cf = ceph_alloc_cap_flush();
> +       if (!prealloc_cf)
> +               return -ENOMEM;
> +
> +       /* Start by sync'ing the source file */
> +       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> +       if (ret < 0)
> +               return ret;
> +
> +       size = i_size_read(src_inode);
> +       /* Don't copy beyond source file EOF */
> +       if (src_off + len > size)
> +               len = (size - src_off);
> +       if (!len) {
> +               ret = 0;
> +               goto out;
> +       }
> +       size = i_size_read(dst_inode);
> +       endoff = dst_off + len;
> +       ret = inode_newsize_ok(dst_inode, endoff);
> +       if (ret)
> +               goto out;
> +
> +       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) {
> +               ret = -EDQUOT;
> +               goto out;
> +       }
> +
> +       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR,
> +                           CEPH_CAP_FILE_BUFFER,
> +                           endoff, &got, NULL);
> +       if (ret < 0)
> +               goto out;
> +

Getting FILE_RD caps for src inode? I think adding another
ceph_get_caps() call here may cause deadlock (shouldn't hold any cap
while waiting).

> +       /* Drop dst file cached pages */
> +       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> +                                           dst_off >> PAGE_SHIFT,
> +                                           endoff >> PAGE_SHIFT);
> +       if (ret < 0) {
> +               printk("Failed to invalidate inode pages (%ld)\n", ret);
> +               ret = 0; /* XXX */
> +       }
> +       src_oloc.pool = src_ci->i_layout.pool_id;
> +       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
> +       dst_oloc.pool = dst_ci->i_layout.pool_id;
> +       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
> +       /*
> +        * TODO: should file_start_write/file_end_write be used for the whole
> +        * loop?  Or any other locking?
> +        */
> +       while (len > 0) {
> +               struct ceph_object_id src_oid, dst_oid;
> +               u64 objnum, objoff;
> +               u32 objlen;
> +               size_t copy_len = min_t(size_t, src_ci->i_layout.object_size, len);
> +               int err = 0;
> +
> +               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                             copy_len, &objnum, &objoff,
> +                                             &objlen);
> +               ceph_oid_init(&src_oid);
> +               ceph_oid_printf(&src_oid, "%llx.%08llx",
> +                               src_ci->i_vino.ino, objnum);
> +
> +               /* Do manual copy if:
> +                *  - source file offset isn't object aligned, or
> +                *  - copy length is smaller than object size
> +                */
> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
> +                       /* Do not copy beyond this object */
> +                       if (copy_len > objlen)
> +                               copy_len = objlen;
> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> +                                              &dst_off, copy_len, flags);
> +                       if (err < 0) {
> +                               ret = err;
> +                               goto out_caps;
> +                       }
> +                       len -= copy_len;
> +                       ret += copy_len;
> +                       continue;
> +               }
> +
> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                             copy_len, &objnum, &objoff,
> +                                             &objlen);
> +               ceph_oid_init(&dst_oid);
> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
> +                               dst_ci->i_vino.ino, objnum);
> +               /* Again... do a manual copy if:
> +                *  - destination file offset isn't object aligned, or
> +                *  - copy length is smaller than object size
> +                *    (although the object size should be the same for different
> +                *     files in the same filesystem...)
> +                */
> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
> +                       if (copy_len > objlen)
> +                               copy_len = objlen;
> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> +                                             &dst_off, copy_len, flags);
> +                       if (err < 0) {
> +                               ret = err;
> +                               goto out_caps;
> +                       }
> +                       len -= copy_len;
> +                       ceph_oid_destroy(&src_oid);
> +                       ret += copy_len;
> +                       continue;
> +               }
> +               /* Finally... do an object remote copy */
> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino,
> +                                         0, /* XXX src_ci->i_version ? */
> +                                         &src_oid, &src_oloc,
> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
> +                                         dst_ci->i_vino, &dst_oid,
> +                                         &dst_oloc,
> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
> +               if (err) {
> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
> +                       ret = err;
> +                       goto out_caps;
> +               }
> +               len -= copy_len;
> +               src_off += copy_len;
> +               dst_off += copy_len;
> +               ret += copy_len;
> +               ceph_oid_destroy(&src_oid);
> +               ceph_oid_destroy(&dst_oid);
> +       }
> +       /* Let the MDS know about destination object size change */
> +       if (endoff > size) {
> +               int dirty;
> +               int caps_flags = CHECK_CAPS_AUTHONLY;
> +
> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_NODELAY;
> +               if (ceph_inode_set_size(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
> +               if (caps_flags)
> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
> +               spin_lock(&dst_ci->i_ceph_lock);
> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
> +               dirty = __ceph_mark_dirty_caps(
> +                       dst_ci,
> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
> +                       &prealloc_cf);
> +               spin_unlock(&dst_ci->i_ceph_lock);
> +               if (dirty)
> +                       __mark_inode_dirty(dst_inode, dirty);
> +       }
> +out_caps:
> +       ceph_put_cap_refs(dst_ci, got);
> +out:
> +       ceph_free_cap_flush(prealloc_cf);
> +
> +       return ret;
> +}

There is no i_size check in above function. I think some corner cases
may not be handled properly

> +
>  const struct file_operations ceph_file_fops = {
>         .open = ceph_open,
>         .release = ceph_release,
> @@ -1835,5 +2016,6 @@ const struct file_operations ceph_file_fops = {
>         .unlocked_ioctl = ceph_ioctl,
>         .compat_ioctl   = ceph_ioctl,
>         .fallocate      = ceph_fallocate,
> +       .copy_file_range = ceph_copy_file_range,
>  };
>



[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux