diff mbox series

[v7,3/4] ceph: support copy_file_range file operation

Message ID 20181015154600.13251-4-lhenriques@suse.com (mailing list archive)
State New, archived
Headers show
Series copy_file_range in cephfs kernel client | expand

Commit Message

Luis Henriques Oct. 15, 2018, 3:45 p.m. UTC
This commit implements support for the copy_file_range syscall in cephfs.
It is implemented using the RADOS 'copy-from' operation, which allows to
do a remote object copy, without the need to download/upload data from/to
the OSDs.

Some manual copy may however be required if the source/destination file
offsets aren't object aligned or if the copy length is smaller than the
object size.

Signed-off-by: Luis Henriques <lhenriques@suse.com>
---
 fs/ceph/file.c | 294 ++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 293 insertions(+), 1 deletion(-)

Comments

Yan, Zheng Oct. 16, 2018, 10:51 p.m. UTC | #1
On Mon, Oct 15, 2018 at 11:47 PM Luis Henriques <lhenriques@suse.com> wrote:
>
> This commit implements support for the copy_file_range syscall in cephfs.
> It is implemented using the RADOS 'copy-from' operation, which allows to
> do a remote object copy, without the need to download/upload data from/to
> the OSDs.
>
> Some manual copy may however be required if the source/destination file
> offsets aren't object aligned or if the copy length is smaller than the
> object size.
>
> Signed-off-by: Luis Henriques <lhenriques@suse.com>
> ---
>  fs/ceph/file.c | 294 ++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 293 insertions(+), 1 deletion(-)
>
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 92ab20433682..0942f45995a1 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -1,5 +1,6 @@
>  // SPDX-License-Identifier: GPL-2.0
>  #include <linux/ceph/ceph_debug.h>
> +#include <linux/ceph/striper.h>
>
>  #include <linux/module.h>
>  #include <linux/sched.h>
> @@ -1829,6 +1830,297 @@ static long ceph_fallocate(struct file *file, int mode,
>         return ret;
>  }
>
> +/*
> + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
> + * src_ci.  Two attempts are made to obtain both caps, and an error is return if
> + * this fails; zero is returned on success.
> + */
> +static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
> +                         loff_t src_endoff, int *src_got,
> +                         struct ceph_inode_info *dst_ci,
> +                         loff_t dst_endoff, int *dst_got)
> +{
> +       int ret = 0;
> +       bool retrying = false;
> +
> +retry_caps:
> +       ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
> +                           dst_endoff, dst_got, NULL);
> +       if (ret < 0)
> +               return ret;
> +
> +       /*
> +        * Since we're already holding the FILE_WR capability for the dst file,
> +        * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
> +        * retry dance instead to try to get both capabilities.
> +        */
> +       ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
> +                               false, src_got);
> +       if (ret <= 0) {
> +               /* Start by dropping dst_ci caps and getting src_ci caps */
> +               ceph_put_cap_refs(dst_ci, *dst_got);
> +               if (retrying) {
> +                       if (!ret)
> +                               /* ceph_try_get_caps masks EAGAIN */
> +                               ret = -EAGAIN;
> +                       return ret;
> +               }
> +               ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
> +                                   CEPH_CAP_FILE_SHARED, src_endoff,
> +                                   src_got, NULL);
> +               if (ret < 0)
> +                       return ret;
> +               /*... drop src_ci caps too, and retry */
> +               ceph_put_cap_refs(src_ci, *src_got);
> +               retrying = true;
> +               goto retry_caps;
> +       }
> +       return ret;
> +}
> +
> +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
> +                          struct ceph_inode_info *dst_ci, int dst_got)
> +{
> +       ceph_put_cap_refs(src_ci, src_got);
> +       ceph_put_cap_refs(dst_ci, dst_got);
> +}
> +
> +/*
> + * This function does several size-related checks, returning an error if:
> + *  - source file is smaller than off+len
> + *  - destination file size is not OK (inode_newsize_ok())
> + *  - max bytes quotas is exceeded
> + */
> +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
> +                          loff_t src_off, loff_t dst_off, size_t len)
> +{
> +       loff_t size, endoff;
> +
> +       size = i_size_read(src_inode);
> +       /*
> +        * Don't copy beyond source file EOF.  Instead of simply setting length
> +        * to (size - src_off), just drop to VFS default implementation, as the
> +        * local i_size may be stale due to other clients writing to the source
> +        * inode.
> +        */
> +       if (src_off + len > size) {
> +               dout("Copy beyond EOF (%llu + %ld > %llu)\n",
> +                    src_off, len, size);
> +               return -EOPNOTSUPP;
> +       }
> +       size = i_size_read(dst_inode);
> +
> +       endoff = dst_off + len;
> +       if (inode_newsize_ok(dst_inode, endoff))
> +               return -EOPNOTSUPP;
> +
> +       if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
> +               return -EDQUOT;
> +
> +       return 0;
> +}
> +
> +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> +                                   struct file *dst_file, loff_t dst_off,
> +                                   size_t len, unsigned int flags)
> +{
> +       struct inode *src_inode = file_inode(src_file);
> +       struct inode *dst_inode = file_inode(dst_file);
> +       struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> +       struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> +       struct ceph_cap_flush *prealloc_cf;
> +       struct ceph_object_locator src_oloc, dst_oloc;
> +       struct ceph_object_id src_oid, dst_oid;
> +       loff_t endoff = 0, size;
> +       ssize_t ret = -EIO;
> +       u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
> +       u32 src_objlen, dst_objlen, object_size;
> +       int src_got = 0, dst_got = 0, err, dirty;
> +       bool do_final_copy = false;
> +
> +       if (src_inode == dst_inode)
> +               return -EINVAL;
> +       if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> +               return -EROFS;
> +
> +       /*
> +        * Some of the checks below will return -EOPNOTSUPP, which will force a
> +        * fallback to the default VFS copy_file_range implementation.  This is
> +        * desirable in several cases (for ex, the 'len' is smaller than the
> +        * size of the objects, or in cases where that would be more
> +        * efficient).
> +        */
> +
> +       if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
> +           (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
> +           (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
> +               return -EOPNOTSUPP;
> +
> +       if (len < src_ci->i_layout.object_size)
> +               return -EOPNOTSUPP; /* no remote copy will be done */
> +
> +       prealloc_cf = ceph_alloc_cap_flush();
> +       if (!prealloc_cf)
> +               return -ENOMEM;
> +
> +       /* Start by sync'ing the source file */
> +       ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> +       if (ret < 0)
> +               goto out;
> +
> +       /*
> +        * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
> +        * clients may have dirty data in their caches.  And OSDs know nothing
> +        * about caps, so they can't safely do the remote object copies.
> +        */
> +       err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
> +                            dst_ci, (dst_off + len), &dst_got);
> +       if (err < 0) {
> +               dout("get_rd_wr_caps returned %d\n", err);
> +               ret = -EOPNOTSUPP;
> +               goto out;
> +       }
> +
> +       ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
> +       if (ret < 0)
> +               goto out_caps;
> +
> +       size = i_size_read(dst_inode);
> +       endoff = dst_off + len;
> +
> +       /* Drop dst file cached pages */
> +       ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> +                                           dst_off >> PAGE_SHIFT,
> +                                           endoff >> PAGE_SHIFT);
> +       if (ret < 0) {
> +               dout("Failed to invalidate inode pages (%ld)\n", ret);
> +               ret = 0; /* XXX */
> +       }
> +       src_oloc.pool = src_ci->i_layout.pool_id;
> +       src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
> +       dst_oloc.pool = dst_ci->i_layout.pool_id;
> +       dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
> +
> +       ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                     src_ci->i_layout.object_size,
> +                                     &src_objnum, &src_objoff, &src_objlen);
> +       ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                     dst_ci->i_layout.object_size,
> +                                     &dst_objnum, &dst_objoff, &dst_objlen);
> +       /* object-level offsets need to the same */
> +       if (src_objoff != dst_objoff) {
> +               ret = -EOPNOTSUPP;
> +               goto out_caps;
> +       }
> +
> +       /*
> +        * Do a manual copy if the object offset isn't object aligned.
> +        * 'src_objlen' contains the bytes left until the end of the object,
> +        * starting at the src_off
> +        */
> +       if (src_objoff) {
> +               /*
> +                * we need to temporarily drop all caps as we'll be calling
> +                * {read,write}_iter, which will get caps again.
> +                */
> +               put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
> +               ret = do_splice_direct(src_file, &src_off, dst_file,
> +                                      &dst_off, src_objlen, flags);
> +               if (ret < 0) {
> +                       dout("do_splice_direct returned %d\n", err);
> +                       goto out;
> +               }
> +               len -= ret;
> +               err = get_rd_wr_caps(src_ci, (src_off + len),
> +                                    &src_got, dst_ci,
> +                                    (dst_off + len), &dst_got);
> +               if (err < 0)
> +                       goto out;
> +               err = is_file_size_ok(src_inode, dst_inode,
> +                                     src_off, dst_off, len);
> +               if (err < 0)
> +                       goto out_caps;
> +       }
> +       object_size = src_ci->i_layout.object_size;
> +       while (len >= object_size) {
> +               ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
> +                                             object_size, &src_objnum,
> +                                             &src_objoff, &src_objlen);
> +               ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
> +                                             object_size, &dst_objnum,
> +                                             &dst_objoff, &dst_objlen);
> +               ceph_oid_init(&src_oid);
> +               ceph_oid_printf(&src_oid, "%llx.%08llx",
> +                               src_ci->i_vino.ino, src_objnum);
> +               ceph_oid_init(&dst_oid);
> +               ceph_oid_printf(&dst_oid, "%llx.%08llx",
> +                               dst_ci->i_vino.ino, dst_objnum);
> +               /* Do an object remote copy */
> +               err = ceph_osdc_copy_from(
> +                       &ceph_inode_to_client(src_inode)->client->osdc,
> +                       src_ci->i_vino.snap, 0,
> +                       &src_oid, &src_oloc,
> +                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
> +                       CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
> +                       &dst_oid, &dst_oloc,
> +                       CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
> +                       CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
> +               if (err) {
> +                       dout("ceph_osdc_copy_from returned %d\n", err);
> +                       if (!ret)
> +                               ret = err;
> +                       goto out_caps;
> +               }
> +               len -= object_size;
> +               src_off += object_size;
> +               dst_off += object_size;
> +               ret += object_size;
> +       }
> +
> +       if (len)
> +               /* We still need one final local copy */
> +               do_final_copy = true;
> +
> +       file_update_time(dst_file);
> +       if (endoff > size) {
> +               int caps_flags = 0;
> +
> +               /* Let the MDS know about dst file size change */
> +               if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_NODELAY;
> +               if (ceph_inode_set_size(dst_inode, endoff))
> +                       caps_flags |= CHECK_CAPS_AUTHONLY;
> +               if (caps_flags)
> +                       ceph_check_caps(dst_ci, caps_flags, NULL);
> +       }
> +       /* Mark Fw dirty */
> +       spin_lock(&dst_ci->i_ceph_lock);
> +       dst_ci->i_inline_version = CEPH_INLINE_NONE;
> +       dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
> +       spin_unlock(&dst_ci->i_ceph_lock);
> +       if (dirty)
> +               __mark_inode_dirty(dst_inode, dirty);
> +
> +out_caps:
> +       put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
> +
> +       if (do_final_copy) {
> +               err = do_splice_direct(src_file, &src_off, dst_file,
> +                                      &dst_off, len, flags);
> +               if (err < 0) {
> +                       dout("do_splice_direct returned %d\n", err);
> +                       goto out;
> +               }
> +               len -= err;
> +               ret += err;
> +       }
> +
> +out:
> +       ceph_free_cap_flush(prealloc_cf);
> +
> +       return ret;
> +}
> +
>  const struct file_operations ceph_file_fops = {
>         .open = ceph_open,
>         .release = ceph_release,
> @@ -1844,5 +2136,5 @@ const struct file_operations ceph_file_fops = {
>         .unlocked_ioctl = ceph_ioctl,
>         .compat_ioctl   = ceph_ioctl,
>         .fallocate      = ceph_fallocate,
> +       .copy_file_range = ceph_copy_file_range,
>  };
> -

Looks good to me

Regards
Yan, Zheng
Luis Henriques Oct. 22, 2018, 2:38 p.m. UTC | #2
Luis Henriques <lhenriques@suse.com> writes:
<snip>
> +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> +				    struct file *dst_file, loff_t dst_off,
> +				    size_t len, unsigned int flags)
> +{
> +	struct inode *src_inode = file_inode(src_file);
> +	struct inode *dst_inode = file_inode(dst_file);
> +	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> +	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> +	struct ceph_cap_flush *prealloc_cf;
> +	struct ceph_object_locator src_oloc, dst_oloc;
> +	struct ceph_object_id src_oid, dst_oid;
> +	loff_t endoff = 0, size;
> +	ssize_t ret = -EIO;
> +	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
> +	u32 src_objlen, dst_objlen, object_size;
> +	int src_got = 0, dst_got = 0, err, dirty;
> +	bool do_final_copy = false;
> +
> +	if (src_inode == dst_inode)
> +		return -EINVAL;
> +	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> +		return -EROFS;
> +
> +	/*
> +	 * Some of the checks below will return -EOPNOTSUPP, which will force a
> +	 * fallback to the default VFS copy_file_range implementation.  This is
> +	 * desirable in several cases (for ex, the 'len' is smaller than the
> +	 * size of the objects, or in cases where that would be more
> +	 * efficient).
> +	 */
> +
> +	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
> +	    (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
> +	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
> +		return -EOPNOTSUPP;
> +
> +	if (len < src_ci->i_layout.object_size)
> +		return -EOPNOTSUPP; /* no remote copy will be done */
> +
> +	prealloc_cf = ceph_alloc_cap_flush();
> +	if (!prealloc_cf)
> +		return -ENOMEM;
> +
> +	/* Start by sync'ing the source file */
> +	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> +	if (ret < 0)
> +		goto out;
> +
> +	/*
> +	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
> +	 * clients may have dirty data in their caches.  And OSDs know nothing
> +	 * about caps, so they can't safely do the remote object copies.
> +	 */
> +	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
> +			     dst_ci, (dst_off + len), &dst_got);
> +	if (err < 0) {
> +		dout("get_rd_wr_caps returned %d\n", err);
> +		ret = -EOPNOTSUPP;
> +		goto out;
> +	}
> +
> +	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
> +	if (ret < 0)
> +		goto out_caps;
> +
> +	size = i_size_read(dst_inode);
> +	endoff = dst_off + len;
> +
> +	/* Drop dst file cached pages */
> +	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> +					    dst_off >> PAGE_SHIFT,
> +					    endoff >> PAGE_SHIFT);
> +	if (ret < 0) {
> +		dout("Failed to invalidate inode pages (%ld)\n", ret);
> +		ret = 0; /* XXX */
> +	}

