diff mbox series

[v3,3/3] ceph: support copy_file_range file operation

Message ID 20180906160620.16277-4-lhenriques@suse.com (mailing list archive)
State New, archived
Headers show
Series copy_file_range in cephfs kernel client | expand

Commit Message

Luis Henriques Sept. 6, 2018, 4:06 p.m. UTC
This commit implements support for the copy_file_range syscall in cephfs.
It is implemented using the RADOS 'copy-from' operation, which allows to
do a remote object copy, without the need to download/upload data from/to
the OSDs.

Some manual copy may however be required if the source/destination file
offsets aren't object aligned or if the copy lenght is smaller than the
object size.

Signed-off-by: Luis Henriques <lhenriques@suse.com>
---
 fs/ceph/file.c | 221 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 221 insertions(+)

Comments

Yan, Zheng Sept. 7, 2018, 10:54 a.m. UTC | #1
On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
>
> This commit implements support for the copy_file_range syscall in cephfs.
> It is implemented using the RADOS 'copy-from' operation, which allows to
> do a remote object copy, without the need to download/upload data from/to
> the OSDs.
>
> Some manual copy may however be required if the source/destination file
> offsets aren't object aligned or if the copy lenght is smaller than the
> object size.
>
> Signed-off-by: Luis Henriques <lhenriques@suse.com>
> ---
>  fs/ceph/file.c | 221 +++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 221 insertions(+)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 92ab20433682..7e9ce177bdc3 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -1,5 +1,6 @@
>  // SPDX-License-Identifier: GPL-2.0
>  #include <linux/ceph/ceph_debug.h>
> +#include <linux/ceph/striper.h>
>
>  #include <linux/module.h>
>  #include <linux/sched.h>
> @@ -1829,6 +1830,225 @@ static long ceph_fallocate(struct file *file, int mode,
>         return ret;
>  }
>
> +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> +                                   struct file *dst_file, loff_t dst_off,
> +                                   size_t len, unsigned int flags)
> +{
> +       struct inode *src_inode = file_inode(src_file);
> +       struct inode *dst_inode = file_inode(dst_file);
> +       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> +       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> +       struct ceph_osd_client *osdc =
> +               &ceph_inode_to_client(src_inode)->client->osdc;
> +       struct ceph_cap_flush *prealloc_cf;
> +       struct ceph_object_locator src_oloc, dst_oloc;
> +       loff_t endoff = 0;
> +       loff_t size;
> +       ssize_t ret = -EIO;
> +       int src_got = 0;
> +       int dst_got = 0;
> +       bool retrying = false;
> +
> +       if (src_inode == dst_inode)
> +               return -EINVAL;
> +       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> +               return -EROFS;
> +
> +       prealloc_cf = ceph_alloc_cap_flush();
> +       if (!prealloc_cf)
> +               return -ENOMEM;
> +
> +       /* Start by sync'ing the source file */
> +       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> +       if (ret < 0)
> +               goto out;
> +
> +retry_caps:
> +       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
> +                           endoff, &dst_got, NULL);
> +       if (ret < 0)
> +               goto out;
> +       /*
> +        * We also need to get FILE_RD capabilities for source file as other
> +        * clients may have dirty data in their caches.  And OSDs know nothing
> +        * about caps, so they can't safely do the remote object copies.
> +        *
> +        * However, since we're already holding the FILE_WR capability for the
> +        * source file, we would risk a deadlock by using ceph_get_caps.  Thus,
> +        * we'll do some retry dance instead to try to get both capabilities.
> +        * If everything fails, we just return -EOPNOTSUPP and fallback to the
> +        * VFS default copy_file_range implementation.
> +        */
> +       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
> +                               false, &src_got);
> +       if (ret <= 0) {
> +               if (retrying) {
> +                       ret = -EOPNOTSUPP;
> +                       goto out_dst_caps;
> +               }
> +               /* Start by dropping dsc_ci caps and getting src_ci caps */
> +               ceph_put_cap_refs(dst_ci, dst_got);
> +               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
> +                                   CEPH_CAP_FILE_SHARED,
> +                                   (src_off + len), &src_got, NULL);
> +               if (ret < 0) {
> +                       ret = -EOPNOTSUPP;
> +                       goto out;
> +               }
> +               /*... drop them too, and retry */
> +               ceph_put_cap_refs(src_ci, src_got);
> +               retrying = true;
> +               goto retry_caps;
> +       }
> +
> +       size = i_size_read(src_inode);
> +       /*
> +        * Don't copy beyond source file EOF.  Instead of simply setting lenght
> +        * to (size - src_off), just drop to VFS default implementation, as the
> +        * local i_size may be stale due to other clients writing to the source
> +        * inode.
> +        */
> +       if (src_off + len > size) {
> +               ret = -EOPNOTSUPP;
> +               goto out_caps;
> +       }
> +       size = i_size_read(dst_inode);
> +       endoff = dst_off + len;
> +       ret = inode_newsize_ok(dst_inode, endoff);
> +       if (ret)
> +               goto out_caps;
> +
> +       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) {
> +               ret = -EDQUOT;
> +               goto out_caps;
> +       }
> +
> +       /* Drop dst file cached pages */
> +       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> +                                           dst_off >> PAGE_SHIFT,
> +                                           endoff >> PAGE_SHIFT);
> +       if (ret < 0) {
> +               printk("Failed to invalidate inode pages (%ld)\n", ret);
> +               ret = 0; /* XXX */
> +       }
> +       src_oloc.pool = src_ci->i_layout.pool_id;
> +       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
> +       dst_oloc.pool = dst_ci->i_layout.pool_id;
> +       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
> +       /*
> +        * TODO: should file_start_write/file_end_write be used for the whole
> +        * loop?  Or any other locking?
> +        */
> +       while (len > 0) {
> +               struct ceph_object_id src_oid, dst_oid;
> +               u64 objnum, objoff;
> +               u32 objlen;
> +               size_t copy_len = min_t(size_t, src_ci->i_layout.object_size, len);
> +               int err = 0;
> +
> +               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                             copy_len, &objnum, &objoff,
> +                                             &objlen);
> +               ceph_oid_init(&src_oid);
> +               ceph_oid_printf(&src_oid, "%llx.%08llx",
> +                               src_ci->i_vino.ino, objnum);
> +
> +               /* Do manual copy if:
> +                *  - source file offset isn't object aligned, or
> +                *  - copy length is smaller than object size
> +                */
> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
> +                       /* Do not copy beyond this object */
> +                       if (copy_len > objlen)
> +                               copy_len = objlen;
> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> +                                              &dst_off, copy_len, flags);

We should release caps refs before calling do_splice_direct().
do_splice_direct calls read_iter and write_iter, which will get caps
refs again. Besides, do_splice_direct() may copy less data than
copy_len.  I missed this in previous review, Sorry.


> +                       if (err < 0) {
> +                               ret = err;

If we have copied same data, then error happened, we should return
size of copied data.

> +                               goto out_caps;
> +                       }
> +                       len -= copy_len;
> +                       ret += copy_len;
> +                       continue;
> +               }
> +
> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                             copy_len, &objnum, &objoff,
> +                                             &objlen);
> +               ceph_oid_init(&dst_oid);
> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
> +                               dst_ci->i_vino.ino, objnum);
> +               /* Again... do a manual copy if:
> +                *  - destination file offset isn't object aligned, or
> +                *  - copy length is smaller than object size
> +                *    (although the object size should be the same for different
> +                *     files in the same filesystem...)
> +                */
> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
> +                       if (copy_len > objlen)
> +                               copy_len = objlen;
> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> +                                             &dst_off, copy_len, flags);
> +                       if (err < 0) {
> +                               ret = err;
> +                               goto out_caps;
> +                       }
> +                       len -= copy_len;
> +                       ceph_oid_destroy(&src_oid);

how about dst_oid.  I think we can set src_oid/dst_oid just before
ceph_osdc_copy_from
> +                       ret += copy_len;
> +                       continue;
> +               }
> +               /* Finally... do an object remote copy */
> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
> +                                         0, /* XXX src_ci->i_version ? */
> +                                         &src_oid, &src_oloc,
> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
> +                                         dst_ci->i_vino.snap, &dst_oid,
> +                                         &dst_oloc,
> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
> +               if (err) {
> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
> +                       ret = err;
> +                       goto out_caps;
> +               }
> +               len -= copy_len;
> +               src_off += copy_len;
> +               dst_off += copy_len;
> +               ret += copy_len;
> +               ceph_oid_destroy(&src_oid);
> +               ceph_oid_destroy(&dst_oid);
> +       }
> +       /* Let the MDS know about destination object size change */
> +       if (endoff > size) {

I think we should file time (call file_update_time()), and mark Fw
dirty even endoff <= size

> +               int dirty;
> +               int caps_flags = CHECK_CAPS_AUTHONLY;
> +
> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_NODELAY;
> +               if (ceph_inode_set_size(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
> +               if (caps_flags)
> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
> +               spin_lock(&dst_ci->i_ceph_lock);
> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
> +               dirty = __ceph_mark_dirty_caps(
> +                       dst_ci,
> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,

don't add CEPH_CAP_FILE_BUFFER

Regards
Yan, Zheng

> +                       &prealloc_cf);
> +               spin_unlock(&dst_ci->i_ceph_lock);
> +               if (dirty)
> +                       __mark_inode_dirty(dst_inode, dirty);
> +       }
> +out_caps:
> +       ceph_put_cap_refs(src_ci, src_got);
> +out_dst_caps:
> +       ceph_put_cap_refs(dst_ci, dst_got);
> +out:
> +       ceph_free_cap_flush(prealloc_cf);
> +
> +       return ret;
> +}
> +
>  const struct file_operations ceph_file_fops = {
>         .open = ceph_open,
>         .release = ceph_release,
> @@ -1844,5 +2064,6 @@ const struct file_operations ceph_file_fops = {
>         .unlocked_ioctl = ceph_ioctl,
>         .compat_ioctl   = ceph_ioctl,
>         .fallocate      = ceph_fallocate,
> +       .copy_file_range = ceph_copy_file_range,
>  };
>
Luis Henriques Sept. 7, 2018, 4:03 p.m. UTC | #2
"Yan, Zheng" <ukernel@gmail.com> writes:

> On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
<...>

>> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
>> +                       /* Do not copy beyond this object */
>> +                       if (copy_len > objlen)
>> +                               copy_len = objlen;
>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>> +                                              &dst_off, copy_len, flags);
>
> We should release caps refs before calling do_splice_direct().
> do_splice_direct calls read_iter and write_iter, which will get caps
> refs again.

That's... annoying.  I could release caps, call do_slice_direct and get
the caps again but this would introduce a race window.

I could also re-write this loop so that it would:

- do all the required do_splice_direct calls
- get the caps
- loop doing all the remote object copies
- release caps

But the whole operation would still not be atomic.  Do you have any
suggestion on how to make it atomic?

(Regarding all the other comments, I agree with them and I'll fix them
in a later version.  Thanks a lot for the review!)

Cheers,
Yan, Zheng Sept. 10, 2018, 2:06 a.m. UTC | #3
> On Sep 8, 2018, at 00:03, Luis Henriques <lhenriques@suse.com> wrote:
> 
> "Yan, Zheng" <ukernel@gmail.com> writes:
> 
>> On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
> <...>
> 
>>> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
>>> +                       /* Do not copy beyond this object */
>>> +                       if (copy_len > objlen)
>>> +                               copy_len = objlen;
>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>> +                                              &dst_off, copy_len, flags);
>> 
>> We should release caps refs before calling do_splice_direct().
>> do_splice_direct calls read_iter and write_iter, which will get caps
>> refs again.
> 
> That's... annoying.  I could release caps, call do_slice_direct and get
> the caps again but this would introduce a race window.
> 
> I could also re-write this loop so that it would:
> 
> - do all the required do_splice_direct calls
> - get the caps
> - loop doing all the remote object copies
> - release caps
> 
> But the whole operation would still not be atomic.  Do you have any
> suggestion on how to make it atomic?

The default implementation (using do_splice_direct) is not atomic. I think we just need to ensure write on single object is atomic.

Regards
Yan, Zheng

> 
> (Regarding all the other comments, I agree with them and I'll fix them
> in a later version.  Thanks a lot for the review!)
> 
> Cheers,
> -- 
> Luis
> 
>> Besides, do_splice_direct() may copy less data than copy_len.  I
>> missed this in previous review, Sorry.
>> 
>> 
>>> +                       if (err < 0) {
>>> +                               ret = err;
>> 
>> If we have copied same data, then error happened, we should return
>> size of copied data.
>> 
>>> +                               goto out_caps;
>>> +                       }
>>> +                       len -= copy_len;
>>> +                       ret += copy_len;
>>> +                       continue;
>>> +               }
>>> +
>>> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
>>> +                                             copy_len, &objnum, &objoff,
>>> +                                             &objlen);
>>> +               ceph_oid_init(&dst_oid);
>>> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
>>> +                               dst_ci->i_vino.ino, objnum);
>>> +               /* Again... do a manual copy if:
>>> +                *  - destination file offset isn't object aligned, or
>>> +                *  - copy length is smaller than object size
>>> +                *    (although the object size should be the same for different
>>> +                *     files in the same filesystem...)
>>> +                */
>>> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
>>> +                       if (copy_len > objlen)
>>> +                               copy_len = objlen;
>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>> +                                             &dst_off, copy_len, flags);
>>> +                       if (err < 0) {
>>> +                               ret = err;
>>> +                               goto out_caps;
>>> +                       }
>>> +                       len -= copy_len;
>>> +                       ceph_oid_destroy(&src_oid);
>> 
>> how about dst_oid.  I think we can set src_oid/dst_oid just before
>> ceph_osdc_copy_from
>>> +                       ret += copy_len;
>>> +                       continue;
>>> +               }
>>> +               /* Finally... do an object remote copy */
>>> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
>>> +                                         0, /* XXX src_ci->i_version ? */
>>> +                                         &src_oid, &src_oloc,
>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
>>> +                                         dst_ci->i_vino.snap, &dst_oid,
>>> +                                         &dst_oloc,
>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
>>> +               if (err) {
>>> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
>>> +                       ret = err;
>>> +                       goto out_caps;
>>> +               }
>>> +               len -= copy_len;
>>> +               src_off += copy_len;
>>> +               dst_off += copy_len;
>>> +               ret += copy_len;
>>> +               ceph_oid_destroy(&src_oid);
>>> +               ceph_oid_destroy(&dst_oid);
>>> +       }
>>> +       /* Let the MDS know about destination object size change */
>>> +       if (endoff > size) {
>> 
>> I think we should file time (call file_update_time()), and mark Fw
>> dirty even endoff <= size
>> 
>>> +               int dirty;
>>> +               int caps_flags = CHECK_CAPS_AUTHONLY;
>>> +
>>> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
>>> +                       caps_flags |= CHECK_CAPS_NODELAY;
>>> +               if (ceph_inode_set_size(dst_inode, endoff))
>>> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
>>> +               if (caps_flags)
>>> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
>>> +               spin_lock(&dst_ci->i_ceph_lock);
>>> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
>>> +               dirty = __ceph_mark_dirty_caps(
>>> +                       dst_ci,
>>> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
>> 
>> don't add CEPH_CAP_FILE_BUFFER
>> 
>> Regards
>> Yan, Zheng
>> 
>>> +                       &prealloc_cf);
>>> +               spin_unlock(&dst_ci->i_ceph_lock);
>>> +               if (dirty)
>>> +                       __mark_inode_dirty(dst_inode, dirty);
>>> +       }
>>> +out_caps:
>>> +       ceph_put_cap_refs(src_ci, src_got);
>>> +out_dst_caps:
>>> +       ceph_put_cap_refs(dst_ci, dst_got);
>>> +out:
>>> +       ceph_free_cap_flush(prealloc_cf);
>>> +
>>> +       return ret;
>>> +}
>>> +
>>> const struct file_operations ceph_file_fops = {
>>>        .open = ceph_open,
>>>        .release = ceph_release,
>>> @@ -1844,5 +2064,6 @@ const struct file_operations ceph_file_fops = {
>>>        .unlocked_ioctl = ceph_ioctl,
>>>        .compat_ioctl   = ceph_ioctl,
>>>        .fallocate      = ceph_fallocate,
>>> +       .copy_file_range = ceph_copy_file_range,
>>> };
>>> 
>>
Luis Henriques Sept. 10, 2018, 9:36 a.m. UTC | #4
"Yan, Zheng" <zyan@redhat.com> writes:

>> On Sep 8, 2018, at 00:03, Luis Henriques <lhenriques@suse.com> wrote:
>> 
>> "Yan, Zheng" <ukernel@gmail.com> writes:
>> 
>>> On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
>> <...>
>> 
>>>> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
>>>> +                       /* Do not copy beyond this object */
>>>> +                       if (copy_len > objlen)
>>>> +                               copy_len = objlen;
>>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>>> +                                              &dst_off, copy_len, flags);
>>> 
>>> We should release caps refs before calling do_splice_direct().
>>> do_splice_direct calls read_iter and write_iter, which will get caps
>>> refs again.
>> 
>> That's... annoying.  I could release caps, call do_slice_direct and get
>> the caps again but this would introduce a race window.
>> 
>> I could also re-write this loop so that it would:
>> 
>> - do all the required do_splice_direct calls
>> - get the caps
>> - loop doing all the remote object copies
>> - release caps
>> 
>> But the whole operation would still not be atomic.  Do you have any
>> suggestion on how to make it atomic?
>
> The default implementation (using do_splice_direct) is not atomic. I
> think we just need to ensure write on single object is atomic.

