diff mbox series

[RFC,v5,3/4] ceph: support copy_file_range file operation

Message ID 20181009104806.6821-4-lhenriques@suse.com (mailing list archive)
State New, archived
Headers show
Series copy_file_range in cephfs kernel client | expand

Commit Message

Luis Henriques Oct. 9, 2018, 10:48 a.m. UTC
This commit implements support for the copy_file_range syscall in cephfs.
It is implemented using the RADOS 'copy-from' operation, which allows to
do a remote object copy, without the need to download/upload data from/to
the OSDs.

Some manual copy may however be required if the source/destination file
offsets aren't object aligned or if the copy lenght is smaller than the
object size.

Signed-off-by: Luis Henriques <lhenriques@suse.com>
---
 fs/ceph/file.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 296 insertions(+), 1 deletion(-)

Comments

Yan, Zheng Oct. 10, 2018, 2:02 a.m. UTC | #1
On Tue, Oct 9, 2018 at 6:49 PM Luis Henriques <lhenriques@suse.com> wrote:
>
> This commit implements support for the copy_file_range syscall in cephfs.
> It is implemented using the RADOS 'copy-from' operation, which allows to
> do a remote object copy, without the need to download/upload data from/to
> the OSDs.
>
> Some manual copy may however be required if the source/destination file
> offsets aren't object aligned or if the copy lenght is smaller than the
> object size.
>
> Signed-off-by: Luis Henriques <lhenriques@suse.com>
> ---
>  fs/ceph/file.c | 297 ++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 296 insertions(+), 1 deletion(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 92ab20433682..ee4a390dd772 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -1,5 +1,6 @@
>  // SPDX-License-Identifier: GPL-2.0
>  #include <linux/ceph/ceph_debug.h>
> +#include <linux/ceph/striper.h>
>
>  #include <linux/module.h>
>  #include <linux/sched.h>
> @@ -1829,6 +1830,300 @@ static long ceph_fallocate(struct file *file, int mode,
>         return ret;
>  }
>
> +/*
> + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
> + * src_ci.  Two attempts are made to obtain both caps, and an error is return if
> + * this fails; zero is returned on success.
> + */
> +static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
> +                         loff_t src_endoff, int *src_got,
> +                         struct ceph_inode_info *dst_ci,
> +                         loff_t dst_endoff, int *dst_got)
> +{
> +       int ret = 0;
> +       bool retrying = false;
> +
> +retry_caps:
> +       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
> +                           dst_endoff, dst_got, NULL);
> +       if (ret < 0)
> +               return ret;
> +
> +       /*
> +        * Since we're already holding the FILE_WR capability for the dst file,
> +        * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
> +        * retry dance instead to try to get both capabilities.
> +        */
> +       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
> +                               false, src_got);
> +       if (ret <= 0) {
> +               /* Start by dropping dst_ci caps and getting src_ci caps */
> +               ceph_put_cap_refs(dst_ci, *dst_got);
> +               if (retrying) {
> +                       if (!ret)
> +                               /* ceph_try_get_caps masks EAGAIN */
> +                               ret = -EAGAIN;
> +                       return ret;
> +               }
> +               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
> +                                   CEPH_CAP_FILE_SHARED, src_endoff,
> +                                   src_got, NULL);
> +               if (ret < 0)
> +                       return ret;
> +               /*... drop src_ci caps too, and retry */
> +               ceph_put_cap_refs(src_ci, *src_got);
> +               retrying = true;
> +               goto retry_caps;
> +       }
> +       return ret;
> +}
> +
> +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
> +                          struct ceph_inode_info *dst_ci, int dst_got)
> +{
> +       ceph_put_cap_refs(src_ci, src_got);
> +       ceph_put_cap_refs(dst_ci, dst_got);
> +}
> +
> +/*
> + * This function does several size-related checks, returning an error if:
> + *  - source file is smaller than off+len
> + *  - destination file size is OK (inode_newsize_ok())
> + *  - quotas won't be exceeded
> + */
> +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
> +                          loff_t src_off, loff_t dst_off, size_t len)
> +{
> +       loff_t size, endoff;
> +
> +       size = i_size_read(src_inode);
> +       /*
> +        * Don't copy beyond source file EOF.  Instead of simply setting lenght
> +        * to (size - src_off), just drop to VFS default implementation, as the
> +        * local i_size may be stale due to other clients writing to the source
> +        * inode.
> +        */
> +       if (src_off + len > size) {
> +               dout("Copy beyond EOF (%llu + %ld > %llu)\n",
> +                    src_off, len, size);
> +               return -EOPNOTSUPP;
> +       }
> +       size = i_size_read(dst_inode);
> +
> +       endoff = dst_off + len;
> +       if(inode_newsize_ok(dst_inode, endoff))
> +               return -EOPNOTSUPP;
> +
> +       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
> +               return -EDQUOT;
> +
> +       return 0;
> +}
> +
> +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> +                                   struct file *dst_file, loff_t dst_off,
> +                                   size_t len, unsigned int flags)
> +{
> +       struct inode *src_inode = file_inode(src_file);
> +       struct inode *dst_inode = file_inode(dst_file);
> +       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> +       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> +       struct ceph_cap_flush *prealloc_cf;
> +       struct ceph_object_locator src_oloc, dst_oloc;
> +       struct ceph_object_id src_oid, dst_oid;
> +       loff_t endoff = 0, size;
> +       ssize_t ret = -EIO;
> +       u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
> +       u32 src_objlen, dst_objlen, object_size;
> +       int src_got = 0, dst_got = 0, err, dirty;
> +
> +       if (src_inode == dst_inode)
> +               return -EINVAL;
> +       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> +               return -EROFS;
> +
> +       /*
> +        * Some of the checks below will return -EOPNOTSUPP, which will force a
> +        * fallback to the default VFS copy_file_range implementation.  This is
> +        * desirable in several cases (for ex, the 'len' is smaller than the
> +        * size of the objects, or in cases where that would be more
> +        * efficient).
> +        */
> +
> +       if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
> +           (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
> +           (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
> +               return -EOPNOTSUPP;
> +
> +       if (len < src_ci->i_layout.object_size)
> +               return -EOPNOTSUPP; /* no remote copy will be done */
> +
> +       prealloc_cf = ceph_alloc_cap_flush();
> +       if (!prealloc_cf)
> +               return -ENOMEM;
> +
> +       /* Start by sync'ing the source file */
> +       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> +       if (ret < 0)
> +               goto out;
> +
> +       /*
> +        * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
> +        * clients may have dirty data in their caches.  And OSDs know nothing
> +        * about caps, so they can't safely do the remote object copies.
> +        */
> +       err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
> +                            dst_ci, (dst_off + len), &dst_got);
> +       if (err < 0) {
> +               dout("get_rd_wr_caps returned %d\n", err);
> +               ret = -EOPNOTSUPP;
> +               goto out;
> +       }
> +
> +       ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
> +       if (ret < 0)
> +               goto out_caps;
> +
> +       size = i_size_read(dst_inode);
> +       endoff = dst_off + len;
> +
> +       /* Drop dst file cached pages */
> +       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> +                                           dst_off >> PAGE_SHIFT,
> +                                           endoff >> PAGE_SHIFT);
> +       if (ret < 0) {
> +               dout("Failed to invalidate inode pages (%ld)\n", ret);
> +               ret = 0; /* XXX */
> +       }
> +       src_oloc.pool = src_ci->i_layout.pool_id;
> +       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
> +       dst_oloc.pool = dst_ci->i_layout.pool_id;
> +       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
> +
> +       ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                     src_ci->i_layout.object_size,
> +                                     &src_objnum, &src_objoff, &src_objlen);
> +       ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                     dst_ci->i_layout.object_size,
> +                                     &dst_objnum, &dst_objoff, &dst_objlen);
> +       /* object-level offsets need to the same */
> +       if (src_objoff != dst_objoff) {
> +               ret = -EOPNOTSUPP;
> +               goto out_caps;
> +       }
> +
> +       /*
> +        * Do a manual copy if the object offset isn't object aligned.
> +        * 'src_objlen' contains the bytes left until the end of the object,
> +        * starting at the src_off
> +        */
> +       if (src_objoff) {
> +               /*
> +                * we need to temporarily drop all caps as we'll be calling
> +                * {read,write}_iter, which will get caps again.
> +                */
> +               put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
> +               ret = do_splice_direct(src_file, &src_off, dst_file,
> +                                      &dst_off, src_objlen, flags);
> +               if (ret < 0) {
> +                       dout("do_splice_direct returned %d\n", err);
> +                       goto out;
> +               }
> +               len -= ret;
> +               err = get_rd_wr_caps(src_ci, (src_off + len),
> +                                    &src_got, dst_ci,
> +                                    (dst_off + len), &dst_got);
> +               if (err < 0)
> +                       goto out;
> +               err = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
> +               if (err < 0)
> +                       goto out_caps;
> +       }
> +       object_size = src_ci->i_layout.object_size;
> +       while (len >= object_size) {
> +               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                             object_size, &src_objnum,
> +                                             &src_objoff, &src_objlen);
> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                             object_size, &dst_objnum,
> +                                             &dst_objoff, &dst_objlen);
> +               ceph_oid_init(&src_oid);
> +               ceph_oid_printf(&src_oid, "%llx.%08llx",
> +                               src_ci->i_vino.ino, src_objnum);
> +               ceph_oid_init(&dst_oid);
> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
> +                               dst_ci->i_vino.ino, dst_objnum);
> +               /* Do an object remote copy */
> +               err = ceph_osdc_copy_from(
> +                       &ceph_inode_to_client(src_inode)->client->osdc,
> +                       src_ci->i_vino.snap, 0,
> +                       &src_oid, &src_oloc,
> +                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
> +                       CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
> +                       dst_ci->i_vino.snap, &dst_oid, &dst_oloc,
> +                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
> +                       CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
> +               ceph_oid_destroy(&src_oid);
> +               ceph_oid_destroy(&dst_oid);
> +               if (err) {
> +                       dout("ceph_osdc_copy_from returned %d\n", err);
> +                       goto out_caps;
> +               }
> +               len -= object_size;
> +               src_off += object_size;
> +               dst_off += object_size;
> +               ret += object_size;
> +       }
> +
> +       if (len) {
> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                             len, &dst_objnum, &dst_objoff,
> +                                             &dst_objlen);
> +               put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
> +               err = do_splice_direct(src_file, &src_off, dst_file,
> +                                      &dst_off, len, flags);
> +               if (err < 0) {
> +                       dout("do_splice_direct returned %d\n", err);
> +                       goto out;
> +               }
> +               len -= err;
> +               ret += err;
> +               err = get_rd_wr_caps(src_ci, (src_off + len),
> +                                    &src_got, dst_ci,
> +                                    (dst_off + len), &dst_got);
> +               if (err < 0) {
> +                       goto out;
> +               }
> +       }
> +
> +       file_update_time(dst_file);
> +       if (endoff > size) {
> +               int caps_flags = 0;
> +
> +               /* Let the MDS know about dst file size change */
> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_NODELAY;
> +               if (ceph_inode_set_size(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
> +               if (caps_flags)
> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
> +       }
> +       /* Mark Fw dirty */
> +       spin_lock(&dst_ci->i_ceph_lock);
> +       dst_ci->i_inline_version = CEPH_INLINE_NONE;
> +       dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
> +       spin_unlock(&dst_ci->i_ceph_lock);
> +       if (dirty)
> +               __mark_inode_dirty(dst_inode, dirty);


Above code (from file_update_time) should be placed before code block
of the second do_splice_direct(). We need to mark caps dirty before
droping cap reference.

Otherwise, the code looks great.

Regards
Yan, Zheng

> +
> +out_caps:
> +       put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
> +out:
> +       ceph_free_cap_flush(prealloc_cf);
> +
> +       return ret;
> +}
> +
>  const struct file_operations ceph_file_fops = {
>         .open = ceph_open,
>         .release = ceph_release,
> @@ -1844,5 +2139,5 @@ const struct file_operations ceph_file_fops = {
>         .unlocked_ioctl = ceph_ioctl,
>         .compat_ioctl   = ceph_ioctl,
>         .fallocate      = ceph_fallocate,
> +       .copy_file_range = ceph_copy_file_range,
>  };
> -
Luis Henriques Oct. 12, 2018, 9:12 a.m. UTC | #2
"Yan, Zheng" <ukernel@gmail.com> writes:

> On Tue, Oct 9, 2018 at 6:49 PM Luis Henriques <lhenriques@suse.com> wrote:
<snip>

>> +       file_update_time(dst_file);
>> +       if (endoff > size) {
>> +               int caps_flags = 0;
>> +
>> +               /* Let the MDS know about dst file size change */
>> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
>> +                       caps_flags |= CHECK_CAPS_NODELAY;
>> +               if (ceph_inode_set_size(dst_inode, endoff))
>> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
>> +               if (caps_flags)
>> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
>> +       }
>> +       /* Mark Fw dirty */
>> +       spin_lock(&dst_ci->i_ceph_lock);
>> +       dst_ci->i_inline_version = CEPH_INLINE_NONE;
>> +       dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
>> +       spin_unlock(&dst_ci->i_ceph_lock);
>> +       if (dirty)
>> +               __mark_inode_dirty(dst_inode, dirty);
>
>
> Above code (from file_update_time) should be placed before code block
> of the second do_splice_direct(). We need to mark caps dirty before
> droping cap reference.

So, in that case I guess I'll also need to replace 'endoff' by 'dst_off'
in that code block.  Ok, I'll change that, thanks.

Cheers,
diff mbox series

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 92ab20433682..ee4a390dd772 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@ 
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -1829,6 +1830,300 @@  static long ceph_fallocate(struct file *file, int mode,
 	return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+			  loff_t src_endoff, int *src_got,
+			  struct ceph_inode_info *dst_ci,
+			  loff_t dst_endoff, int *dst_got)
+{
+	int ret = 0;
+	bool retrying = false;
+
+retry_caps:
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    dst_endoff, dst_got, NULL);
+	if (ret < 0)
+		return ret;
+
+	/*
+	 * Since we're already holding the FILE_WR capability for the dst file,
+	 * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+	 * retry dance instead to try to get both capabilities.
+	 */
+	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, src_got);
+	if (ret <= 0) {
+		/* Start by dropping dst_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(dst_ci, *dst_got);
+		if (retrying) {
+			if (!ret)
+				/* ceph_try_get_caps masks EAGAIN */
+				ret = -EAGAIN;
+			return ret;
+		}
+		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED, src_endoff,
+				    src_got, NULL);
+		if (ret < 0)
+			return ret;
+		/*... drop src_ci caps too, and retry */
+		ceph_put_cap_refs(src_ci, *src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+	return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+			   struct ceph_inode_info *dst_ci, int dst_got)
+{
+	ceph_put_cap_refs(src_ci, src_got);
+	ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is OK (inode_newsize_ok())
+ *  - quotas won't be exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+			   loff_t src_off, loff_t dst_off, size_t len)
+{
+	loff_t size, endoff;
+
+	size = i_size_read(src_inode);
+	/*
+	 * Don't copy beyond source file EOF.  Instead of simply setting lenght
+	 * to (size - src_off), just drop to VFS default implementation, as the
+	 * local i_size may be stale due to other clients writing to the source
+	 * inode.
+	 */
+	if (src_off + len > size) {
+		dout("Copy beyond EOF (%llu + %ld > %llu)\n",
+		     src_off, len, size);
+		return -EOPNOTSUPP;
+	}
+	size = i_size_read(dst_inode);
+
+	endoff = dst_off + len;
+	if(inode_newsize_ok(dst_inode, endoff))
+		return -EOPNOTSUPP;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+		return -EDQUOT;
+
+	return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	struct ceph_object_id src_oid, dst_oid;
+	loff_t endoff = 0, size;
+	ssize_t ret = -EIO;
+	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+	u32 src_objlen, dst_objlen, object_size;
+	int src_got = 0, dst_got = 0, err, dirty;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	/*
+	 * Some of the checks below will return -EOPNOTSUPP, which will force a
+	 * fallback to the default VFS copy_file_range implementation.  This is
+	 * desirable in several cases (for ex, the 'len' is smaller than the
+	 * size of the objects, or in cases where that would be more
+	 * efficient).
+	 */
+
+	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+	    (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+		return -EOPNOTSUPP;
+
+	if (len < src_ci->i_layout.object_size)
+		return -EOPNOTSUPP; /* no remote copy will be done */
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;
+
+	/*
+	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+	 * clients may have dirty data in their caches.  And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 */
+	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+			     dst_ci, (dst_off + len), &dst_got);
+	if (err < 0) {
+		dout("get_rd_wr_caps returned %d\n", err);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+	if (ret < 0)
+		goto out_caps;
+
+	size = i_size_read(dst_inode);
+	endoff = dst_off + len;
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0) {
+		dout("Failed to invalidate inode pages (%ld)\n", ret);
+		ret = 0; /* XXX */
+	}
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+	ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+				      src_ci->i_layout.object_size,
+				      &src_objnum, &src_objoff, &src_objlen);
+	ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+				      dst_ci->i_layout.object_size,
+				      &dst_objnum, &dst_objoff, &dst_objlen);
+	/* object-level offsets need to the same */
+	if (src_objoff != dst_objoff) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+
+	/*
+	 * Do a manual copy if the object offset isn't object aligned.
+	 * 'src_objlen' contains the bytes left until the end of the object,
+	 * starting at the src_off
+	 */
+	if (src_objoff) {
+		/*
+		 * we need to temporarily drop all caps as we'll be calling
+		 * {read,write}_iter, which will get caps again.
+		 */
+		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+		ret = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, src_objlen, flags);
+		if (ret < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= ret;
+		err = get_rd_wr_caps(src_ci, (src_off + len),
+				     &src_got, dst_ci,
+				     (dst_off + len), &dst_got);
+		if (err < 0)
+			goto out;
+		err = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+		if (err < 0)
+			goto out_caps;
+	}
+	object_size = src_ci->i_layout.object_size;
+	while (len >= object_size) {
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      object_size, &src_objnum,
+					      &src_objoff, &src_objlen);
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      object_size, &dst_objnum,
+					      &dst_objoff, &dst_objlen);
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		/* Do an object remote copy */
+		err = ceph_osdc_copy_from(
+			&ceph_inode_to_client(src_inode)->client->osdc,
+			src_ci->i_vino.snap, 0,
+			&src_oid, &src_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
+			dst_ci->i_vino.snap, &dst_oid, &dst_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+		ceph_oid_destroy(&src_oid);
+		ceph_oid_destroy(&dst_oid);
+		if (err) {
+			dout("ceph_osdc_copy_from returned %d\n", err);
+			goto out_caps;
+		}
+		len -= object_size;
+		src_off += object_size;
+		dst_off += object_size;
+		ret += object_size;
+	}
+
+	if (len) {
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      len, &dst_objnum, &dst_objoff,
+					      &dst_objlen);
+		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+		err = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, len, flags);
+		if (err < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= err;
+		ret += err;
+		err = get_rd_wr_caps(src_ci, (src_off + len),
+				     &src_got, dst_ci,
+				     (dst_off + len), &dst_got);
+		if (err < 0) {
+			goto out;
+		}
+	}
+
+	file_update_time(dst_file);
+	if (endoff > size) {
+		int caps_flags = 0;
+
+		/* Let the MDS know about dst file size change */
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		if (ceph_inode_set_size(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_AUTHONLY;
+		if (caps_flags)
+			ceph_check_caps(dst_ci, caps_flags, NULL);
+	}
+	/* Mark Fw dirty */
+	spin_lock(&dst_ci->i_ceph_lock);
+	dst_ci->i_inline_version = CEPH_INLINE_NONE;
+	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+	spin_unlock(&dst_ci->i_ceph_lock);
+	if (dirty)
+		__mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+	put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1844,5 +2139,5 @@  const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };
-