[v2,3/5] ceph: fix potential races in ceph_uninline_data
diff mbox series

Message ID 20190711184136.19779-4-jlayton@kernel.org
State New
Headers show
Series
  • ceph: fix races when uninlining data
Related show

Commit Message

Jeff Layton July 11, 2019, 6:41 p.m. UTC
The current code will do the uninlining but it relies on the callers to
set the i_inline_version appropriately afterward. Multiple tasks can end
up attempting to uninline the data, and overwrite changes that follow
the first uninlining.

Protect against competing uninlining attempts by having the callers take
the i_truncate_mutex and then have ceph_uninline_data update the version
itself before dropping it. This means that we also need to have
ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if
any of them are newly dirty.

We can't mark the caps dirty though unless we actually have them, so
move the uninlining after the point where Fw caps are acquired in all of
the callers.

Finally, since we are doing a lockless check first in all cases, just
move that into ceph_uninline_data as well, and have the callers call it
unconditionally.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++----------------
 fs/ceph/file.c |  39 +++++++---------
 2 files changed, 97 insertions(+), 61 deletions(-)

Comments

Yan, Zheng July 12, 2019, 2:44 a.m. UTC | #1
On Fri, Jul 12, 2019 at 3:17 AM Jeff Layton <jlayton@kernel.org> wrote:
>
> The current code will do the uninlining but it relies on the callers to
> set the i_inline_version appropriately afterward. Multiple tasks can end
> up attempting to uninline the data, and overwrite changes that follow
> the first uninlining.
>
> Protect against competing uninlining attempts by having the callers take
> the i_truncate_mutex and then have ceph_uninline_data update the version
> itself before dropping it. This means that we also need to have
> ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if
> any of them are newly dirty.
>
> We can't mark the caps dirty though unless we actually have them, so
> move the uninlining after the point where Fw caps are acquired in all of
> the callers.
>
> Finally, since we are doing a lockless check first in all cases, just
> move that into ceph_uninline_data as well, and have the callers call it
> unconditionally.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++----------------
>  fs/ceph/file.c |  39 +++++++---------
>  2 files changed, 97 insertions(+), 61 deletions(-)
>
> diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> index 038678963cf9..4606da82da6f 100644
> --- a/fs/ceph/addr.c
> +++ b/fs/ceph/addr.c
> @@ -1531,7 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
>         loff_t off = page_offset(page);
>         loff_t size = i_size_read(inode);
>         size_t len;
> -       int want, got, err;
> +       int want, got, err, dirty;
>         sigset_t oldset;
>         vm_fault_t ret = VM_FAULT_SIGBUS;
>
> @@ -1541,12 +1541,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
>
>         ceph_block_sigs(&oldset);
>
> -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> -               err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> -               if (err < 0)
> -                       goto out_free;
> -       }
> -
>         if (off + PAGE_SIZE <= size)
>                 len = PAGE_SIZE;
>         else
> @@ -1565,6 +1559,11 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
>         if (err < 0)
>                 goto out_free;
>
> +       err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> +       if (err < 0)
> +               goto out_put_caps;
> +       dirty = err;
> +
>         dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
>              inode, off, len, ceph_cap_string(got));
>
> @@ -1591,11 +1590,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
>
>         if (ret == VM_FAULT_LOCKED ||
>             ci->i_inline_version != CEPH_INLINE_NONE) {
> -               int dirty;
>                 spin_lock(&ci->i_ceph_lock);
> -               ci->i_inline_version = CEPH_INLINE_NONE;
> -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> -                                              &prealloc_cf);
> +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> +                                               &prealloc_cf);
>                 spin_unlock(&ci->i_ceph_lock);
>                 if (dirty)
>                         __mark_inode_dirty(inode, dirty);
> @@ -1603,6 +1600,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
>
>         dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
>              inode, off, len, ceph_cap_string(got), ret);
> +out_put_caps:
>         ceph_put_cap_refs(ci, got);
>  out_free:
>         ceph_restore_sigs(&oldset);
> @@ -1656,27 +1654,60 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
>         }
>  }
>
> +/**
> + * ceph_uninline_data - convert an inlined file to uninlined
> + * @inode: inode to be uninlined
> + * @page: optional pointer to first page in file
> + *
> + * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex
> + * to serialize callers and prevent races. Returns either a negative error code
> + * or a positive set of I_DIRTY_* flags that the caller should apply when
> + * dirtying the inode.
> + */
>  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>  {
>         struct ceph_inode_info *ci = ceph_inode(inode);
>         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>         struct ceph_osd_request *req;
> +       struct ceph_cap_flush *prealloc_cf = NULL;
>         struct page *page = NULL;
>         u64 len, inline_version;
> -       int err = 0;
> +       int ret = 0;
>         bool from_pagecache = false;
>         bool allocated_page = false;
>
> +       /* Do a lockless check first -- paired with i_ceph_lock for changes */
> +       inline_version = READ_ONCE(ci->i_inline_version);
> +       if (likely(inline_version == CEPH_INLINE_NONE))
> +               return 0;
> +
> +       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> +            inode, ceph_vinop(inode), inline_version);
> +
> +       mutex_lock(&ci->i_truncate_mutex);
> +
> +       /* Double check the version after taking mutex */
>         spin_lock(&ci->i_ceph_lock);
>         inline_version = ci->i_inline_version;
>         spin_unlock(&ci->i_ceph_lock);
>
> -       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> -            inode, ceph_vinop(inode), inline_version);
> +       /* If someone beat us to the uninlining then just return. */
> +       if (inline_version == CEPH_INLINE_NONE)
> +               goto out;
>
> -       if (inline_version == 1 || /* initial version, no data */
I'd like to avoid this optimization. check i_size instead.

> -           inline_version == CEPH_INLINE_NONE)
> +       prealloc_cf = ceph_alloc_cap_flush();
> +       if (!prealloc_cf) {
> +               ret = -ENOMEM;
>                 goto out;
> +       }
> +
> +       /*
> +        * Handle the initial version (1) as a a special case: switch the
> +        * version to CEPH_INLINE_NONE, but we don't need to do any uninlining
> +        * in that case since there is no data yet.
> +        */
> +       if (inline_version == 1)
> +               goto out_set_vers;

ditto

>
>         if (provided_page) {
>                 page = provided_page;
> @@ -1703,20 +1734,20 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>         } else {
>                 page = __page_cache_alloc(GFP_NOFS);
>                 if (!page) {
> -                       err = -ENOMEM;
> +                       ret = -ENOMEM;
>                         goto out;
>                 }
>                 allocated_page = true;
>                 lock_page(page);
> -               err = __ceph_do_getattr(inode, page,
> +               ret = __ceph_do_getattr(inode, page,
>                                         CEPH_STAT_CAP_INLINE_DATA, true);
> -               if (err < 0) {
> +               if (ret < 0) {
>                         /* no inline data */
> -                       if (err == -ENODATA)
> -                               err = 0;
> +                       if (ret == -ENODATA)
> +                               ret = 0;
>                         goto out;
>                 }
> -               len = err;
> +               len = ret;
>         }
>
>         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> @@ -1724,16 +1755,16 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>                                     CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
>                                     NULL, 0, 0, false);
>         if (IS_ERR(req)) {
> -               err = PTR_ERR(req);
> +               ret = PTR_ERR(req);
>                 goto out;
>         }
>
>         req->r_mtime = inode->i_mtime;
> -       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> -       if (!err)
> -               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +       if (!ret)
> +               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>         ceph_osdc_put_request(req);
> -       if (err < 0)
> +       if (ret < 0)
>                 goto out;
>
>         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> @@ -1742,7 +1773,7 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>                                     NULL, ci->i_truncate_seq,
>                                     ci->i_truncate_size, false);
>         if (IS_ERR(req)) {
> -               err = PTR_ERR(req);
> +               ret = PTR_ERR(req);
>                 goto out;
>         }
>
> @@ -1750,12 +1781,12 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>
>         {
>                 __le64 xattr_buf = cpu_to_le64(inline_version);
> -               err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> +               ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
>                                             "inline_version", &xattr_buf,
>                                             sizeof(xattr_buf),
>                                             CEPH_OSD_CMPXATTR_OP_GT,
>                                             CEPH_OSD_CMPXATTR_MODE_U64);
> -               if (err)
> +               if (ret)
>                         goto out_put;
>         }
>
> @@ -1763,22 +1794,31 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>                 char xattr_buf[32];
>                 int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
>                                          "%llu", inline_version);
> -               err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> +               ret = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
>                                             "inline_version",
>                                             xattr_buf, xattr_len, 0, 0);
> -               if (err)
> +               if (ret)
>                         goto out_put;
>         }
>
>         req->r_mtime = inode->i_mtime;
> -       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> -       if (!err)
> -               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +       if (!ret)
> +               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>  out_put:
>         ceph_osdc_put_request(req);
> -       if (err == -ECANCELED)
> -               err = 0;
> +       if (ret == -ECANCELED)
> +               ret = 0;
> +out_set_vers:
> +       if (!ret) {
> +               spin_lock(&ci->i_ceph_lock);
> +               ci->i_inline_version = CEPH_INLINE_NONE;
> +               ret = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> +                                            &prealloc_cf);
> +               spin_unlock(&ci->i_ceph_lock);
> +       }
>  out:
> +       mutex_unlock(&ci->i_truncate_mutex);
>         if (page) {
>                 unlock_page(page);
>                 if (from_pagecache)
> @@ -1786,10 +1826,11 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
>                 else if (allocated_page)
>                         __free_pages(page, 0);
>         }
> +       ceph_free_cap_flush(prealloc_cf);
>
>         dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
> -            inode, ceph_vinop(inode), inline_version, err);
> -       return err;
> +            inode, ceph_vinop(inode), inline_version, ret);
> +       return ret;
>  }
>
>  static const struct vm_operations_struct ceph_vmops = {
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 7bb090fa99d3..252aac44b8ce 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -1387,7 +1387,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
>         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>         struct ceph_cap_flush *prealloc_cf;
>         ssize_t count, written = 0;
> -       int err, want, got;
> +       int err, want, got, dirty;
>         loff_t pos;
>         loff_t limit = max(i_size_read(inode), fsc->max_file_size);
>
> @@ -1438,12 +1438,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
>
>         inode_inc_iversion_raw(inode);
>
> -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> -               err = ceph_uninline_data(inode, NULL);
> -               if (err < 0)
> -                       goto out;
> -       }
> -
>         /* FIXME: not complete since it doesn't account for being at quota */
>         if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) {
>                 err = -ENOSPC;
> @@ -1462,6 +1456,11 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
>         if (err < 0)
>                 goto out;
>
> +       err = ceph_uninline_data(inode, NULL);
> +       if (err < 0)
> +               goto out_put_caps;
> +       dirty = err;
> +
>         dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
>              inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
>
> @@ -1510,12 +1509,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
>         }
>
>         if (written >= 0) {
> -               int dirty;
> -
>                 spin_lock(&ci->i_ceph_lock);
> -               ci->i_inline_version = CEPH_INLINE_NONE;
> -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> -                                              &prealloc_cf);
> +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> +                                               &prealloc_cf);
>                 spin_unlock(&ci->i_ceph_lock);
>                 if (dirty)
>                         __mark_inode_dirty(inode, dirty);
> @@ -1526,6 +1522,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
>         dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
>              inode, ceph_vinop(inode), pos, (unsigned)count,
>              ceph_cap_string(got));
> +out_put_caps:
>         ceph_put_cap_refs(ci, got);
>
>         if (written == -EOLDSNAPC) {
> @@ -1762,12 +1759,6 @@ static long ceph_fallocate(struct file *file, int mode,
>                 goto unlock;
>         }
>
> -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> -               ret = ceph_uninline_data(inode, NULL);
> -               if (ret < 0)
> -                       goto unlock;
> -       }
> -
>         size = i_size_read(inode);
>
>         /* Are we punching a hole beyond EOF? */
> @@ -1785,19 +1776,23 @@ static long ceph_fallocate(struct file *file, int mode,
>         if (ret < 0)
>                 goto unlock;
>
> +       ret = ceph_uninline_data(inode, NULL);
> +       if (ret < 0)
> +               goto out_put_caps;
> +       dirty = ret;
> +
>         ceph_zero_pagecache_range(inode, offset, length);
>         ret = ceph_zero_objects(inode, offset, length);
>
>         if (!ret) {
>                 spin_lock(&ci->i_ceph_lock);
> -               ci->i_inline_version = CEPH_INLINE_NONE;
> -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> -                                              &prealloc_cf);
> +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> +                                               &prealloc_cf);
>                 spin_unlock(&ci->i_ceph_lock);
>                 if (dirty)
>                         __mark_inode_dirty(inode, dirty);
>         }
> -
> +out_put_caps:
>         ceph_put_cap_refs(ci, got);
>  unlock:
>         inode_unlock(inode);
> --
> 2.21.0
>
Jeff Layton July 12, 2019, 1:30 p.m. UTC | #2
On Fri, 2019-07-12 at 10:44 +0800, Yan, Zheng wrote:
> On Fri, Jul 12, 2019 at 3:17 AM Jeff Layton <jlayton@kernel.org> wrote:
> > The current code will do the uninlining but it relies on the callers to
> > set the i_inline_version appropriately afterward. Multiple tasks can end
> > up attempting to uninline the data, and overwrite changes that follow
> > the first uninlining.
> > 
> > Protect against competing uninlining attempts by having the callers take
> > the i_truncate_mutex and then have ceph_uninline_data update the version
> > itself before dropping it. This means that we also need to have
> > ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if
> > any of them are newly dirty.
> > 
> > We can't mark the caps dirty though unless we actually have them, so
> > move the uninlining after the point where Fw caps are acquired in all of
> > the callers.
> > 
> > Finally, since we are doing a lockless check first in all cases, just
> > move that into ceph_uninline_data as well, and have the callers call it
> > unconditionally.
> > 
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >  fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++----------------
> >  fs/ceph/file.c |  39 +++++++---------
> >  2 files changed, 97 insertions(+), 61 deletions(-)
> > 
> > diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> > index 038678963cf9..4606da82da6f 100644
> > --- a/fs/ceph/addr.c
> > +++ b/fs/ceph/addr.c
> > @@ -1531,7 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >         loff_t off = page_offset(page);
> >         loff_t size = i_size_read(inode);
> >         size_t len;
> > -       int want, got, err;
> > +       int want, got, err, dirty;
> >         sigset_t oldset;
> >         vm_fault_t ret = VM_FAULT_SIGBUS;
> > 
> > @@ -1541,12 +1541,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > 
> >         ceph_block_sigs(&oldset);
> > 
> > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > -               if (err < 0)
> > -                       goto out_free;
> > -       }
> > -
> >         if (off + PAGE_SIZE <= size)
> >                 len = PAGE_SIZE;
> >         else
> > @@ -1565,6 +1559,11 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >         if (err < 0)
> >                 goto out_free;
> > 
> > +       err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > +       if (err < 0)
> > +               goto out_put_caps;
> > +       dirty = err;
> > +
> >         dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
> >              inode, off, len, ceph_cap_string(got));
> > 
> > @@ -1591,11 +1590,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > 
> >         if (ret == VM_FAULT_LOCKED ||
> >             ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               int dirty;
> >                 spin_lock(&ci->i_ceph_lock);
> > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > -                                              &prealloc_cf);
> > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                               &prealloc_cf);
> >                 spin_unlock(&ci->i_ceph_lock);
> >                 if (dirty)
> >                         __mark_inode_dirty(inode, dirty);
> > @@ -1603,6 +1600,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > 
> >         dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
> >              inode, off, len, ceph_cap_string(got), ret);
> > +out_put_caps:
> >         ceph_put_cap_refs(ci, got);
> >  out_free:
> >         ceph_restore_sigs(&oldset);
> > @@ -1656,27 +1654,60 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
> >         }
> >  }
> > 
> > +/**
> > + * ceph_uninline_data - convert an inlined file to uninlined
> > + * @inode: inode to be uninlined
> > + * @page: optional pointer to first page in file
> > + *
> > + * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex
> > + * to serialize callers and prevent races. Returns either a negative error code
> > + * or a positive set of I_DIRTY_* flags that the caller should apply when
> > + * dirtying the inode.
> > + */
> >  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >  {
> >         struct ceph_inode_info *ci = ceph_inode(inode);
> >         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> >         struct ceph_osd_request *req;
> > +       struct ceph_cap_flush *prealloc_cf = NULL;
> >         struct page *page = NULL;
> >         u64 len, inline_version;
> > -       int err = 0;
> > +       int ret = 0;
> >         bool from_pagecache = false;
> >         bool allocated_page = false;
> > 
> > +       /* Do a lockless check first -- paired with i_ceph_lock for changes */
> > +       inline_version = READ_ONCE(ci->i_inline_version);
> > +       if (likely(inline_version == CEPH_INLINE_NONE))
> > +               return 0;
> > +
> > +       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > +            inode, ceph_vinop(inode), inline_version);
> > +
> > +       mutex_lock(&ci->i_truncate_mutex);
> > +
> > +       /* Double check the version after taking mutex */
> >         spin_lock(&ci->i_ceph_lock);
> >         inline_version = ci->i_inline_version;
> >         spin_unlock(&ci->i_ceph_lock);
> > 
> > -       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > -            inode, ceph_vinop(inode), inline_version);
> > +       /* If someone beat us to the uninlining then just return. */
> > +       if (inline_version == CEPH_INLINE_NONE)
> > +               goto out;
> > 
> > -       if (inline_version == 1 || /* initial version, no data */
> I'd like to avoid this optimization. check i_size instead.
> 

This one is being removed already...

> > -           inline_version == CEPH_INLINE_NONE)
> > +       prealloc_cf = ceph_alloc_cap_flush();
> > +       if (!prealloc_cf) {
> > +               ret = -ENOMEM;
> >                 goto out;
> > +       }
> > +
> > +       /*
> > +        * Handle the initial version (1) as a a special case: switch the
> > +        * version to CEPH_INLINE_NONE, but we don't need to do any uninlining
> > +        * in that case since there is no data yet.
> > +        */
> > +       if (inline_version == 1)
> > +               goto out_set_vers;
> 
> ditto
> 

Fixed this up in my tree, and removed the places that set the
i_inline_version to CEPH_INLINE_NONE in aio completion and cfr
codepaths. I went ahead and pushed the result to the ceph-client/testing 
branch.

Thanks to you and Luis for the review so far!

> >         if (provided_page) {
> >                 page = provided_page;
> > @@ -1703,20 +1734,20 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >         } else {
> >                 page = __page_cache_alloc(GFP_NOFS);
> >                 if (!page) {
> > -                       err = -ENOMEM;
> > +                       ret = -ENOMEM;
> >                         goto out;
> >                 }
> >                 allocated_page = true;
> >                 lock_page(page);
> > -               err = __ceph_do_getattr(inode, page,
> > +               ret = __ceph_do_getattr(inode, page,
> >                                         CEPH_STAT_CAP_INLINE_DATA, true);
> > -               if (err < 0) {
> > +               if (ret < 0) {
> >                         /* no inline data */
> > -                       if (err == -ENODATA)
> > -                               err = 0;
> > +                       if (ret == -ENODATA)
> > +                               ret = 0;
> >                         goto out;
> >                 }
> > -               len = err;
> > +               len = ret;
> >         }
> > 
> >         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> > @@ -1724,16 +1755,16 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                                     CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
> >                                     NULL, 0, 0, false);
> >         if (IS_ERR(req)) {
> > -               err = PTR_ERR(req);
> > +               ret = PTR_ERR(req);
> >                 goto out;
> >         }
> > 
> >         req->r_mtime = inode->i_mtime;
> > -       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > -       if (!err)
> > -               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > +       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > +       if (!ret)
> > +               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> >         ceph_osdc_put_request(req);
> > -       if (err < 0)
> > +       if (ret < 0)
> >                 goto out;
> > 
> >         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> > @@ -1742,7 +1773,7 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                                     NULL, ci->i_truncate_seq,
> >                                     ci->i_truncate_size, false);
> >         if (IS_ERR(req)) {
> > -               err = PTR_ERR(req);
> > +               ret = PTR_ERR(req);
> >                 goto out;
> >         }
> > 
> > @@ -1750,12 +1781,12 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > 
> >         {
> >                 __le64 xattr_buf = cpu_to_le64(inline_version);
> > -               err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> > +               ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> >                                             "inline_version", &xattr_buf,
> >                                             sizeof(xattr_buf),
> >                                             CEPH_OSD_CMPXATTR_OP_GT,
> >                                             CEPH_OSD_CMPXATTR_MODE_U64);
> > -               if (err)
> > +               if (ret)
> >                         goto out_put;
> >         }
> > 
> > @@ -1763,22 +1794,31 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                 char xattr_buf[32];
> >                 int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
> >                                          "%llu", inline_version);
> > -               err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> > +               ret = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> >                                             "inline_version",
> >                                             xattr_buf, xattr_len, 0, 0);
> > -               if (err)
> > +               if (ret)
> >                         goto out_put;
> >         }
> > 
> >         req->r_mtime = inode->i_mtime;
> > -       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > -       if (!err)
> > -               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > +       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > +       if (!ret)
> > +               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> >  out_put:
> >         ceph_osdc_put_request(req);
> > -       if (err == -ECANCELED)
> > -               err = 0;
> > +       if (ret == -ECANCELED)
> > +               ret = 0;
> > +out_set_vers:
> > +       if (!ret) {
> > +               spin_lock(&ci->i_ceph_lock);
> > +               ci->i_inline_version = CEPH_INLINE_NONE;
> > +               ret = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                            &prealloc_cf);
> > +               spin_unlock(&ci->i_ceph_lock);
> > +       }
> >  out:
> > +       mutex_unlock(&ci->i_truncate_mutex);
> >         if (page) {
> >                 unlock_page(page);
> >                 if (from_pagecache)
> > @@ -1786,10 +1826,11 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                 else if (allocated_page)
> >                         __free_pages(page, 0);
> >         }
> > +       ceph_free_cap_flush(prealloc_cf);
> > 
> >         dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
> > -            inode, ceph_vinop(inode), inline_version, err);
> > -       return err;
> > +            inode, ceph_vinop(inode), inline_version, ret);
> > +       return ret;
> >  }
> > 
> >  static const struct vm_operations_struct ceph_vmops = {
> > diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> > index 7bb090fa99d3..252aac44b8ce 100644
> > --- a/fs/ceph/file.c
> > +++ b/fs/ceph/file.c
> > @@ -1387,7 +1387,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> >         struct ceph_cap_flush *prealloc_cf;
> >         ssize_t count, written = 0;
> > -       int err, want, got;
> > +       int err, want, got, dirty;
> >         loff_t pos;
> >         loff_t limit = max(i_size_read(inode), fsc->max_file_size);
> > 
> > @@ -1438,12 +1438,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> > 
> >         inode_inc_iversion_raw(inode);
> > 
> > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               err = ceph_uninline_data(inode, NULL);
> > -               if (err < 0)
> > -                       goto out;
> > -       }
> > -
> >         /* FIXME: not complete since it doesn't account for being at quota */
> >         if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) {
> >                 err = -ENOSPC;
> > @@ -1462,6 +1456,11 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         if (err < 0)
> >                 goto out;
> > 
> > +       err = ceph_uninline_data(inode, NULL);
> > +       if (err < 0)
> > +               goto out_put_caps;
> > +       dirty = err;
> > +
> >         dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
> >              inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
> > 
> > @@ -1510,12 +1509,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         }
> > 
> >         if (written >= 0) {
> > -               int dirty;
> > -
> >                 spin_lock(&ci->i_ceph_lock);
> > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > -                                              &prealloc_cf);
> > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                               &prealloc_cf);
> >                 spin_unlock(&ci->i_ceph_lock);
> >                 if (dirty)
> >                         __mark_inode_dirty(inode, dirty);
> > @@ -1526,6 +1522,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
> >              inode, ceph_vinop(inode), pos, (unsigned)count,
> >              ceph_cap_string(got));
> > +out_put_caps:
> >         ceph_put_cap_refs(ci, got);
> > 
> >         if (written == -EOLDSNAPC) {
> > @@ -1762,12 +1759,6 @@ static long ceph_fallocate(struct file *file, int mode,
> >                 goto unlock;
> >         }
> > 
> > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               ret = ceph_uninline_data(inode, NULL);
> > -               if (ret < 0)
> > -                       goto unlock;
> > -       }
> > -
> >         size = i_size_read(inode);
> > 
> >         /* Are we punching a hole beyond EOF? */
> > @@ -1785,19 +1776,23 @@ static long ceph_fallocate(struct file *file, int mode,
> >         if (ret < 0)
> >                 goto unlock;
> > 
> > +       ret = ceph_uninline_data(inode, NULL);
> > +       if (ret < 0)
> > +               goto out_put_caps;
> > +       dirty = ret;
> > +
> >         ceph_zero_pagecache_range(inode, offset, length);
> >         ret = ceph_zero_objects(inode, offset, length);
> > 
> >         if (!ret) {
> >                 spin_lock(&ci->i_ceph_lock);
> > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > -                                              &prealloc_cf);
> > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                               &prealloc_cf);
> >                 spin_unlock(&ci->i_ceph_lock);
> >                 if (dirty)
> >                         __mark_inode_dirty(inode, dirty);
> >         }
> > -
> > +out_put_caps:
> >         ceph_put_cap_refs(ci, got);
> >  unlock:
> >         inode_unlock(inode);
> > --
> > 2.21.0
> >
Jeff Layton July 12, 2019, 4:33 p.m. UTC | #3
On Fri, 2019-07-12 at 09:30 -0400, Jeff Layton wrote:
> On Fri, 2019-07-12 at 10:44 +0800, Yan, Zheng wrote:
> > On Fri, Jul 12, 2019 at 3:17 AM Jeff Layton <jlayton@kernel.org> wrote:
> > > The current code will do the uninlining but it relies on the callers to
> > > set the i_inline_version appropriately afterward. Multiple tasks can end
> > > up attempting to uninline the data, and overwrite changes that follow
> > > the first uninlining.
> > > 
> > > Protect against competing uninlining attempts by having the callers take
> > > the i_truncate_mutex and then have ceph_uninline_data update the version
> > > itself before dropping it. This means that we also need to have
> > > ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if
> > > any of them are newly dirty.
> > > 
> > > We can't mark the caps dirty though unless we actually have them, so
> > > move the uninlining after the point where Fw caps are acquired in all of
> > > the callers.
> > > 
> > > Finally, since we are doing a lockless check first in all cases, just
> > > move that into ceph_uninline_data as well, and have the callers call it
> > > unconditionally.
> > > 
> > > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > > ---
> > >  fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++----------------
> > >  fs/ceph/file.c |  39 +++++++---------
> > >  2 files changed, 97 insertions(+), 61 deletions(-)
> > > 
> > > diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> > > index 038678963cf9..4606da82da6f 100644
> > > --- a/fs/ceph/addr.c
> > > +++ b/fs/ceph/addr.c
> > > @@ -1531,7 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > >         loff_t off = page_offset(page);
> > >         loff_t size = i_size_read(inode);
> > >         size_t len;
> > > -       int want, got, err;
> > > +       int want, got, err, dirty;
> > >         sigset_t oldset;
> > >         vm_fault_t ret = VM_FAULT_SIGBUS;
> > > 
> > > @@ -1541,12 +1541,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > > 
> > >         ceph_block_sigs(&oldset);
> > > 
> > > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > > -               err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > > -               if (err < 0)
> > > -                       goto out_free;
> > > -       }
> > > -
> > >         if (off + PAGE_SIZE <= size)
> > >                 len = PAGE_SIZE;
> > >         else
> > > @@ -1565,6 +1559,11 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > >         if (err < 0)
> > >                 goto out_free;
> > > 
> > > +       err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > > +       if (err < 0)
> > > +               goto out_put_caps;
> > > +       dirty = err;
> > > +
> > >         dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
> > >              inode, off, len, ceph_cap_string(got));
> > > 
> > > @@ -1591,11 +1590,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > > 
> > >         if (ret == VM_FAULT_LOCKED ||
> > >             ci->i_inline_version != CEPH_INLINE_NONE) {
> > > -               int dirty;
> > >                 spin_lock(&ci->i_ceph_lock);
> > > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > > -                                              &prealloc_cf);
> > > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > > +                                               &prealloc_cf);
> > >                 spin_unlock(&ci->i_ceph_lock);
> > >                 if (dirty)
> > >                         __mark_inode_dirty(inode, dirty);
> > > @@ -1603,6 +1600,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > > 
> > >         dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
> > >              inode, off, len, ceph_cap_string(got), ret);
> > > +out_put_caps:
> > >         ceph_put_cap_refs(ci, got);
> > >  out_free:
> > >         ceph_restore_sigs(&oldset);
> > > @@ -1656,27 +1654,60 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
> > >         }
> > >  }
> > > 
> > > +/**
> > > + * ceph_uninline_data - convert an inlined file to uninlined
> > > + * @inode: inode to be uninlined
> > > + * @page: optional pointer to first page in file
> > > + *
> > > + * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex
> > > + * to serialize callers and prevent races. Returns either a negative error code
> > > + * or a positive set of I_DIRTY_* flags that the caller should apply when
> > > + * dirtying the inode.
> > > + */
> > >  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > >  {
> > >         struct ceph_inode_info *ci = ceph_inode(inode);
> > >         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> > >         struct ceph_osd_request *req;
> > > +       struct ceph_cap_flush *prealloc_cf = NULL;
> > >         struct page *page = NULL;
> > >         u64 len, inline_version;
> > > -       int err = 0;
> > > +       int ret = 0;
> > >         bool from_pagecache = false;
> > >         bool allocated_page = false;
> > > 
> > > +       /* Do a lockless check first -- paired with i_ceph_lock for changes */
> > > +       inline_version = READ_ONCE(ci->i_inline_version);
> > > +       if (likely(inline_version == CEPH_INLINE_NONE))
> > > +               return 0;
> > > +
> > > +       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > > +            inode, ceph_vinop(inode), inline_version);
> > > +
> > > +       mutex_lock(&ci->i_truncate_mutex);
> > > +
> > > +       /* Double check the version after taking mutex */
> > >         spin_lock(&ci->i_ceph_lock);
> > >         inline_version = ci->i_inline_version;
> > >         spin_unlock(&ci->i_ceph_lock);
> > > 
> > > -       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > > -            inode, ceph_vinop(inode), inline_version);
> > > +       /* If someone beat us to the uninlining then just return. */
> > > +       if (inline_version == CEPH_INLINE_NONE)
> > > +               goto out;
> > > 
> > > -       if (inline_version == 1 || /* initial version, no data */
> > I'd like to avoid this optimization. check i_size instead.
> > 
> 
> This one is being removed already...
> 
> > > -           inline_version == CEPH_INLINE_NONE)
> > > +       prealloc_cf = ceph_alloc_cap_flush();
> > > +       if (!prealloc_cf) {
> > > +               ret = -ENOMEM;
> > >                 goto out;
> > > +       }
> > > +
> > > +       /*
> > > +        * Handle the initial version (1) as a a special case: switch the
> > > +        * version to CEPH_INLINE_NONE, but we don't need to do any uninlining
> > > +        * in that case since there is no data yet.
> > > +        */
> > > +       if (inline_version == 1)
> > > +               goto out_set_vers;
> > 
> > ditto
> > 
> 
> Fixed this up in my tree, and removed the places that set the
> i_inline_version to CEPH_INLINE_NONE in aio completion and cfr
> codepaths. I went ahead and pushed the result to the ceph-client/testing 
> branch.
> 
> Thanks to you and Luis for the review so far!
> 

Dang...I was going to do that, but I hit a deadlock this morning in
testing on xfstest generic/198:

[  160.393788] kworker/11:1    D    0   177      2 0x80004000
[  160.394953] Workqueue: ceph-msgr ceph_con_workfn [libceph]
[  160.396105] Call Trace:
[  160.396849]  ? __schedule+0x29f/0x680
[  160.397762]  schedule+0x33/0x90
[  160.398576]  io_schedule+0x12/0x40
[  160.399436]  __lock_page+0x13a/0x230
[  160.400329]  ? file_fdatawait_range+0x20/0x20
[  160.401326]  pagecache_get_page+0x196/0x370
[  160.402293]  ceph_fill_inline_data+0x190/0x2d0 [ceph]
[  160.403381]  handle_cap_grant+0x465/0xcd0 [ceph]
[  160.404409]  ceph_handle_caps+0xb1f/0x1900 [ceph]
[  160.405442]  dispatch+0x5a6/0x1220 [ceph]
[  160.406387]  ceph_con_workfn+0xd48/0x27e0 [libceph]
[  160.407432]  ? __switch_to_asm+0x40/0x70
[  160.408349]  ? __switch_to_asm+0x34/0x70
[  160.409264]  ? __switch_to_asm+0x40/0x70
[  160.410162]  ? __switch_to_asm+0x34/0x70
[  160.411048]  ? __switch_to_asm+0x40/0x70
[  160.411967]  ? __switch_to_asm+0x40/0x70
[  160.412850]  ? __switch_to+0x152/0x440
[  160.413697]  ? __switch_to_asm+0x34/0x70
[  160.414550]  process_one_work+0x19d/0x380
[  160.415415]  worker_thread+0x50/0x3b0
[  160.416236]  kthread+0xfb/0x130
[  160.416981]  ? process_one_work+0x380/0x380
[  160.417884]  ? kthread_park+0x90/0x90
[  160.418708]  ret_from_fork+0x22/0x40

[  160.435539] aiodio_sparse2  D    0  1615   1474 0x00004000
[  160.436575] Call Trace:
[  160.437226]  ? __schedule+0x29f/0x680
[  160.438020]  schedule+0x33/0x90
[  160.438751]  schedule_preempt_disabled+0xa/0x10
[  160.439652]  __mutex_lock.isra.0+0x25a/0x4b0
[  160.440533]  ceph_check_caps+0x66a/0xa80 [ceph]
[  160.441450]  ? __mark_inode_dirty+0xd2/0x370
[  160.442336]  ceph_put_cap_refs+0x1f5/0x350 [ceph]
[  160.443285]  ? __ceph_mark_dirty_caps+0x14e/0x270 [ceph]
[  160.444312]  ceph_page_mkwrite+0x1d2/0x360 [ceph]
[  160.445252]  do_page_mkwrite+0x30/0xc0
[  160.446067]  __handle_mm_fault+0x1020/0x1ac0
[  160.446941]  handle_mm_fault+0xc4/0x1f0
[  160.447777]  do_user_addr_fault+0x1f6/0x450
[  160.448638]  do_page_fault+0x33/0x120
[  160.449445]  ? async_page_fault+0x8/0x30
[  160.450298]  async_page_fault+0x1e/0x30

I think page_mkwrite has locked the page, and is trying to lock the
s_mutex, but handle_cap_grant already holds that mutex and is trying to
lock the page. So, an ABBA deadlock...

The interesting bit is that this does not seem to involve any of the
code that I touched. I think the changes I've made may be exposing this
somehow, possibly changing how caps are being handed out or something.

I'm still looking how best to fix it.

Patch
diff mbox series

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 038678963cf9..4606da82da6f 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1531,7 +1531,7 @@  static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 	loff_t off = page_offset(page);
 	loff_t size = i_size_read(inode);
 	size_t len;
-	int want, got, err;
+	int want, got, err, dirty;
 	sigset_t oldset;
 	vm_fault_t ret = VM_FAULT_SIGBUS;
 
@@ -1541,12 +1541,6 @@  static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 
 	ceph_block_sigs(&oldset);
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		err = ceph_uninline_data(inode, off == 0 ? page : NULL);
-		if (err < 0)
-			goto out_free;
-	}
-
 	if (off + PAGE_SIZE <= size)
 		len = PAGE_SIZE;
 	else
@@ -1565,6 +1559,11 @@  static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 	if (err < 0)
 		goto out_free;
 
+	err = ceph_uninline_data(inode, off == 0 ? page : NULL);
+	if (err < 0)
+		goto out_put_caps;
+	dirty = err;
+
 	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
 	     inode, off, len, ceph_cap_string(got));
 
@@ -1591,11 +1590,9 @@  static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 
 	if (ret == VM_FAULT_LOCKED ||
 	    ci->i_inline_version != CEPH_INLINE_NONE) {
-		int dirty;
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
-					       &prealloc_cf);
+		dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					        &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1603,6 +1600,7 @@  static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 
 	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
 	     inode, off, len, ceph_cap_string(got), ret);
+out_put_caps:
 	ceph_put_cap_refs(ci, got);
 out_free:
 	ceph_restore_sigs(&oldset);
@@ -1656,27 +1654,60 @@  void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
 	}
 }
 
+/**
+ * ceph_uninline_data - convert an inlined file to uninlined
+ * @inode: inode to be uninlined
+ * @page: optional pointer to first page in file
+ *
+ * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex
+ * to serialize callers and prevent races. Returns either a negative error code
+ * or a positive set of I_DIRTY_* flags that the caller should apply when
+ * dirtying the inode.
+ */
 int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_osd_request *req;
+	struct ceph_cap_flush *prealloc_cf = NULL;
 	struct page *page = NULL;
 	u64 len, inline_version;
-	int err = 0;
+	int ret = 0;
 	bool from_pagecache = false;
 	bool allocated_page = false;
 
+	/* Do a lockless check first -- paired with i_ceph_lock for changes */
+	inline_version = READ_ONCE(ci->i_inline_version);
+	if (likely(inline_version == CEPH_INLINE_NONE))
+		return 0;
+
+	dout("uninline_data %p %llx.%llx inline_version %llu\n",
+	     inode, ceph_vinop(inode), inline_version);
+
+	mutex_lock(&ci->i_truncate_mutex);
+
+	/* Double check the version after taking mutex */
 	spin_lock(&ci->i_ceph_lock);
 	inline_version = ci->i_inline_version;
 	spin_unlock(&ci->i_ceph_lock);
 
-	dout("uninline_data %p %llx.%llx inline_version %llu\n",
-	     inode, ceph_vinop(inode), inline_version);
+	/* If someone beat us to the uninlining then just return. */
+	if (inline_version == CEPH_INLINE_NONE)
+		goto out;
 
-	if (inline_version == 1 || /* initial version, no data */
-	    inline_version == CEPH_INLINE_NONE)
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf) {
+		ret = -ENOMEM;
 		goto out;
+	}
+
+	/*
+	 * Handle the initial version (1) as a a special case: switch the
+	 * version to CEPH_INLINE_NONE, but we don't need to do any uninlining
+	 * in that case since there is no data yet.
+	 */
+	if (inline_version == 1)
+		goto out_set_vers;
 
 	if (provided_page) {
 		page = provided_page;
@@ -1703,20 +1734,20 @@  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 	} else {
 		page = __page_cache_alloc(GFP_NOFS);
 		if (!page) {
-			err = -ENOMEM;
+			ret = -ENOMEM;
 			goto out;
 		}
 		allocated_page = true;
 		lock_page(page);
-		err = __ceph_do_getattr(inode, page,
+		ret = __ceph_do_getattr(inode, page,
 					CEPH_STAT_CAP_INLINE_DATA, true);
-		if (err < 0) {
+		if (ret < 0) {
 			/* no inline data */
-			if (err == -ENODATA)
-				err = 0;
+			if (ret == -ENODATA)
+				ret = 0;
 			goto out;
 		}
-		len = err;
+		len = ret;
 	}
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
@@ -1724,16 +1755,16 @@  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
 				    NULL, 0, 0, false);
 	if (IS_ERR(req)) {
-		err = PTR_ERR(req);
+		ret = PTR_ERR(req);
 		goto out;
 	}
 
 	req->r_mtime = inode->i_mtime;
-	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-	if (!err)
-		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+	if (!ret)
+		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 	ceph_osdc_put_request(req);
-	if (err < 0)
+	if (ret < 0)
 		goto out;
 
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
@@ -1742,7 +1773,7 @@  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 				    NULL, ci->i_truncate_seq,
 				    ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
-		err = PTR_ERR(req);
+		ret = PTR_ERR(req);
 		goto out;
 	}
 
@@ -1750,12 +1781,12 @@  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 
 	{
 		__le64 xattr_buf = cpu_to_le64(inline_version);
-		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
+		ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
 					    "inline_version", &xattr_buf,
 					    sizeof(xattr_buf),
 					    CEPH_OSD_CMPXATTR_OP_GT,
 					    CEPH_OSD_CMPXATTR_MODE_U64);
-		if (err)
+		if (ret)
 			goto out_put;
 	}
 
@@ -1763,22 +1794,31 @@  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 		char xattr_buf[32];
 		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
 					 "%llu", inline_version);
-		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
+		ret = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
 					    "inline_version",
 					    xattr_buf, xattr_len, 0, 0);
-		if (err)
+		if (ret)
 			goto out_put;
 	}
 
 	req->r_mtime = inode->i_mtime;
-	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-	if (!err)
-		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+	if (!ret)
+		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 out_put:
 	ceph_osdc_put_request(req);
-	if (err == -ECANCELED)
-		err = 0;
+	if (ret == -ECANCELED)
+		ret = 0;
+out_set_vers:
+	if (!ret) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
+		ret = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					     &prealloc_cf);
+		spin_unlock(&ci->i_ceph_lock);
+	}
 out:
+	mutex_unlock(&ci->i_truncate_mutex);
 	if (page) {
 		unlock_page(page);
 		if (from_pagecache)
@@ -1786,10 +1826,11 @@  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
 		else if (allocated_page)
 			__free_pages(page, 0);
 	}
+	ceph_free_cap_flush(prealloc_cf);
 
 	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
-	     inode, ceph_vinop(inode), inline_version, err);
-	return err;
+	     inode, ceph_vinop(inode), inline_version, ret);
+	return ret;
 }
 
 static const struct vm_operations_struct ceph_vmops = {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7bb090fa99d3..252aac44b8ce 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1387,7 +1387,7 @@  static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_cap_flush *prealloc_cf;
 	ssize_t count, written = 0;
-	int err, want, got;
+	int err, want, got, dirty;
 	loff_t pos;
 	loff_t limit = max(i_size_read(inode), fsc->max_file_size);
 
@@ -1438,12 +1438,6 @@  static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 
 	inode_inc_iversion_raw(inode);
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		err = ceph_uninline_data(inode, NULL);
-		if (err < 0)
-			goto out;
-	}
-
 	/* FIXME: not complete since it doesn't account for being at quota */
 	if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) {
 		err = -ENOSPC;
@@ -1462,6 +1456,11 @@  static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	if (err < 0)
 		goto out;
 
+	err = ceph_uninline_data(inode, NULL);
+	if (err < 0)
+		goto out_put_caps;
+	dirty = err;
+
 	dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
 	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 
@@ -1510,12 +1509,9 @@  static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	}
 
 	if (written >= 0) {
-		int dirty;
-
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
-					       &prealloc_cf);
+		dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					        &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1526,6 +1522,7 @@  static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
 	     inode, ceph_vinop(inode), pos, (unsigned)count,
 	     ceph_cap_string(got));
+out_put_caps:
 	ceph_put_cap_refs(ci, got);
 
 	if (written == -EOLDSNAPC) {
@@ -1762,12 +1759,6 @@  static long ceph_fallocate(struct file *file, int mode,
 		goto unlock;
 	}
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		ret = ceph_uninline_data(inode, NULL);
-		if (ret < 0)
-			goto unlock;
-	}
-
 	size = i_size_read(inode);
 
 	/* Are we punching a hole beyond EOF? */
@@ -1785,19 +1776,23 @@  static long ceph_fallocate(struct file *file, int mode,
 	if (ret < 0)
 		goto unlock;
 
+	ret = ceph_uninline_data(inode, NULL);
+	if (ret < 0)
+		goto out_put_caps;
+	dirty = ret;
+
 	ceph_zero_pagecache_range(inode, offset, length);
 	ret = ceph_zero_objects(inode, offset, length);
 
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
-					       &prealloc_cf);
+		dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					        &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
 	}
-
+out_put_caps:
 	ceph_put_cap_refs(ci, got);
 unlock:
 	inode_unlock(inode);