I believe I found an issue with this code: if we try to copy into a file
that was just written, the remote copied data will be overwritten by our
buffered writes once they are flushed.  When this happens we can see the
above message in the kernel (with a -EBUSY error).

Here's the 3 options I see to fixes this:

1. Add another call to file_write_and_wait_range(), with the dst_file.
   I believe this souldn't introduce a huge overhead for the cases where
   the file isn't buffered.
2. Add the call to file_write_and_wait_range() only if
   invalidate_inode_pages2_range fails.  We'll need to drop/get caps for
   this.
3. All of the above.  I'm not sure we need to do both, but I guess we'll
   need to at least fail the operation and return the error if
   invalidate_inode_pages2_range fails.

Please let me know what do you think.  And if you want me to send a
patch to fix this or if you rather have me sending the whole thing
again.

Cheers,
Yan, Zheng Oct. 23, 2018, 12:12 a.m. UTC | #3
On Mon, Oct 22, 2018 at 11:25 PM Luis Henriques <lhenriques@suse.com> wrote:
>
> Luis Henriques <lhenriques@suse.com> writes:
> <snip>
> > +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
> > +                                 struct file *dst_file, loff_t dst_off,
> > +                                 size_t len, unsigned int flags)
> > +{
> > +     struct inode *src_inode = file_inode(src_file);
> > +     struct inode *dst_inode = file_inode(dst_file);
> > +     struct ceph_inode_info *src_ci = ceph_inode(src_inode);
> > +     struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
> > +     struct ceph_cap_flush *prealloc_cf;
> > +     struct ceph_object_locator src_oloc, dst_oloc;
> > +     struct ceph_object_id src_oid, dst_oid;
> > +     loff_t endoff = 0, size;
> > +     ssize_t ret = -EIO;
> > +     u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
> > +     u32 src_objlen, dst_objlen, object_size;
> > +     int src_got = 0, dst_got = 0, err, dirty;
> > +     bool do_final_copy = false;
> > +
> > +     if (src_inode == dst_inode)
> > +             return -EINVAL;
> > +     if (ceph_snap(dst_inode) != CEPH_NOSNAP)
> > +             return -EROFS;
> > +
> > +     /*
> > +      * Some of the checks below will return -EOPNOTSUPP, which will force a
> > +      * fallback to the default VFS copy_file_range implementation.  This is
> > +      * desirable in several cases (for ex, the 'len' is smaller than the
> > +      * size of the objects, or in cases where that would be more
> > +      * efficient).
> > +      */
> > +
> > +     if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
> > +         (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
> > +         (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
> > +             return -EOPNOTSUPP;
> > +
> > +     if (len < src_ci->i_layout.object_size)
> > +             return -EOPNOTSUPP; /* no remote copy will be done */
> > +
> > +     prealloc_cf = ceph_alloc_cap_flush();
> > +     if (!prealloc_cf)
> > +             return -ENOMEM;
> > +
> > +     /* Start by sync'ing the source file */
> > +     ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
> > +     if (ret < 0)
> > +             goto out;
> > +
> > +     /*
> > +      * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
> > +      * clients may have dirty data in their caches.  And OSDs know nothing
> > +      * about caps, so they can't safely do the remote object copies.
> > +      */
> > +     err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
> > +                          dst_ci, (dst_off + len), &dst_got);
> > +     if (err < 0) {
> > +             dout("get_rd_wr_caps returned %d\n", err);
> > +             ret = -EOPNOTSUPP;
> > +             goto out;
> > +     }
> > +
> > +     ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
> > +     if (ret < 0)
> > +             goto out_caps;
> > +
> > +     size = i_size_read(dst_inode);
> > +     endoff = dst_off + len;
> > +
> > +     /* Drop dst file cached pages */
> > +     ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
> > +                                         dst_off >> PAGE_SHIFT,
> > +                                         endoff >> PAGE_SHIFT);
> > +     if (ret < 0) {
> > +             dout("Failed to invalidate inode pages (%ld)\n", ret);
> > +             ret = 0; /* XXX */
> > +     }
>
> I believe I found an issue with this code: if we try to copy into a file
> that was just written, the remote copied data will be overwritten by our
> buffered writes once they are flushed.  When this happens we can see the
> above message in the kernel (with a -EBUSY error).
>
> Here's the 3 options I see to fixes this:
>
> 1. Add another call to file_write_and_wait_range(), with the dst_file.
>    I believe this souldn't introduce a huge overhead for the cases where
>    the file isn't buffered.
> 2. Add the call to file_write_and_wait_range() only if
>    invalidate_inode_pages2_range fails.  We'll need to drop/get caps for
>    this.
> 3. All of the above.  I'm not sure we need to do both, but I guess we'll
>    need to at least fail the operation and return the error if
>    invalidate_inode_pages2_range fails.
>
> Please let me know what do you think.  And if you want me to send a
> patch to fix this or if you rather have me sending the whole thing
> again.
>

the direct write code calls both filemap_write_and_wait_range and
invalidate_inode_pages2_range. I think we should do the same here
(option 1).

Regards
Yan, Zheng


> Cheers,
> --
> Luis
Luis Henriques Oct. 23, 2018, 8:33 a.m. UTC | #4
"Yan, Zheng" <ukernel@gmail.com> writes:

> On Mon, Oct 22, 2018 at 11:25 PM Luis Henriques <lhenriques@suse.com> wrote:
>>
>> Luis Henriques <lhenriques@suse.com> writes:
>> <snip>
>> > +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
>> > +                                 struct file *dst_file, loff_t dst_off,
>> > +                                 size_t len, unsigned int flags)
>> > +{
>> > +     struct inode *src_inode = file_inode(src_file);
>> > +     struct inode *dst_inode = file_inode(dst_file);
>> > +     struct ceph_inode_info *src_ci = ceph_inode(src_inode);
>> > +     struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
>> > +     struct ceph_cap_flush *prealloc_cf;
>> > +     struct ceph_object_locator src_oloc, dst_oloc;
>> > +     struct ceph_object_id src_oid, dst_oid;
>> > +     loff_t endoff = 0, size;
>> > +     ssize_t ret = -EIO;
>> > +     u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
>> > +     u32 src_objlen, dst_objlen, object_size;
>> > +     int src_got = 0, dst_got = 0, err, dirty;
>> > +     bool do_final_copy = false;
>> > +
>> > +     if (src_inode == dst_inode)
>> > +             return -EINVAL;
>> > +     if (ceph_snap(dst_inode) != CEPH_NOSNAP)
>> > +             return -EROFS;
>> > +
>> > +     /*
>> > +      * Some of the checks below will return -EOPNOTSUPP, which will force a
>> > +      * fallback to the default VFS copy_file_range implementation.  This is
>> > +      * desirable in several cases (for ex, the 'len' is smaller than the
>> > +      * size of the objects, or in cases where that would be more
>> > +      * efficient).
>> > +      */
>> > +
>> > +     if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
>> > +         (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
>> > +         (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
>> > +             return -EOPNOTSUPP;
>> > +
>> > +     if (len < src_ci->i_layout.object_size)
>> > +             return -EOPNOTSUPP; /* no remote copy will be done */
>> > +
>> > +     prealloc_cf = ceph_alloc_cap_flush();
>> > +     if (!prealloc_cf)
>> > +             return -ENOMEM;
>> > +
>> > +     /* Start by sync'ing the source file */
>> > +     ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
>> > +     if (ret < 0)
>> > +             goto out;
>> > +
>> > +     /*
>> > +      * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
>> > +      * clients may have dirty data in their caches.  And OSDs know nothing
>> > +      * about caps, so they can't safely do the remote object copies.
>> > +      */
>> > +     err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
>> > +                          dst_ci, (dst_off + len), &dst_got);
>> > +     if (err < 0) {
>> > +             dout("get_rd_wr_caps returned %d\n", err);
>> > +             ret = -EOPNOTSUPP;
>> > +             goto out;
>> > +     }
>> > +
>> > +     ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
>> > +     if (ret < 0)
>> > +             goto out_caps;
>> > +
>> > +     size = i_size_read(dst_inode);
>> > +     endoff = dst_off + len;
>> > +
>> > +     /* Drop dst file cached pages */
>> > +     ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
>> > +                                         dst_off >> PAGE_SHIFT,
>> > +                                         endoff >> PAGE_SHIFT);
>> > +     if (ret < 0) {
>> > +             dout("Failed to invalidate inode pages (%ld)\n", ret);
>> > +             ret = 0; /* XXX */
>> > +     }
>>
>> I believe I found an issue with this code: if we try to copy into a file
>> that was just written, the remote copied data will be overwritten by our
>> buffered writes once they are flushed.  When this happens we can see the
>> above message in the kernel (with a -EBUSY error).
>>
>> Here's the 3 options I see to fixes this:
>>
>> 1. Add another call to file_write_and_wait_range(), with the dst_file.
>>    I believe this souldn't introduce a huge overhead for the cases where
>>    the file isn't buffered.
>> 2. Add the call to file_write_and_wait_range() only if
>>    invalidate_inode_pages2_range fails.  We'll need to drop/get caps for
>>    this.
>> 3. All of the above.  I'm not sure we need to do both, but I guess we'll
>>    need to at least fail the operation and return the error if
>>    invalidate_inode_pages2_range fails.
>>
>> Please let me know what do you think.  And if you want me to send a
>> patch to fix this or if you rather have me sending the whole thing
>> again.
>>
>
> the direct write code calls both filemap_write_and_wait_range and
> invalidate_inode_pages2_range. I think we should do the same here
> (option 1).

Ok, great.  I'll send out a fix for that in a bit.

Cheers,
diff mbox series

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 92ab20433682..0942f45995a1 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1,5 +1,6 @@ 
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -1829,6 +1830,297 @@  static long ceph_fallocate(struct file *file, int mode,
 	return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+			  loff_t src_endoff, int *src_got,
+			  struct ceph_inode_info *dst_ci,
+			  loff_t dst_endoff, int *dst_got)
+{
+	int ret = 0;
+	bool retrying = false;
+
+retry_caps:
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    dst_endoff, dst_got, NULL);
+	if (ret < 0)
+		return ret;
+
+	/*
+	 * Since we're already holding the FILE_WR capability for the dst file,
+	 * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+	 * retry dance instead to try to get both capabilities.
+	 */
+	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, src_got);
+	if (ret <= 0) {
+		/* Start by dropping dst_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(dst_ci, *dst_got);
+		if (retrying) {
+			if (!ret)
+				/* ceph_try_get_caps masks EAGAIN */
+				ret = -EAGAIN;
+			return ret;
+		}
+		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED, src_endoff,
+				    src_got, NULL);
+		if (ret < 0)
+			return ret;
+		/*... drop src_ci caps too, and retry */
+		ceph_put_cap_refs(src_ci, *src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+	return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+			   struct ceph_inode_info *dst_ci, int dst_got)
+{
+	ceph_put_cap_refs(src_ci, src_got);
+	ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+			   loff_t src_off, loff_t dst_off, size_t len)
+{
+	loff_t size, endoff;
+
+	size = i_size_read(src_inode);
+	/*
+	 * Don't copy beyond source file EOF.  Instead of simply setting length
+	 * to (size - src_off), just drop to VFS default implementation, as the
+	 * local i_size may be stale due to other clients writing to the source
+	 * inode.
+	 */
+	if (src_off + len > size) {
+		dout("Copy beyond EOF (%llu + %ld > %llu)\n",
+		     src_off, len, size);
+		return -EOPNOTSUPP;
+	}
+	size = i_size_read(dst_inode);
+
+	endoff = dst_off + len;
+	if (inode_newsize_ok(dst_inode, endoff))
+		return -EOPNOTSUPP;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+		return -EDQUOT;
+
+	return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	struct ceph_object_id src_oid, dst_oid;
+	loff_t endoff = 0, size;
+	ssize_t ret = -EIO;
+	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+	u32 src_objlen, dst_objlen, object_size;
+	int src_got = 0, dst_got = 0, err, dirty;
+	bool do_final_copy = false;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	/*
+	 * Some of the checks below will return -EOPNOTSUPP, which will force a
+	 * fallback to the default VFS copy_file_range implementation.  This is
+	 * desirable in several cases (for ex, the 'len' is smaller than the
+	 * size of the objects, or in cases where that would be more
+	 * efficient).
+	 */
+
+	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+	    (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+		return -EOPNOTSUPP;
+
+	if (len < src_ci->i_layout.object_size)
+		return -EOPNOTSUPP; /* no remote copy will be done */
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;
+
+	/*
+	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+	 * clients may have dirty data in their caches.  And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 */
+	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+			     dst_ci, (dst_off + len), &dst_got);
+	if (err < 0) {
+		dout("get_rd_wr_caps returned %d\n", err);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+	if (ret < 0)
+		goto out_caps;
+
+	size = i_size_read(dst_inode);
+	endoff = dst_off + len;
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0) {
+		dout("Failed to invalidate inode pages (%ld)\n", ret);
+		ret = 0; /* XXX */
+	}
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+	ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+				      src_ci->i_layout.object_size,
+				      &src_objnum, &src_objoff, &src_objlen);
+	ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+				      dst_ci->i_layout.object_size,
+				      &dst_objnum, &dst_objoff, &dst_objlen);
+	/* object-level offsets need to the same */
+	if (src_objoff != dst_objoff) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+
+	/*
+	 * Do a manual copy if the object offset isn't object aligned.
+	 * 'src_objlen' contains the bytes left until the end of the object,
+	 * starting at the src_off
+	 */
+	if (src_objoff) {
+		/*
+		 * we need to temporarily drop all caps as we'll be calling
+		 * {read,write}_iter, which will get caps again.
+		 */
+		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+		ret = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, src_objlen, flags);
+		if (ret < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= ret;
+		err = get_rd_wr_caps(src_ci, (src_off + len),
+				     &src_got, dst_ci,
+				     (dst_off + len), &dst_got);
+		if (err < 0)
+			goto out;
+		err = is_file_size_ok(src_inode, dst_inode,
+				      src_off, dst_off, len);
+		if (err < 0)
+			goto out_caps;
+	}
+	object_size = src_ci->i_layout.object_size;
+	while (len >= object_size) {
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      object_size, &src_objnum,
+					      &src_objoff, &src_objlen);
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      object_size, &dst_objnum,
+					      &dst_objoff, &dst_objlen);
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		/* Do an object remote copy */
+		err = ceph_osdc_copy_from(
+			&ceph_inode_to_client(src_inode)->client->osdc,
+			src_ci->i_vino.snap, 0,
+			&src_oid, &src_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+			&dst_oid, &dst_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+		if (err) {
+			dout("ceph_osdc_copy_from returned %d\n", err);
+			if (!ret)
+				ret = err;
+			goto out_caps;
+		}
+		len -= object_size;
+		src_off += object_size;
+		dst_off += object_size;
+		ret += object_size;
+	}
+
+	if (len)
+		/* We still need one final local copy */
+		do_final_copy = true;
+
+	file_update_time(dst_file);
+	if (endoff > size) {
+		int caps_flags = 0;
+
+		/* Let the MDS know about dst file size change */
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		if (ceph_inode_set_size(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_AUTHONLY;
+		if (caps_flags)
+			ceph_check_caps(dst_ci, caps_flags, NULL);
+	}
+	/* Mark Fw dirty */
+	spin_lock(&dst_ci->i_ceph_lock);
+	dst_ci->i_inline_version = CEPH_INLINE_NONE;
+	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+	spin_unlock(&dst_ci->i_ceph_lock);
+	if (dirty)
+		__mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+	put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+	if (do_final_copy) {
+		err = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, len, flags);
+		if (err < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= err;
+		ret += err;
+	}
+
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1844,5 +2136,5 @@  const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };
-