Well... yep, that probably makes sense.  In that case, I'll rework the
code to do the get+release caps in the right places.  Again, thanks for
your valuable input!

Cheers,
Gregory Farnum Sept. 11, 2018, 5:47 p.m. UTC | #5
On Sun, Sep 9, 2018 at 7:06 PM, Yan, Zheng <zyan@redhat.com> wrote:
>
>
>> On Sep 8, 2018, at 00:03, Luis Henriques <lhenriques@suse.com> wrote:
>>
>> "Yan, Zheng" <ukernel@gmail.com> writes:
>>
>>> On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
>> <...>
>>
>>>> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
>>>> +                       /* Do not copy beyond this object */
>>>> +                       if (copy_len > objlen)
>>>> +                               copy_len = objlen;
>>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>>> +                                              &dst_off, copy_len, flags);
>>>
>>> We should release caps refs before calling do_splice_direct().
>>> do_splice_direct calls read_iter and write_iter, which will get caps
>>> refs again.
>>
>> That's... annoying.  I could release caps, call do_slice_direct and get
>> the caps again but this would introduce a race window.
>>
>> I could also re-write this loop so that it would:
>>
>> - do all the required do_splice_direct calls
>> - get the caps
>> - loop doing all the remote object copies
>> - release caps
>>
>> But the whole operation would still not be atomic.  Do you have any
>> suggestion on how to make it atomic?
>
> The default implementation (using do_splice_direct) is not atomic. I think we just need to ensure write on single object is atomic.

I didn't see anything specific about atomicity in the copy_file_range
man page, but the idea that it wouldn't be atomic definitely seems
weird to me. Are we sure this is okay? Is the kernel's default
implementation doing something non-atomic that would be atomic if we
were a local FS?
-Greg

>
> Regards
> Yan, Zheng
>
>>
>> (Regarding all the other comments, I agree with them and I'll fix them
>> in a later version.  Thanks a lot for the review!)
>>
>> Cheers,
>> --
>> Luis
>>
>>> Besides, do_splice_direct() may copy less data than copy_len.  I
>>> missed this in previous review, Sorry.
>>>
>>>
>>>> +                       if (err < 0) {
>>>> +                               ret = err;
>>>
>>> If we have copied same data, then error happened, we should return
>>> size of copied data.
>>>
>>>> +                               goto out_caps;
>>>> +                       }
>>>> +                       len -= copy_len;
>>>> +                       ret += copy_len;
>>>> +                       continue;
>>>> +               }
>>>> +
>>>> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
>>>> +                                             copy_len, &objnum, &objoff,
>>>> +                                             &objlen);
>>>> +               ceph_oid_init(&dst_oid);
>>>> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
>>>> +                               dst_ci->i_vino.ino, objnum);
>>>> +               /* Again... do a manual copy if:
>>>> +                *  - destination file offset isn't object aligned, or
>>>> +                *  - copy length is smaller than object size
>>>> +                *    (although the object size should be the same for different
>>>> +                *     files in the same filesystem...)
>>>> +                */
>>>> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
>>>> +                       if (copy_len > objlen)
>>>> +                               copy_len = objlen;
>>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>>> +                                             &dst_off, copy_len, flags);
>>>> +                       if (err < 0) {
>>>> +                               ret = err;
>>>> +                               goto out_caps;
>>>> +                       }
>>>> +                       len -= copy_len;
>>>> +                       ceph_oid_destroy(&src_oid);
>>>
>>> how about dst_oid.  I think we can set src_oid/dst_oid just before
>>> ceph_osdc_copy_from
>>>> +                       ret += copy_len;
>>>> +                       continue;
>>>> +               }
>>>> +               /* Finally... do an object remote copy */
>>>> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
>>>> +                                         0, /* XXX src_ci->i_version ? */
>>>> +                                         &src_oid, &src_oloc,
>>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
>>>> +                                         dst_ci->i_vino.snap, &dst_oid,
>>>> +                                         &dst_oloc,
>>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
>>>> +               if (err) {
>>>> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
>>>> +                       ret = err;
>>>> +                       goto out_caps;
>>>> +               }
>>>> +               len -= copy_len;
>>>> +               src_off += copy_len;
>>>> +               dst_off += copy_len;
>>>> +               ret += copy_len;
>>>> +               ceph_oid_destroy(&src_oid);
>>>> +               ceph_oid_destroy(&dst_oid);
>>>> +       }
>>>> +       /* Let the MDS know about destination object size change */
>>>> +       if (endoff > size) {
>>>
>>> I think we should file time (call file_update_time()), and mark Fw
>>> dirty even endoff <= size
>>>
>>>> +               int dirty;
>>>> +               int caps_flags = CHECK_CAPS_AUTHONLY;
>>>> +
>>>> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
>>>> +                       caps_flags |= CHECK_CAPS_NODELAY;
>>>> +               if (ceph_inode_set_size(dst_inode, endoff))
>>>> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
>>>> +               if (caps_flags)
>>>> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
>>>> +               spin_lock(&dst_ci->i_ceph_lock);
>>>> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
>>>> +               dirty = __ceph_mark_dirty_caps(
>>>> +                       dst_ci,
>>>> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
>>>
>>> don't add CEPH_CAP_FILE_BUFFER
>>>
>>> Regards
>>> Yan, Zheng
>>>
>>>> +                       &prealloc_cf);
>>>> +               spin_unlock(&dst_ci->i_ceph_lock);
>>>> +               if (dirty)
>>>> +                       __mark_inode_dirty(dst_inode, dirty);
>>>> +       }
>>>> +out_caps:
>>>> +       ceph_put_cap_refs(src_ci, src_got);
>>>> +out_dst_caps:
>>>> +       ceph_put_cap_refs(dst_ci, dst_got);
>>>> +out:
>>>> +       ceph_free_cap_flush(prealloc_cf);
>>>> +
>>>> +       return ret;
>>>> +}
>>>> +
>>>> const struct file_operations ceph_file_fops = {
>>>>        .open = ceph_open,
>>>>        .release = ceph_release,
>>>> @@ -1844,5 +2064,6 @@ const struct file_operations ceph_file_fops = {
>>>>        .unlocked_ioctl = ceph_ioctl,
>>>>        .compat_ioctl   = ceph_ioctl,
>>>>        .fallocate      = ceph_fallocate,
>>>> +       .copy_file_range = ceph_copy_file_range,
>>>> };
>>>>
>>>
>
Yan, Zheng Sept. 12, 2018, 1:18 a.m. UTC | #6
On Wed, Sep 12, 2018 at 1:47 AM Gregory Farnum <gfarnum@redhat.com> wrote:
>
> On Sun, Sep 9, 2018 at 7:06 PM, Yan, Zheng <zyan@redhat.com> wrote:
> >
> >
> >> On Sep 8, 2018, at 00:03, Luis Henriques <lhenriques@suse.com> wrote:
> >>
> >> "Yan, Zheng" <ukernel@gmail.com> writes:
> >>
> >>> On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
> >> <...>
> >>
> >>>> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
> >>>> +                       /* Do not copy beyond this object */
> >>>> +                       if (copy_len > objlen)
> >>>> +                               copy_len = objlen;
> >>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> >>>> +                                              &dst_off, copy_len, flags);
> >>>
> >>> We should release caps refs before calling do_splice_direct().
> >>> do_splice_direct calls read_iter and write_iter, which will get caps
> >>> refs again.
> >>
> >> That's... annoying.  I could release caps, call do_slice_direct and get
> >> the caps again but this would introduce a race window.
> >>
> >> I could also re-write this loop so that it would:
> >>
> >> - do all the required do_splice_direct calls
> >> - get the caps
> >> - loop doing all the remote object copies
> >> - release caps
> >>
> >> But the whole operation would still not be atomic.  Do you have any
> >> suggestion on how to make it atomic?
> >
> > The default implementation (using do_splice_direct) is not atomic. I think we just need to ensure write on single object is atomic.
>
> I didn't see anything specific about atomicity in the copy_file_range
> man page, but the idea that it wouldn't be atomic definitely seems
> weird to me. Are we sure this is okay? Is the kernel's default
> implementation doing something non-atomic that would be atomic if we
> were a local FS?
> -Greg
>

The default implementation just calls filesystem's read/write
callback. It's not atomic when copy size is larger than kernel
internal buffer size.

Regards
Yan, Zheng

> >
> > Regards
> > Yan, Zheng
> >
> >>
> >> (Regarding all the other comments, I agree with them and I'll fix them
> >> in a later version.  Thanks a lot for the review!)
> >>
> >> Cheers,
> >> --
> >> Luis
> >>
> >>> Besides, do_splice_direct() may copy less data than copy_len.  I
> >>> missed this in previous review, Sorry.
> >>>
> >>>
> >>>> +                       if (err < 0) {
> >>>> +                               ret = err;
> >>>
> >>> If we have copied same data, then error happened, we should return
> >>> size of copied data.
> >>>
> >>>> +                               goto out_caps;
> >>>> +                       }
> >>>> +                       len -= copy_len;
> >>>> +                       ret += copy_len;
> >>>> +                       continue;
> >>>> +               }
> >>>> +
> >>>> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> >>>> +                                             copy_len, &objnum, &objoff,
> >>>> +                                             &objlen);
> >>>> +               ceph_oid_init(&dst_oid);
> >>>> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
> >>>> +                               dst_ci->i_vino.ino, objnum);
> >>>> +               /* Again... do a manual copy if:
> >>>> +                *  - destination file offset isn't object aligned, or
> >>>> +                *  - copy length is smaller than object size
> >>>> +                *    (although the object size should be the same for different
> >>>> +                *     files in the same filesystem...)
> >>>> +                */
> >>>> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
> >>>> +                       if (copy_len > objlen)
> >>>> +                               copy_len = objlen;
> >>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
> >>>> +                                             &dst_off, copy_len, flags);
> >>>> +                       if (err < 0) {
> >>>> +                               ret = err;
> >>>> +                               goto out_caps;
> >>>> +                       }
> >>>> +                       len -= copy_len;
> >>>> +                       ceph_oid_destroy(&src_oid);
> >>>
> >>> how about dst_oid.  I think we can set src_oid/dst_oid just before
> >>> ceph_osdc_copy_from
> >>>> +                       ret += copy_len;
> >>>> +                       continue;
> >>>> +               }
> >>>> +               /* Finally... do an object remote copy */
> >>>> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
> >>>> +                                         0, /* XXX src_ci->i_version ? */
> >>>> +                                         &src_oid, &src_oloc,
> >>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
> >>>> +                                         dst_ci->i_vino.snap, &dst_oid,
> >>>> +                                         &dst_oloc,
> >>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
> >>>> +               if (err) {
> >>>> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
> >>>> +                       ret = err;
> >>>> +                       goto out_caps;
> >>>> +               }
> >>>> +               len -= copy_len;
> >>>> +               src_off += copy_len;
> >>>> +               dst_off += copy_len;
> >>>> +               ret += copy_len;
> >>>> +               ceph_oid_destroy(&src_oid);
> >>>> +               ceph_oid_destroy(&dst_oid);
> >>>> +       }
> >>>> +       /* Let the MDS know about destination object size change */
> >>>> +       if (endoff > size) {
> >>>
> >>> I think we should file time (call file_update_time()), and mark Fw
> >>> dirty even endoff <= size
> >>>
> >>>> +               int dirty;
> >>>> +               int caps_flags = CHECK_CAPS_AUTHONLY;
> >>>> +
> >>>> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
> >>>> +                       caps_flags |= CHECK_CAPS_NODELAY;
> >>>> +               if (ceph_inode_set_size(dst_inode, endoff))
> >>>> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
> >>>> +               if (caps_flags)
> >>>> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
> >>>> +               spin_lock(&dst_ci->i_ceph_lock);
> >>>> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
> >>>> +               dirty = __ceph_mark_dirty_caps(
> >>>> +                       dst_ci,
> >>>> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
> >>>
> >>> don't add CEPH_CAP_FILE_BUFFER
> >>>
> >>> Regards
> >>> Yan, Zheng
> >>>
> >>>> +                       &prealloc_cf);
> >>>> +               spin_unlock(&dst_ci->i_ceph_lock);
> >>>> +               if (dirty)
> >>>> +                       __mark_inode_dirty(dst_inode, dirty);
> >>>> +       }
> >>>> +out_caps:
> >>>> +       ceph_put_cap_refs(src_ci, src_got);
> >>>> +out_dst_caps:
> >>>> +       ceph_put_cap_refs(dst_ci, dst_got);
> >>>> +out:
> >>>> +       ceph_free_cap_flush(prealloc_cf);
> >>>> +
> >>>> +       return ret;
> >>>> +}
> >>>> +
> >>>> const struct file_operations ceph_file_fops = {
> >>>>        .open = ceph_open,
> >>>>        .release = ceph_release,
> >>>> @@ -1844,5 +2064,6 @@ const struct file_operations ceph_file_fops = {
> >>>>        .unlocked_ioctl = ceph_ioctl,
> >>>>        .compat_ioctl   = ceph_ioctl,
> >>>>        .fallocate      = ceph_fallocate,
> >>>> +       .copy_file_range = ceph_copy_file_range,
> >>>> };
> >>>>
> >>>
> >
Ric Wheeler Sept. 12, 2018, 11:23 a.m. UTC | #7
On 09/11/2018 09:18 PM, Yan, Zheng wrote:
> On Wed, Sep 12, 2018 at 1:47 AM Gregory Farnum <gfarnum@redhat.com> wrote:
>> On Sun, Sep 9, 2018 at 7:06 PM, Yan, Zheng <zyan@redhat.com> wrote:
>>>
>>>> On Sep 8, 2018, at 00:03, Luis Henriques <lhenriques@suse.com> wrote:
>>>>
>>>> "Yan, Zheng" <ukernel@gmail.com> writes:
>>>>
>>>>> On Fri, Sep 7, 2018 at 12:08 AM Luis Henriques <lhenriques@suse.com> wrote:
>>>> <...>
>>>>
>>>>>> +               if (objoff || (copy_len < src_ci->i_layout.object_size)) {
>>>>>> +                       /* Do not copy beyond this object */
>>>>>> +                       if (copy_len > objlen)
>>>>>> +                               copy_len = objlen;
>>>>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>>>>> +                                              &dst_off, copy_len, flags);
>>>>> We should release caps refs before calling do_splice_direct().
>>>>> do_splice_direct calls read_iter and write_iter, which will get caps
>>>>> refs again.
>>>> That's... annoying.  I could release caps, call do_slice_direct and get
>>>> the caps again but this would introduce a race window.
>>>>
>>>> I could also re-write this loop so that it would:
>>>>
>>>> - do all the required do_splice_direct calls
>>>> - get the caps
>>>> - loop doing all the remote object copies
>>>> - release caps
>>>>
>>>> But the whole operation would still not be atomic.  Do you have any
>>>> suggestion on how to make it atomic?
>>> The default implementation (using do_splice_direct) is not atomic. I think we just need to ensure write on single object is atomic.
>> I didn't see anything specific about atomicity in the copy_file_range
>> man page, but the idea that it wouldn't be atomic definitely seems
>> weird to me. Are we sure this is okay? Is the kernel's default
>> implementation doing something non-atomic that would be atomic if we
>> were a local FS?
>> -Greg
>>
> The default implementation just calls filesystem's read/write
> callback. It's not atomic when copy size is larger than kernel
> internal buffer size.
>
> Regards
> Yan, Zheng

I don't know of any expectation that this is atomic in any implementation.

Regards,

Ric

>
>>> Regards
>>> Yan, Zheng
>>>
>>>> (Regarding all the other comments, I agree with them and I'll fix them
>>>> in a later version.  Thanks a lot for the review!)
>>>>
>>>> Cheers,
>>>> --
>>>> Luis
>>>>
>>>>> Besides, do_splice_direct() may copy less data than copy_len.  I
>>>>> missed this in previous review, Sorry.
>>>>>
>>>>>
>>>>>> +                       if (err < 0) {
>>>>>> +                               ret = err;
>>>>> If we have copied same data, then error happened, we should return
>>>>> size of copied data.
>>>>>
>>>>>> +                               goto out_caps;
>>>>>> +                       }
>>>>>> +                       len -= copy_len;
>>>>>> +                       ret += copy_len;
>>>>>> +                       continue;
>>>>>> +               }
>>>>>> +
>>>>>> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
>>>>>> +                                             copy_len, &objnum, &objoff,
>>>>>> +                                             &objlen);
>>>>>> +               ceph_oid_init(&dst_oid);
>>>>>> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
>>>>>> +                               dst_ci->i_vino.ino, objnum);
>>>>>> +               /* Again... do a manual copy if:
>>>>>> +                *  - destination file offset isn't object aligned, or
>>>>>> +                *  - copy length is smaller than object size
>>>>>> +                *    (although the object size should be the same for different
>>>>>> +                *     files in the same filesystem...)
>>>>>> +                */
>>>>>> +               if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
>>>>>> +                       if (copy_len > objlen)
>>>>>> +                               copy_len = objlen;
>>>>>> +                       err = do_splice_direct(src_file, &src_off, dst_file,
>>>>>> +                                             &dst_off, copy_len, flags);
>>>>>> +                       if (err < 0) {
>>>>>> +                               ret = err;
>>>>>> +                               goto out_caps;
>>>>>> +                       }
>>>>>> +                       len -= copy_len;
>>>>>> +                       ceph_oid_destroy(&src_oid);
>>>>> how about dst_oid.  I think we can set src_oid/dst_oid just before
>>>>> ceph_osdc_copy_from
>>>>>> +                       ret += copy_len;
>>>>>> +                       continue;
>>>>>> +               }
>>>>>> +               /* Finally... do an object remote copy */
>>>>>> +               err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
>>>>>> +                                         0, /* XXX src_ci->i_version ? */
>>>>>> +                                         &src_oid, &src_oloc,
>>>>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
>>>>>> +                                         dst_ci->i_vino.snap, &dst_oid,
>>>>>> +                                         &dst_oloc,
>>>>>> +                                         CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
>>>>>> +               if (err) {
>>>>>> +                       printk("copy_from returned an error: %d\n", err); /* XXX */
>>>>>> +                       ret = err;
>>>>>> +                       goto out_caps;
>>>>>> +               }
>>>>>> +               len -= copy_len;
>>>>>> +               src_off += copy_len;
>>>>>> +               dst_off += copy_len;
>>>>>> +               ret += copy_len;
>>>>>> +               ceph_oid_destroy(&src_oid);
>>>>>> +               ceph_oid_destroy(&dst_oid);
>>>>>> +       }
>>>>>> +       /* Let the MDS know about destination object size change */
>>>>>> +       if (endoff > size) {
>>>>> I think we should file time (call file_update_time()), and mark Fw
>>>>> dirty even endoff <= size
>>>>>
>>>>>> +               int dirty;
>>>>>> +               int caps_flags = CHECK_CAPS_AUTHONLY;
>>>>>> +
>>>>>> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
>>>>>> +                       caps_flags |= CHECK_CAPS_NODELAY;
>>>>>> +               if (ceph_inode_set_size(dst_inode, endoff))
>>>>>> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
>>>>>> +               if (caps_flags)
>>>>>> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
>>>>>> +               spin_lock(&dst_ci->i_ceph_lock);
>>>>>> +               dst_ci->i_inline_version = CEPH_INLINE_NONE;
>>>>>> +               dirty = __ceph_mark_dirty_caps(
>>>>>> +                       dst_ci,
>>>>>> +                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
>>>>> don't add CEPH_CAP_FILE_BUFFER
>>>>>
>>>>> Regards
>>>>> Yan, Zheng
>>>>>
>>>>>> +                       &prealloc_cf);
>>>>>> +               spin_unlock(&dst_ci->i_ceph_lock);
>>>>>> +               if (dirty)
>>>>>> +                       __mark_inode_dirty(dst_inode, dirty);
>>>>>> +       }
>>>>>> +out_caps:
>>>>>> +       ceph_put_cap_refs(src_ci, src_got);
>>>>>> +out_dst_caps:
>>>>>> +       ceph_put_cap_refs(dst_ci, dst_got);
>>>>>> +out:
>>>>>> +       ceph_free_cap_flush(prealloc_cf);
>>>>>> +
>>>>>> +       return ret;
>>>>>> +}
>>>>>> +
>>>>>> const struct file_operations ceph_file_fops = {
>>>>>>         .open = ceph_open,
>>>>>>         .release = ceph_release,
>>>>>> @@ -1844,5 +2064,6 @@ const struct file_operations ceph_file_fops = {
>>>>>>         .unlocked_ioctl = ceph_ioctl,
>>>>>>         .compat_ioctl   = ceph_ioctl,
>>>>>>         .fallocate      = ceph_fallocate,
>>>>>> +       .copy_file_range = ceph_copy_file_range,
>>>>>> };
>>>>>>
diff mbox series

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 92ab20433682..7e9ce177bdc3 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@ 
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -1829,6 +1830,225 @@  static long ceph_fallocate(struct file *file, int mode,
 	return ret;
 }
 
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_osd_client *osdc =
+		&ceph_inode_to_client(src_inode)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	loff_t endoff = 0;
+	loff_t size;
+	ssize_t ret = -EIO;
+	int src_got = 0;
+	int dst_got = 0;
+	bool retrying = false;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;
+
+retry_caps:
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    endoff, &dst_got, NULL);
+	if (ret < 0)
+		goto out;
+	/*
+	 * We also need to get FILE_RD capabilities for source file as other
+	 * clients may have dirty data in their caches.  And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 *
+	 * However, since we're already holding the FILE_WR capability for the
+	 * source file, we would risk a deadlock by using ceph_get_caps.  Thus,
+	 * we'll do some retry dance instead to try to get both capabilities.
+	 * If everything fails, we just return -EOPNOTSUPP and fallback to the
+	 * VFS default copy_file_range implementation.
+	 */
+	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, &src_got);
+	if (ret <= 0) {
+		if (retrying) {
+			ret = -EOPNOTSUPP;
+			goto out_dst_caps;
+		}
+		/* Start by dropping dsc_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(dst_ci, dst_got);
+		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED,
+				    (src_off + len), &src_got, NULL);
+		if (ret < 0) {
+			ret = -EOPNOTSUPP;
+			goto out;
+		}
+		/*... drop them too, and retry */
+		ceph_put_cap_refs(src_ci, src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+
+	size = i_size_read(src_inode);
+	/*
+	 * Don't copy beyond source file EOF.  Instead of simply setting lenght
+	 * to (size - src_off), just drop to VFS default implementation, as the
+	 * local i_size may be stale due to other clients writing to the source
+	 * inode.
+	 */
+	if (src_off + len > size) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+	size = i_size_read(dst_inode);
+	endoff = dst_off + len;
+	ret = inode_newsize_ok(dst_inode, endoff);
+	if (ret)
+		goto out_caps;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) {
+		ret = -EDQUOT;
+		goto out_caps;
+	}
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0) {
+		printk("Failed to invalidate inode pages (%ld)\n", ret);
+		ret = 0; /* XXX */
+	}
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+	/*
+	 * TODO: should file_start_write/file_end_write be used for the whole
+	 * loop?  Or any other locking?
+	 */
+	while (len > 0) {
+		struct ceph_object_id src_oid, dst_oid;
+		u64 objnum, objoff;
+		u32 objlen;
+		size_t copy_len = min_t(size_t, src_ci->i_layout.object_size, len);
+		int err = 0;
+
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      copy_len, &objnum, &objoff,
+					      &objlen);
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, objnum);
+
+		/* Do manual copy if:
+		 *  - source file offset isn't object aligned, or
+		 *  - copy length is smaller than object size
+		 */
+		if (objoff || (copy_len < src_ci->i_layout.object_size)) {
+			/* Do not copy beyond this object */
+			if (copy_len > objlen)
+				copy_len = objlen;
+			err = do_splice_direct(src_file, &src_off, dst_file,
+					       &dst_off, copy_len, flags);
+			if (err < 0) {
+				ret = err;
+				goto out_caps;
+			}
+			len -= copy_len;
+			ret += copy_len;
+			continue;
+		}
+
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      copy_len, &objnum, &objoff,
+					      &objlen);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, objnum);
+		/* Again... do a manual copy if:
+		 *  - destination file offset isn't object aligned, or
+		 *  - copy length is smaller than object size
+		 *    (although the object size should be the same for different
+		 *     files in the same filesystem...)
+		 */
+		if (objoff || (copy_len < dst_ci->i_layout.object_size)) {
+			if (copy_len > objlen)
+				copy_len = objlen;
+			err = do_splice_direct(src_file, &src_off, dst_file,
+					      &dst_off, copy_len, flags);
+			if (err < 0) {
+				ret = err;
+				goto out_caps;
+			}
+			len -= copy_len;
+			ceph_oid_destroy(&src_oid);
+			ret += copy_len;
+			continue;
+		}
+		/* Finally... do an object remote copy */
+		err = ceph_osdc_copy_from(osdc, src_ci->i_vino.snap,
+					  0, /* XXX src_ci->i_version ? */
+					  &src_oid, &src_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED,
+					  dst_ci->i_vino.snap, &dst_oid,
+					  &dst_oloc,
+					  CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL|CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+		if (err) {
+			printk("copy_from returned an error: %d\n", err); /* XXX */
+			ret = err;
+			goto out_caps;
+		}
+		len -= copy_len;
+		src_off += copy_len;
+		dst_off += copy_len;
+		ret += copy_len;
+		ceph_oid_destroy(&src_oid);
+		ceph_oid_destroy(&dst_oid);
+	}
+	/* Let the MDS know about destination object size change */
+	if (endoff > size) {
+		int dirty;
+		int caps_flags = CHECK_CAPS_AUTHONLY;
+
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		if (ceph_inode_set_size(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_AUTHONLY;
+		if (caps_flags)
+			ceph_check_caps(dst_ci, caps_flags, NULL);
+		spin_lock(&dst_ci->i_ceph_lock);
+		dst_ci->i_inline_version = CEPH_INLINE_NONE;
+		dirty = __ceph_mark_dirty_caps(
+			dst_ci,
+			CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER,
+			&prealloc_cf);
+		spin_unlock(&dst_ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(dst_inode, dirty);
+	}
+out_caps:
+	ceph_put_cap_refs(src_ci, src_got);
+out_dst_caps:
+	ceph_put_cap_refs(dst_ci, dst_got);
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1844,5 +2064,6 @@  const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };