diff mbox series

[RFC,v2,3/3] ceph: support copy_file_range file operation

Message ID 20180827134523.6758-4-lhenriques@suse.com (mailing list archive)
State New, archived
Headers show
Series copy_file_range in cephfs kernel client | expand

Commit Message

Luis Henriques Aug. 27, 2018, 1:45 p.m. UTC
This commit implements support for the copy_file_range syscall in cephfs.
It is implemented using the RADOS 'copy-from' operation, which allows
doing a remote object copy without the need to download/upload data
from/to the OSDs.

Some manual copying may however be required if the source/destination file
offsets aren't object aligned or if the copy length is smaller than the
object size.

Signed-off-by: Luis Henriques <lhenriques@suse.com>
---
 fs/ceph/file.c | 225 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 225 insertions(+)

Comments

Yan, Zheng Aug. 29, 2018, 12:01 a.m. UTC | #1
On Mon, Aug 27, 2018 at 6:46 AM Luis Henriques <lhenriques@suse.com> wrote:
>
> This commit implements support for the copy_file_range syscall in cephfs.
> It is implemented using the RADOS 'copy-from' operation, which allows to
> do a remote object copy, without the need to download/upload data from/to
> the OSDs.
>
> Some manual copy may however be required if the source/destination file
> offsets aren't object aligned or if the copy lenght is smaller than the
> object size.
>
> Signed-off-by: Luis Henriques <lhenriques@suse.com>
> ---
>  fs/ceph/file.c | 225 +++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 225 insertions(+)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index ad0bed99b1d5..8939ec224144 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -1,5 +1,6 @@
>  // SPDX-License-Identifier: GPL-2.0
>  #include <linux/ceph/ceph_debug.h>
> +#include <linux/ceph/striper.h>
>
>  #include <linux/module.h>
>  #include <linux/sched.h>
> @@ -1820,6 +1821,229 @@ static long ceph_fallocate(struct file *file, int mode,
>         return ret;
>  }
>
> +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> +                                   struct file *dst_file, loff_t dst_off,
> +                                   size_t len, unsigned int flags)
> +{
> +       struct inode *src_inode = file_inode(src_file);
> +       struct inode *dst_inode = file_inode(dst_file);
> +       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> +       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> +       struct ceph_osd_client *osdc =
> +               &ceph_inode_to_client(src_inode)->client->osdc;
> +       struct ceph_cap_flush *prealloc_cf;
> +       struct ceph_object_locator src_oloc, dst_oloc;
> +       loff_t endoff = 0;
> +       loff_t size;
> +       ssize_t ret = -EIO;
> +       int src_got = 0;
> +       int dst_got = 0;
> +       bool retrying = false;
> +
> +       if (src_inode == dst_inode)
> +               return -EINVAL;
> +       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> +               return -EROFS;
> +
> +       prealloc_cf = ceph_alloc_cap_flush();
> +       if (!prealloc_cf)
> +               return -ENOMEM;
> +
> +       /* Start by sync'ing the source file */
> +       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> +       if (ret < 0)
> +               goto out;
> +
> +       size = i_size_read(src_inode);
> +       /*
> +        * Don't copy beyond source file EOF.  Instead of simply setting lenght
> +        * to (size - src_off), just drop to VFS default implementation, as the
> +        * local i_size may be stale due to other clients writing to the source
> +        * inode.
> +        */
> +       if (src_off + len > size) {
> +               ret = -EOPNOTSUPP;
> +               goto out;
> +       }

we should check src inode's size after getting its caps.


> +       if (!len) {
> +               ret = 0;
> +               goto out;
> +       }
> +       size = i_size_read(dst_inode);
> +       endoff = dst_off + len;
> +       ret = inode_newsize_ok(dst_inode, endoff);
> +       if (ret)
> +               goto out;
> +
> +       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) {
> +               ret = -EDQUOT;
> +               goto out;
> +       }
> +
> +retry_caps:
> +       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
> +                           endoff, &dst_got, NULL);
> +       if (ret < 0)
> +               goto out;
> +       /*
> +        * We also need to get FILE_RD capabilities for source file as other
> +        * clients may have dirty data in their caches.  And OSDs know nothing
> +        * about caps, so they can't safely do the remote object copies.
> +        *
> +        * However, since we're already holding the FILE_WR capability for the
> +        * source file, we would risk a deadlock by using ceph_get_caps.  Thus,
> +        * we'll do some retry dance instead to try to get both capabilities.
> +        * If everything fails, we just return -EOPNOTSUPP and fallback to the
> +        * VFS default copy_file_range implementation.
> +        */
> +       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
> +                               false, &src_got);
> +       if (ret <= 0) {
> +               if (retrying) {
> +                       ret = -EOPNOTSUPP;
> +                       goto out_dst_caps;
> +               }
> +               /* Start by dropping dsc_ci caps and getting src_ci caps */
> +               ceph_put_cap_refs(dst_ci, dst_got);
> +               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
> +                                   CEPH_CAP_FILE_SHARED,
> +                                   (src_off + len), &src_got, NULL);
> +               if (ret < 0) {
> +                       ret = -EOPNOTSUPP;
> +                       goto out;
> +               }
> +               /*... drop them too, and retry */
> +               ceph_put_cap_refs(src_ci, src_got);
> +               retrying = true;
> +               goto retry_caps;
> +       }
> +
> +       /* Drop dst file cached pages */
> +       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> +                                           dst_off >> PAGE_SHIFT,
> +                                           endoff >> PAGE_SHIFT);
> +       if (ret < 0) {
> +               printk("Failed to invalidate inode pages (%ld)\n", ret);
> +               ret = 0; /* XXX */
> +       }
> +       src_oloc.pool = src_ci->i_layout.pool_id;
> +       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
> +       dst_oloc.pool = dst_ci->i_layout.pool_id;
> +       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
> +       /*
> +        * TODO: should file_start_write/file_end_write be used for the whole
> +        * loop?  Or any other locking?
> +        */
> +       while (len > 0) {
> +               struct ceph_object_id src_oid, dst_oid;
> +               u64 objnum, objoff;
> +               u32 objlen;
> +               size_t copy_len = min_t(size_t, src_ci->i_layout.object_size, len);
> +               int err = 0;
> +
> +               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                             copy_len, &objnum, &objoff,
> +                                             &objlen);
> +               ceph_oid_init(&src_oid);
> +               ceph_oid_printf(&src_oid, "%llx.%08llx",
> +                               src_ci->i_vino.ino, objnum);
> +
> +               /* Do manual copy if:
> +                *  - source file offset isn't object aligned, or
> +                *  - copy length is smaller than object size
> +                */
> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
> +                       /* Do not copy beyond this object */
> +                       if (copy_len > objlen)
> +                               copy_len = objlen;
> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> +                                              &dst_off, copy_len, flags);
> +                       if (err < 0) {
> +                               ret = err;
> +                               goto out_caps;
> +                       }
> +                       len -= copy_len;
> +                       ret += copy_len;
> +                       continue;
> +               }
> +
> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                             copy_len, &objnum, &objoff,
> +                                             &objlen);
> +               ceph_oid_init(&dst_oid);
> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
> +                               dst_ci->i_vino.ino, objnum);
> +               /* Again... do a manual copy if:
> +                *  - destination file offset isn't object aligned, or
> +                *  - copy length is smaller than object size
> +                *    (although the object size should be the same for different
> +                *     files in the same filesystem...)
> +                */
> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
> +                       if (copy_len > objlen)
> +                               copy_len = objlen;
> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> +                                             &dst_off, copy_len, flags);
> +                       if (err < 0) {
> +                               ret = err;
> +                               goto out_caps;
> +                       }
> +                       len -= copy_len;
> +                       ceph_oid_destroy(&src_oid);
> +                       ret += copy_len;
> +                       continue;
> +               }
> +               /* Finally... do an object remote copy */
> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
> +                                         0, /* XXX src_ci->i_version ? */
> +                                         &src_oid, &src_oloc,
> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
> +                                         dst_ci->i_vino.snap, &dst_oid,
> +                                         &dst_oloc,
> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
> +               if (err) {
> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
> +                       ret = err;
> +                       goto out_caps;
> +               }
> +               len -= copy_len;
> +               src_off += copy_len;
> +               dst_off += copy_len;
> +               ret += copy_len;
> +               ceph_oid_destroy(&src_oid);
> +               ceph_oid_destroy(&dst_oid);
> +       }
> +       /* Let the MDS know about destination object size change */
> +       if (endoff > size) {
> +               int dirty;
> +               int caps_flags = CHECK_CAPS_AUTHONLY;
> +
> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_NODELAY;
> +               if (ceph_inode_set_size(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
> +               if (caps_flags)
> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
> +               spin_lock(&dst_ci->i_ceph_lock);
> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
> +               dirty = __ceph_mark_dirty_caps(
> +                       dst_ci,
> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
> +                       &prealloc_cf);
> +               spin_unlock(&dst_ci->i_ceph_lock);
> +               if (dirty)
> +                       __mark_inode_dirty(dst_inode, dirty);
> +       }
> +out_caps:
> +       ceph_put_cap_refs(src_ci, src_got);
> +out_dst_caps:
> +       ceph_put_cap_refs(dst_ci, dst_got);
> +out:
> +       ceph_free_cap_flush(prealloc_cf);
> +
> +       return ret;
> +}
> +
>  const struct file_operations ceph_file_fops = {
>         .open = ceph_open,
>         .release = ceph_release,
> @@ -1835,5 +2059,6 @@ const struct file_operations ceph_file_fops = {
>         .unlocked_ioctl = ceph_ioctl,
>         .compat_ioctl   = ceph_ioctl,
>         .fallocate      = ceph_fallocate,
> +       .copy_file_range = ceph_copy_file_range,
>  };
>
Luis Henriques Aug. 29, 2018, 1:44 p.m. UTC | #2
"Yan, Zheng" <ukernel@gmail.com> writes:

> On Mon, Aug 27, 2018 at 6:46 AM Luis Henriques <lhenriques@suse.com> wrote:
<...>
>> +       if (src_off + len > size) {
>> +               ret = -EOPNOTSUPP;
>> +               goto out;
>> +       }
>
> we should check src inode's size after getting its caps.

Doh!  Sure, that makes sense.  I'll move that code.  What about the
approach to grab both caps?  Does it make sense to you, to use the
(modified) ceph_try_get_caps function and retry if it fails?

Cheers,
diff mbox series

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ad0bed99b1d5..8939ec224144 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@ 
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -1820,6 +1821,239 @@  static long ceph_fallocate(struct file *file, int mode,
 	return ret;
 }
 
+/*
+ * copy_file_range file operation for cephfs.
+ *
+ * Copies @len bytes from @src_file at @src_off to @dst_file at @dst_off.
+ * A chunk that is exactly one whole object on both files is copied remotely
+ * with ceph_osdc_copy_from(); any other chunk is copied with
+ * do_splice_direct().
+ *
+ * Returns the number of bytes copied.  A negative errno is returned only if
+ * nothing was copied; -EOPNOTSUPP is used for the cases handled here by
+ * giving up (source range beyond the local i_size, caps not obtainable).
+ */
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_osd_client *osdc =
+		&ceph_inode_to_client(src_inode)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	loff_t endoff;
+	ssize_t ret;
+	ssize_t copied = 0;
+	int err = 0;
+	int src_got = 0;
+	int dst_got = 0;
+	bool retrying = false;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+	if (!len)
+		return 0;
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;
+
+	endoff = dst_off + len;
+	ret = inode_newsize_ok(dst_inode, endoff);
+	if (ret)
+		goto out;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) {
+		ret = -EDQUOT;
+		goto out;
+	}
+
+retry_caps:
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    endoff, &dst_got, NULL);
+	if (ret < 0)
+		goto out;
+	/*
+	 * We also need to get FILE_RD capabilities for source file as other
+	 * clients may have dirty data in their caches.  And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 *
+	 * However, since we're already holding the FILE_WR capability for the
+	 * destination file, we would risk a deadlock by using ceph_get_caps.
+	 * Thus, we'll do some retry dance instead to try to get both
+	 * capabilities.  If everything fails, we just return -EOPNOTSUPP and
+	 * fallback to the VFS default copy_file_range implementation.
+	 */
+	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, &src_got);
+	if (ret <= 0) {
+		if (retrying) {
+			ret = -EOPNOTSUPP;
+			goto out_dst_caps;
+		}
+		/* Start by dropping dst_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(dst_ci, dst_got);
+		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED,
+				    (src_off + len), &src_got, NULL);
+		if (ret < 0) {
+			ret = -EOPNOTSUPP;
+			goto out;
+		}
+		/*... drop them too, and retry */
+		ceph_put_cap_refs(src_ci, src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+
+	/*
+	 * Don't copy beyond source file EOF.  This is checked only now, with
+	 * the source caps held, so that i_size is as current as it can be.
+	 * Instead of simply setting length to (size - src_off), just drop to
+	 * VFS default implementation, as the local i_size may be stale due to
+	 * other clients writing to the source inode.
+	 */
+	if (src_off + len > i_size_read(src_inode)) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+
+	/*
+	 * Drop dst file cached pages.  A failure here is only reported; the
+	 * copy goes ahead regardless (XXX).
+	 */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0)
+		printk("Failed to invalidate inode pages (%zd)\n", ret);
+
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+	/*
+	 * TODO: should file_start_write/file_end_write be used for the whole
+	 * loop?  Or any other locking?
+	 */
+	while (len > 0) {
+		struct ceph_object_id src_oid, dst_oid;
+		u64 src_objnum, src_objoff, dst_objnum, dst_objoff;
+		u32 src_objlen, dst_objlen;
+		u32 obj_size = src_ci->i_layout.object_size;
+		size_t copy_len = min_t(size_t, obj_size, len);
+
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      copy_len, &src_objnum,
+					      &src_objoff, &src_objlen);
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      copy_len, &dst_objnum,
+					      &dst_objoff, &dst_objlen);
+
+		/*
+		 * Do a manual copy unless this chunk is exactly one whole
+		 * object on both files, i.e. if:
+		 *  - either file offset isn't object aligned, or
+		 *  - the chunk is smaller than the object size, or
+		 *  - the two files use different object sizes
+		 */
+		if (src_objoff || dst_objoff ||
+		    src_objlen != obj_size || dst_objlen != obj_size ||
+		    dst_ci->i_layout.object_size != obj_size) {
+			long spliced;
+
+			/* Do not copy beyond either object */
+			copy_len = min(src_objlen, dst_objlen);
+			/* do_splice_direct() advances src_off and dst_off */
+			spliced = do_splice_direct(src_file, &src_off, dst_file,
+						   &dst_off, copy_len, flags);
+			if (spliced < 0) {
+				err = spliced;
+				break;
+			}
+			if (!spliced)
+				break;	/* no progress: don't spin forever */
+			len -= spliced;
+			copied += spliced;
+			continue;
+		}
+
+		/* Finally... do an object remote copy */
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
+					  0, /* XXX src_ci->i_version ? */
+					  &src_oid, &src_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
+					  dst_ci->i_vino.snap, &dst_oid,
+					  &dst_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+		ceph_oid_destroy(&src_oid);
+		ceph_oid_destroy(&dst_oid);
+		if (err) {
+			printk("copy_from returned an error: %d\n", err); /* XXX */
+			break;
+		}
+		len -= copy_len;
+		src_off += copy_len;
+		dst_off += copy_len;
+		copied += copy_len;
+	}
+	ceph_oloc_destroy(&src_oloc);
+	ceph_oloc_destroy(&dst_oloc);
+
+	/*
+	 * Let the MDS know about destination object size change.  dst_off is
+	 * where the copy actually stopped, which is short of endoff if the
+	 * loop above bailed out early.
+	 */
+	if (copied > 0 && dst_off > i_size_read(dst_inode)) {
+		int dirty;
+		int caps_flags = 0;
+
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, dst_off))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		if (ceph_inode_set_size(dst_inode, dst_off))
+			caps_flags |= CHECK_CAPS_AUTHONLY;
+		if (caps_flags)
+			ceph_check_caps(dst_ci, caps_flags, NULL);
+		spin_lock(&dst_ci->i_ceph_lock);
+		dst_ci->i_inline_version = CEPH_INLINE_NONE;
+		dirty = __ceph_mark_dirty_caps(
+			dst_ci,
+			CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
+			&prealloc_cf);
+		spin_unlock(&dst_ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(dst_inode, dirty);
+	}
+	/* Report partial progress in preference to the error that stopped it */
+	ret = copied ? copied : err;
+out_caps:
+	ceph_put_cap_refs(src_ci, src_got);
+out_dst_caps:
+	ceph_put_cap_refs(dst_ci, dst_got);
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1835,5 +2059,6 @@  const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };