[RFC PATCH 2/2] ceph: support copy_file_range file operation

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This commit implements support for the copy_file_range syscall in cephfs.
It is implemented using the RADOS 'copy-from' operation, which performs a
remote object copy without the need to download/upload data from/to
the OSDs.

Some manual copy may however be required if the source/destination file
offsets aren't object aligned or if the copy length is smaller than the
object size.

Signed-off-by: Luis Henriques <lhenriques@xxxxxxxx>
---
 fs/ceph/file.c | 182 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 182 insertions(+)

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ad0bed99b1d5..a1ef060b5a0a 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -1820,6 +1821,186 @@ static long ceph_fallocate(struct file *file, int mode,
 	return ret;
 }
 
+/*
+ * Copy up to @len bytes from @src_file at @src_off to @dst_file at @dst_off.
+ * Chunks covering one whole object on both sides are copied OSD-side with
+ * ceph_osdc_copy_from(); anything else goes through do_splice_direct().
+ * Returns the number of bytes copied, or a negative errno.
+ */
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_osd_client *osdc =
+		&ceph_inode_to_client(src_inode)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	loff_t endoff = 0;
+	loff_t size;
+	ssize_t ret = -EIO;
+	int got = 0;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;	/* prealloc_cf must be freed */
+
+	size = i_size_read(src_inode);
+	/* Don't copy beyond source file EOF; src_off may already be past it */
+	if (src_off >= size)
+		len = 0;
+	else if (len > size - src_off)
+		len = (size - src_off);
+	if (!len) {
+		ret = 0;
+		goto out;
+	}
+	size = i_size_read(dst_inode);
+	endoff = dst_off + len;
+	ret = inode_newsize_ok(dst_inode, endoff);
+	if (ret)
+		goto out;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) {
+		ret = -EDQUOT;
+		goto out;
+	}
+
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    endoff, &got, NULL);
+	if (ret < 0)
+		goto out;
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0) {
+		printk("Failed to invalidate inode pages (%zd)\n", ret);
+		ret = 0; /* XXX */
+	}
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+	/*
+	 * TODO: should file_start_write/file_end_write be used for the whole
+	 * loop?  Or any other locking?
+	 */
+	while (len > 0) {
+		struct ceph_object_id src_oid, dst_oid;
+		u64 src_objnum, dst_objnum, objoff;
+		u32 objlen;
+		size_t copy_len = min_t(size_t, src_ci->i_layout.object_size, len);
+		bool remote;
+		long err;
+
+		/*
+		 * copy-from takes no offset or length, so use it only if this
+		 * chunk is exactly one full object on both sides; objlen <
+		 * copy_len means the layout (striping) splits the chunk.
+		 */
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      copy_len, &src_objnum, &objoff,
+					      &objlen);
+		remote = !objoff && objlen == copy_len &&
+			 copy_len == src_ci->i_layout.object_size;
+		if (remote) {
+			ceph_calc_file_object_mapping(&dst_ci->i_layout,
+						      dst_off, copy_len,
+						      &dst_objnum, &objoff,
+						      &objlen);
+			remote = !objoff && objlen == copy_len &&
+				 copy_len == dst_ci->i_layout.object_size;
+		}
+		if (!remote) {
+			/* Manual copy, never beyond the current object */
+			if (copy_len > objlen)
+				copy_len = objlen;
+			err = do_splice_direct(src_file, &src_off, dst_file,
+					       &dst_off, copy_len, flags);
+			if (err < 0) {
+				ret = err;
+				goto out_caps;
+			}
+			if (!err)
+				break;	/* no progress: stop here */
+			/* a short splice moves both offsets by err only */
+			len -= err;
+			ret += err;
+			continue;
+		}
+
+		/* Finally... do an object remote copy */
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		err = ceph_osdc_copy_from(osdc, src_ci->i_vino,
+					  0, /* XXX src_ci->i_version ? */
+					  &src_oid, &src_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
+					  dst_ci->i_vino, &dst_oid,
+					  &dst_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+		/* Release the object names on the error path too */
+		ceph_oid_destroy(&src_oid);
+		ceph_oid_destroy(&dst_oid);
+		if (err) {
+			printk("copy_from returned an error: %ld\n", err); /* XXX */
+			ret = err;
+			goto out_caps;
+		}
+		len -= copy_len;
+		src_off += copy_len;
+		dst_off += copy_len;
+		ret += copy_len;
+	}
+	/* dst_off is where the copy really ended (a splice may fall short) */
+	endoff = dst_off;
+	/* Let the MDS know about destination object size change */
+	if (endoff > size) {
+		int dirty;
+		int caps_flags = CHECK_CAPS_AUTHONLY;
+
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		ceph_inode_set_size(dst_inode, endoff);
+		ceph_check_caps(dst_ci, caps_flags, NULL);
+		spin_lock(&dst_ci->i_ceph_lock);
+		dst_ci->i_inline_version = CEPH_INLINE_NONE;
+		dirty = __ceph_mark_dirty_caps(dst_ci,
+			CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER, &prealloc_cf);
+		spin_unlock(&dst_ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(dst_inode, dirty);
+	}
+out_caps:
+	/* Drop the pool namespace references taken by ceph_try_get_string() */
+	ceph_oloc_destroy(&src_oloc);
+	ceph_oloc_destroy(&dst_oloc);
+	ceph_put_cap_refs(dst_ci, got);
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1835,5 +2016,6 @@ const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };
 



[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux