diff mbox series

[RFC,11/11] ceph: wait for async dir ops to complete before doing synchronous dir ops

Message ID 20190409194229.8247-12-jlayton@kernel.org (mailing list archive)
State New, archived
Headers show
Series [RFC,01/11] ceph: after an MDS request, do callback and completions | expand

Commit Message

Jeffrey Layton April 9, 2019, 7:42 p.m. UTC
Ensure that we wait on replies from any pending directory operations
involving children before we allow synchronous operations involving
that directory to proceed.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 fs/ceph/dir.c   | 65 ++++++++++++++++++++++++++++++++++++++++++++++---
 fs/ceph/file.c  |  4 +++
 fs/ceph/super.h |  1 +
 3 files changed, 66 insertions(+), 4 deletions(-)

Comments

Jeff Layton April 9, 2019, 7:54 p.m. UTC | #1
On Tue, Apr 9, 2019 at 3:42 PM Jeff Layton <jlayton@kernel.org> wrote:
>
> Ensure that we wait on replies from any pending directory operations
> involving children before we allow synchronous operations involving
> that directory to proceed.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  fs/ceph/dir.c   | 65 ++++++++++++++++++++++++++++++++++++++++++++++---
>  fs/ceph/file.c  |  4 +++
>  fs/ceph/super.h |  1 +
>  3 files changed, 66 insertions(+), 4 deletions(-)
>
> diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
> index 386c9439a020..0b8cee46e07c 100644
> --- a/fs/ceph/dir.c
> +++ b/fs/ceph/dir.c
> @@ -998,11 +998,16 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
>         struct ceph_mds_request *req;
>         int err;
>
> +       dout("link in dir %p old_dentry %p dentry %p\n", dir,
> +            old_dentry, dentry);
> +
>         if (ceph_snap(dir) != CEPH_NOSNAP)
>                 return -EROFS;
>
> -       dout("link in dir %p old_dentry %p dentry %p\n", dir,
> -            old_dentry, dentry);
> +       err = ceph_async_dirop_request_wait(dir);
> +       if (err)
> +               return err;
> +
>         req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
>         if (IS_ERR(req)) {
>                 d_drop(dentry);
> @@ -1041,6 +1046,43 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
>         iput(req->r_old_inode);
>  }
>
> +int ceph_async_dirop_request_wait(struct inode *inode)
> +{
> +       struct ceph_inode_info *ci = ceph_inode(inode);
> +       struct ceph_mds_request *req = NULL;
> +       int ret = 0;
> +
> +       /* Only applicable for directories */
> +       if (S_ISDIR(inode->i_mode))

That should, of course, be !S_ISDIR(...

> +               return 0;
> +
> +       spin_lock(&ci->i_unsafe_lock);
> +       if (!list_empty(&ci->i_unsafe_dirops)) {
> +               struct ceph_mds_request *last;
> +               last = list_last_entry(&ci->i_unsafe_dirops,
> +                                      struct ceph_mds_request,
> +                                      r_unsafe_dir_item);
> +               /*
> +                * If last request hasn't gotten a reply, then wait
> +                * for it.
> +                */
> +               if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
> +                   !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
> +                       req = last;
> +                       ceph_mdsc_get_request(req);
> +               }
> +       }
> +       spin_unlock(&ci->i_unsafe_lock);
> +
> +       if (req) {
> +               dout("%s %p wait on tid %llu\n", __func__, inode,
> +                    req ? req->r_tid : 0ULL);
> +               ret = wait_for_completion_killable(&req->r_completion);
> +               ceph_mdsc_put_request(req);
> +       }
> +       return ret;
> +}
> +
>  /*
>   * rmdir and unlink are differ only by the metadata op code
>   */
> @@ -1064,6 +1106,12 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
>                         CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
>         } else
>                 goto out;
> +
> +       /* Wait for any requests involving children to get a reply */
> +       err = ceph_async_dirop_request_wait(dir);
> +       if (err)
> +               goto out;
> +
>         req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
>         if (IS_ERR(req)) {
>                 err = PTR_ERR(req);
> @@ -1115,6 +1163,9 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
>         int op = CEPH_MDS_OP_RENAME;
>         int err;
>
> +       dout("rename dir %p dentry %p to dir %p dentry %p\n",
> +            old_dir, old_dentry, new_dir, new_dentry);
> +
>         if (flags)
>                 return -EINVAL;
>
> @@ -1131,8 +1182,14 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
>             (!ceph_quota_is_same_realm(old_dir, new_dir)))
>                 return -EXDEV;
>
> -       dout("rename dir %p dentry %p to dir %p dentry %p\n",
> -            old_dir, old_dentry, new_dir, new_dentry);
> +       err = ceph_async_dirop_request_wait(old_dir);
> +       if (err)
> +               return err;
> +
> +       err = ceph_async_dirop_request_wait(new_dir);
> +       if (err)
> +               return err;
> +
>         req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
>         if (IS_ERR(req))
>                 return PTR_ERR(req);
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index f24d18f46715..f7e49907514e 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -444,6 +444,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
>              dir, dentry, dentry,
>              d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
>
> +       err = ceph_async_dirop_request_wait(dir);
> +       if (err)
> +               return err;
> +
>         if (dentry->d_name.len > NAME_MAX)
>                 return -ENAMETOOLONG;
>
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index 5c361dc1f47f..e97a6ce31a4e 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -1070,6 +1070,7 @@ extern int ceph_handle_snapdir(struct ceph_mds_request *req,
>                                struct dentry *dentry, int err);
>  extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
>                                          struct dentry *dentry, int err);
> +extern int ceph_async_dirop_request_wait(struct inode *inode);
>
>  extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di);
>  extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di);
> --
> 2.20.1
>
Luis Henriques April 10, 2019, 11:05 a.m. UTC | #2
Jeff Layton <jlayton@kernel.org> writes:

> Ensure that we wait on replies from any pending directory operations
> involving children before we allow synchronous operations involving
> that directory to proceed.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  fs/ceph/dir.c   | 65 ++++++++++++++++++++++++++++++++++++++++++++++---
>  fs/ceph/file.c  |  4 +++
>  fs/ceph/super.h |  1 +
>  3 files changed, 66 insertions(+), 4 deletions(-)
>
> diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
> index 386c9439a020..0b8cee46e07c 100644
> --- a/fs/ceph/dir.c
> +++ b/fs/ceph/dir.c
> @@ -998,11 +998,16 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
>  	struct ceph_mds_request *req;
>  	int err;
>  
> +	dout("link in dir %p old_dentry %p dentry %p\n", dir,
> +	     old_dentry, dentry);
> +
>  	if (ceph_snap(dir) != CEPH_NOSNAP)
>  		return -EROFS;
>  
> -	dout("link in dir %p old_dentry %p dentry %p\n", dir,
> -	     old_dentry, dentry);
> +	err = ceph_async_dirop_request_wait(dir);
> +	if (err)
> +		return err;
> +
>  	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
>  	if (IS_ERR(req)) {
>  		d_drop(dentry);
> @@ -1041,6 +1046,43 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
>  	iput(req->r_old_inode);
>  }
>  
> +int ceph_async_dirop_request_wait(struct inode *inode)
> +{
> +	struct ceph_inode_info *ci = ceph_inode(inode);
> +	struct ceph_mds_request *req = NULL;
> +	int ret = 0;
> +
> +	/* Only applicable for directories */
> +	if (S_ISDIR(inode->i_mode))
> +		return 0;
> +
> +	spin_lock(&ci->i_unsafe_lock);
> +	if (!list_empty(&ci->i_unsafe_dirops)) {
> +		struct ceph_mds_request *last;
> +		last = list_last_entry(&ci->i_unsafe_dirops,
> +				       struct ceph_mds_request,
> +				       r_unsafe_dir_item);
> +		/*
> +		 * If last request hasn't gotten a reply, then wait
> +		 * for it.
> +		 */
> +		if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
> +		    !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
> +			req = last;
> +			ceph_mdsc_get_request(req);
> +		}
> +	}
> +	spin_unlock(&ci->i_unsafe_lock);
> +
> +	if (req) {
> +		dout("%s %p wait on tid %llu\n", __func__, inode,
> +		     req ? req->r_tid : 0ULL);
> +		ret = wait_for_completion_killable(&req->r_completion);
> +		ceph_mdsc_put_request(req);
> +	}
> +	return ret;
> +}
> +
>  /*
>   * rmdir and unlink are differ only by the metadata op code
>   */
> @@ -1064,6 +1106,12 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
>  			CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
>  	} else
>  		goto out;
> +
> +	/* Wait for any requests involving children to get a reply */
> +	err = ceph_async_dirop_request_wait(dir);
> +	if (err)
> +		goto out;
> +

In this case, couldn't we move this check into the 'else' branch added
in the previous patch?  IOW, couldn't we have two (or more) asynchronous
unlink operations at the same time?

Cheers,
Jeff Layton April 10, 2019, 12:16 p.m. UTC | #3
On Wed, Apr 10, 2019 at 7:05 AM Luis Henriques <lhenriques@suse.com> wrote:
>
> Jeff Layton <jlayton@kernel.org> writes:
>
> > Ensure that we wait on replies from any pending directory operations
> > involving children before we allow synchronous operations involving
> > that directory to proceed.
> >
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >  fs/ceph/dir.c   | 65 ++++++++++++++++++++++++++++++++++++++++++++++---
> >  fs/ceph/file.c  |  4 +++
> >  fs/ceph/super.h |  1 +
> >  3 files changed, 66 insertions(+), 4 deletions(-)
> >
> > diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
> > index 386c9439a020..0b8cee46e07c 100644
> > --- a/fs/ceph/dir.c
> > +++ b/fs/ceph/dir.c
> > @@ -998,11 +998,16 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
> >       struct ceph_mds_request *req;
> >       int err;
> >
> > +     dout("link in dir %p old_dentry %p dentry %p\n", dir,
> > +          old_dentry, dentry);
> > +
> >       if (ceph_snap(dir) != CEPH_NOSNAP)
> >               return -EROFS;
> >
> > -     dout("link in dir %p old_dentry %p dentry %p\n", dir,
> > -          old_dentry, dentry);
> > +     err = ceph_async_dirop_request_wait(dir);
> > +     if (err)
> > +             return err;
> > +
> >       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
> >       if (IS_ERR(req)) {
> >               d_drop(dentry);
> > @@ -1041,6 +1046,43 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
> >       iput(req->r_old_inode);
> >  }
> >
> > +int ceph_async_dirop_request_wait(struct inode *inode)
> > +{
> > +     struct ceph_inode_info *ci = ceph_inode(inode);
> > +     struct ceph_mds_request *req = NULL;
> > +     int ret = 0;
> > +
> > +     /* Only applicable for directories */
> > +     if (S_ISDIR(inode->i_mode))
> > +             return 0;
> > +
> > +     spin_lock(&ci->i_unsafe_lock);
> > +     if (!list_empty(&ci->i_unsafe_dirops)) {
> > +             struct ceph_mds_request *last;
> > +             last = list_last_entry(&ci->i_unsafe_dirops,
> > +                                    struct ceph_mds_request,
> > +                                    r_unsafe_dir_item);
> > +             /*
> > +              * If last request hasn't gotten a reply, then wait
> > +              * for it.
> > +              */
> > +             if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
> > +                 !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
> > +                     req = last;
> > +                     ceph_mdsc_get_request(req);
> > +             }
> > +     }
> > +     spin_unlock(&ci->i_unsafe_lock);
> > +
> > +     if (req) {
> > +             dout("%s %p wait on tid %llu\n", __func__, inode,
> > +                  req ? req->r_tid : 0ULL);
> > +             ret = wait_for_completion_killable(&req->r_completion);
> > +             ceph_mdsc_put_request(req);
> > +     }
> > +     return ret;
> > +}
> > +
> >  /*
> >   * rmdir and unlink are differ only by the metadata op code
> >   */
> > @@ -1064,6 +1106,12 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
> >                       CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
> >       } else
> >               goto out;
> > +
> > +     /* Wait for any requests involving children to get a reply */
> > +     err = ceph_async_dirop_request_wait(dir);
> > +     if (err)
> > +             goto out;
> > +
>
> In this case, couldn't we move this check into the 'else' branch added
> in the previous patch?  IOW, couldn't we have two (or more) asynchronous
> unlink operations at the same time?
>
> Cheers,

For this set, it won't matter. We're only doing async unlinks on
regular files, and rmdir is still done synchronously. So, if this is a
candidate for an async unlink it can't have any child dirops anyway.

It's a minor thing, but this has us blocking before the
ceph_mdsc_create_request call, which means we won't do any allocation
until we're ready to fire off the request, which I like marginally
better.

Longer term, I'd like to expand this so that can do async rmdirs as
well, but that require a different set of caps (FxLx on the parent).
Once I get there, I'll probably split off a separate ceph_rmdir
inode_operation.

>
> >       req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if
> > (IS_ERR(req)) { err = PTR_ERR(req); @@ -1115,6 +1163,9 @@ static int
> > ceph_rename(struct inode *old_dir, struct dentry *old_dentry, int op =
> > CEPH_MDS_OP_RENAME; int err;
> >
> > +     dout("rename dir %p dentry %p to dir %p dentry %p\n",
> > +          old_dir, old_dentry, new_dir, new_dentry);
> > +
> >       if (flags)
> >               return -EINVAL;
> >
> > @@ -1131,8 +1182,14 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
> >           (!ceph_quota_is_same_realm(old_dir, new_dir)))
> >               return -EXDEV;
> >
> > -     dout("rename dir %p dentry %p to dir %p dentry %p\n",
> > -          old_dir, old_dentry, new_dir, new_dentry);
> > +     err = ceph_async_dirop_request_wait(old_dir);
> > +     if (err)
> > +             return err;
> > +
> > +     err = ceph_async_dirop_request_wait(new_dir);
> > +     if (err)
> > +             return err;
> > +
> >       req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
> >       if (IS_ERR(req))
> >               return PTR_ERR(req);
> > diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> > index f24d18f46715..f7e49907514e 100644
> > --- a/fs/ceph/file.c
> > +++ b/fs/ceph/file.c
> > @@ -444,6 +444,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
> >            dir, dentry, dentry,
> >            d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
> >
> > +     err = ceph_async_dirop_request_wait(dir);
> > +     if (err)
> > +             return err;
> > +
> >       if (dentry->d_name.len > NAME_MAX)
> >               return -ENAMETOOLONG;
> >
> > diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> > index 5c361dc1f47f..e97a6ce31a4e 100644
> > --- a/fs/ceph/super.h
> > +++ b/fs/ceph/super.h
> > @@ -1070,6 +1070,7 @@ extern int ceph_handle_snapdir(struct ceph_mds_request *req,
> >                              struct dentry *dentry, int err);
> >  extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
> >                                        struct dentry *dentry, int err);
> > +extern int ceph_async_dirop_request_wait(struct inode *inode);
> >
> >  extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di);
> >  extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di);
Jeff Layton April 10, 2019, 12:21 p.m. UTC | #4
On Wed, Apr 10, 2019 at 8:16 AM Jeff Layton <jlayton@poochiereds.net> wrote:
>
> On Wed, Apr 10, 2019 at 7:05 AM Luis Henriques <lhenriques@suse.com> wrote:
> >
> > Jeff Layton <jlayton@kernel.org> writes:
> >
> > > Ensure that we wait on replies from any pending directory operations
> > > involving children before we allow synchronous operations involving
> > > that directory to proceed.
> > >
> > > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > > ---
> > >  fs/ceph/dir.c   | 65 ++++++++++++++++++++++++++++++++++++++++++++++---
> > >  fs/ceph/file.c  |  4 +++
> > >  fs/ceph/super.h |  1 +
> > >  3 files changed, 66 insertions(+), 4 deletions(-)
> > >
> > > diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
> > > index 386c9439a020..0b8cee46e07c 100644
> > > --- a/fs/ceph/dir.c
> > > +++ b/fs/ceph/dir.c
> > > @@ -998,11 +998,16 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
> > >       struct ceph_mds_request *req;
> > >       int err;
> > >
> > > +     dout("link in dir %p old_dentry %p dentry %p\n", dir,
> > > +          old_dentry, dentry);
> > > +
> > >       if (ceph_snap(dir) != CEPH_NOSNAP)
> > >               return -EROFS;
> > >
> > > -     dout("link in dir %p old_dentry %p dentry %p\n", dir,
> > > -          old_dentry, dentry);
> > > +     err = ceph_async_dirop_request_wait(dir);
> > > +     if (err)
> > > +             return err;
> > > +
> > >       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
> > >       if (IS_ERR(req)) {
> > >               d_drop(dentry);
> > > @@ -1041,6 +1046,43 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
> > >       iput(req->r_old_inode);
> > >  }
> > >
> > > +int ceph_async_dirop_request_wait(struct inode *inode)
> > > +{
> > > +     struct ceph_inode_info *ci = ceph_inode(inode);
> > > +     struct ceph_mds_request *req = NULL;
> > > +     int ret = 0;
> > > +
> > > +     /* Only applicable for directories */
> > > +     if (S_ISDIR(inode->i_mode))
> > > +             return 0;
> > > +
> > > +     spin_lock(&ci->i_unsafe_lock);
> > > +     if (!list_empty(&ci->i_unsafe_dirops)) {
> > > +             struct ceph_mds_request *last;
> > > +             last = list_last_entry(&ci->i_unsafe_dirops,
> > > +                                    struct ceph_mds_request,
> > > +                                    r_unsafe_dir_item);
> > > +             /*
> > > +              * If last request hasn't gotten a reply, then wait
> > > +              * for it.
> > > +              */
> > > +             if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
> > > +                 !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
> > > +                     req = last;
> > > +                     ceph_mdsc_get_request(req);
> > > +             }
> > > +     }
> > > +     spin_unlock(&ci->i_unsafe_lock);
> > > +
> > > +     if (req) {
> > > +             dout("%s %p wait on tid %llu\n", __func__, inode,
> > > +                  req ? req->r_tid : 0ULL);
> > > +             ret = wait_for_completion_killable(&req->r_completion);
> > > +             ceph_mdsc_put_request(req);
> > > +     }
> > > +     return ret;
> > > +}
> > > +
> > >  /*
> > >   * rmdir and unlink are differ only by the metadata op code
> > >   */
> > > @@ -1064,6 +1106,12 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
> > >                       CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
> > >       } else
> > >               goto out;
> > > +
> > > +     /* Wait for any requests involving children to get a reply */
> > > +     err = ceph_async_dirop_request_wait(dir);
> > > +     if (err)
> > > +             goto out;
> > > +
> >
> > In this case, couldn't we move this check into the 'else' branch added
> > in the previous patch?  IOW, couldn't we have two (or more) asynchronous
> > unlink operations at the same time?
> >
> > Cheers,
>
> For this set, it won't matter. We're only doing async unlinks on
> regular files, and rmdir is still done synchronously. So, if this is a
> candidate for an async unlink it can't have any child dirops anyway.
>
> It's a minor thing, but this has us blocking before the
> ceph_mdsc_create_request call, which means we won't do any allocation
> until we're ready to fire off the request, which I like marginally
> better.
>
> Longer term, I'd like to expand this so that can do async rmdirs as
> well, but that require a different set of caps (FxLx on the parent).
> Once I get there, I'll probably split off a separate ceph_rmdir
> inode_operation.
>

Sorry -- now that I look closer, I think this is a bug. We should be doing:

    err = ceph_async_dirop_request_wait(inode);

Basically, we want to wait for any requests involving children to
finish before we issue an rmdir. I'll fix that up and also test to see
whether this improves performance. Good catch!

Thanks for the review so far!

> >
> > >       req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if
> > > (IS_ERR(req)) { err = PTR_ERR(req); @@ -1115,6 +1163,9 @@ static int
> > > ceph_rename(struct inode *old_dir, struct dentry *old_dentry, int op =
> > > CEPH_MDS_OP_RENAME; int err;
> > >
> > > +     dout("rename dir %p dentry %p to dir %p dentry %p\n",
> > > +          old_dir, old_dentry, new_dir, new_dentry);
> > > +
> > >       if (flags)
> > >               return -EINVAL;
> > >
> > > @@ -1131,8 +1182,14 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
> > >           (!ceph_quota_is_same_realm(old_dir, new_dir)))
> > >               return -EXDEV;
> > >
> > > -     dout("rename dir %p dentry %p to dir %p dentry %p\n",
> > > -          old_dir, old_dentry, new_dir, new_dentry);
> > > +     err = ceph_async_dirop_request_wait(old_dir);
> > > +     if (err)
> > > +             return err;
> > > +
> > > +     err = ceph_async_dirop_request_wait(new_dir);
> > > +     if (err)
> > > +             return err;
> > > +
> > >       req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
> > >       if (IS_ERR(req))
> > >               return PTR_ERR(req);
> > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> > > index f24d18f46715..f7e49907514e 100644
> > > --- a/fs/ceph/file.c
> > > +++ b/fs/ceph/file.c
> > > @@ -444,6 +444,10 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
> > >            dir, dentry, dentry,
> > >            d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
> > >
> > > +     err = ceph_async_dirop_request_wait(dir);
> > > +     if (err)
> > > +             return err;
> > > +
> > >       if (dentry->d_name.len > NAME_MAX)
> > >               return -ENAMETOOLONG;
> > >
> > > diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> > > index 5c361dc1f47f..e97a6ce31a4e 100644
> > > --- a/fs/ceph/super.h
> > > +++ b/fs/ceph/super.h
> > > @@ -1070,6 +1070,7 @@ extern int ceph_handle_snapdir(struct ceph_mds_request *req,
> > >                              struct dentry *dentry, int err);
> > >  extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
> > >                                        struct dentry *dentry, int err);
> > > +extern int ceph_async_dirop_request_wait(struct inode *inode);
> > >
> > >  extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di);
> > >  extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di);
>
>
>
> --
> Jeff Layton <jlayton@poochiereds.net>
Luis Henriques April 10, 2019, 1:50 p.m. UTC | #5
Jeff Layton <jlayton@poochiereds.net> writes:

> On Wed, Apr 10, 2019 at 8:16 AM Jeff Layton <jlayton@poochiereds.net> wrote:
>>
>> On Wed, Apr 10, 2019 at 7:05 AM Luis Henriques <lhenriques@suse.com> wrote:
>> >
>> > Jeff Layton <jlayton@kernel.org> writes:
>> >
>> > > Ensure that we wait on replies from any pending directory operations
>> > > involving children before we allow synchronous operations involving
>> > > that directory to proceed.
>> > >
>> > > Signed-off-by: Jeff Layton <jlayton@kernel.org>
>> > > ---
>> > >  fs/ceph/dir.c   | 65 ++++++++++++++++++++++++++++++++++++++++++++++---
>> > >  fs/ceph/file.c  |  4 +++
>> > >  fs/ceph/super.h |  1 +
>> > >  3 files changed, 66 insertions(+), 4 deletions(-)
>> > >
>> > > diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
>> > > index 386c9439a020..0b8cee46e07c 100644
>> > > --- a/fs/ceph/dir.c
>> > > +++ b/fs/ceph/dir.c
>> > > @@ -998,11 +998,16 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
>> > >       struct ceph_mds_request *req;
>> > >       int err;
>> > >
>> > > +     dout("link in dir %p old_dentry %p dentry %p\n", dir,
>> > > +          old_dentry, dentry);
>> > > +
>> > >       if (ceph_snap(dir) != CEPH_NOSNAP)
>> > >               return -EROFS;
>> > >
>> > > -     dout("link in dir %p old_dentry %p dentry %p\n", dir,
>> > > -          old_dentry, dentry);
>> > > +     err = ceph_async_dirop_request_wait(dir);
>> > > +     if (err)
>> > > +             return err;
>> > > +
>> > >       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
>> > >       if (IS_ERR(req)) {
>> > >               d_drop(dentry);
>> > > @@ -1041,6 +1046,43 @@ static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
>> > >       iput(req->r_old_inode);
>> > >  }
>> > >
>> > > +int ceph_async_dirop_request_wait(struct inode *inode)
>> > > +{
>> > > +     struct ceph_inode_info *ci = ceph_inode(inode);
>> > > +     struct ceph_mds_request *req = NULL;
>> > > +     int ret = 0;
>> > > +
>> > > +     /* Only applicable for directories */
>> > > +     if (S_ISDIR(inode->i_mode))
>> > > +             return 0;
>> > > +
>> > > +     spin_lock(&ci->i_unsafe_lock);
>> > > +     if (!list_empty(&ci->i_unsafe_dirops)) {
>> > > +             struct ceph_mds_request *last;
>> > > +             last = list_last_entry(&ci->i_unsafe_dirops,
>> > > +                                    struct ceph_mds_request,
>> > > +                                    r_unsafe_dir_item);
>> > > +             /*
>> > > +              * If last request hasn't gotten a reply, then wait
>> > > +              * for it.
>> > > +              */
>> > > +             if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
>> > > +                 !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
>> > > +                     req = last;
>> > > +                     ceph_mdsc_get_request(req);
>> > > +             }
>> > > +     }
>> > > +     spin_unlock(&ci->i_unsafe_lock);
>> > > +
>> > > +     if (req) {
>> > > +             dout("%s %p wait on tid %llu\n", __func__, inode,
>> > > +                  req ? req->r_tid : 0ULL);
>> > > +             ret = wait_for_completion_killable(&req->r_completion);
>> > > +             ceph_mdsc_put_request(req);
>> > > +     }
>> > > +     return ret;
>> > > +}
>> > > +
>> > >  /*
>> > >   * rmdir and unlink are differ only by the metadata op code
>> > >   */
>> > > @@ -1064,6 +1106,12 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
>> > >                       CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
>> > >       } else
>> > >               goto out;
>> > > +
>> > > +     /* Wait for any requests involving children to get a reply */
>> > > +     err = ceph_async_dirop_request_wait(dir);
>> > > +     if (err)
>> > > +             goto out;
>> > > +
>> >
>> > In this case, couldn't we move this check into the 'else' branch added
>> > in the previous patch?  IOW, couldn't we have two (or more) asynchronous
>> > unlink operations at the same time?
>> >
>> > Cheers,
>>
>> For this set, it won't matter. We're only doing async unlinks on
>> regular files, and rmdir is still done synchronously. So, if this is a
>> candidate for an async unlink it can't have any child dirops anyway.
>>
>> It's a minor thing, but this has us blocking before the
>> ceph_mdsc_create_request call, which means we won't do any allocation
>> until we're ready to fire off the request, which I like marginally
>> better.
>>
>> Longer term, I'd like to expand this so that can do async rmdirs as
>> well, but that require a different set of caps (FxLx on the parent).
>> Once I get there, I'll probably split off a separate ceph_rmdir
>> inode_operation.
>>
>
> Sorry -- now that I look closer, I think this is a bug. We should be doing:
>
>     err = ceph_async_dirop_request_wait(inode);
>
> Basically, we want to wait for any requests involving children to
> finish before we issue an rmdir. I'll fix that up and also test to see
> whether this improves performance. Good catch!

Heh, obviously I did *not* found that bug -- I probably just got
confused when I saw 'dir' and immediately assumed an rmdir.  So, I
accidentally made you see a bug.  Nice :-)

Cheers,
diff mbox series

Patch

diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 386c9439a020..0b8cee46e07c 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -998,11 +998,16 @@  static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 	struct ceph_mds_request *req;
 	int err;
 
+	dout("link in dir %p old_dentry %p dentry %p\n", dir,
+	     old_dentry, dentry);
+
 	if (ceph_snap(dir) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("link in dir %p old_dentry %p dentry %p\n", dir,
-	     old_dentry, dentry);
+	err = ceph_async_dirop_request_wait(dir);
+	if (err)
+		return err;
+
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
 		d_drop(dentry);
@@ -1041,6 +1046,43 @@  static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
 	iput(req->r_old_inode);
 }
 
+int ceph_async_dirop_request_wait(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_request *req = NULL;
+	int ret = 0;
+
+	/* Only applicable for directories */
+	if (S_ISDIR(inode->i_mode))
+		return 0;
+
+	spin_lock(&ci->i_unsafe_lock);
+	if (!list_empty(&ci->i_unsafe_dirops)) {
+		struct ceph_mds_request *last;
+		last = list_last_entry(&ci->i_unsafe_dirops,
+				       struct ceph_mds_request,
+				       r_unsafe_dir_item);
+		/*
+		 * If last request hasn't gotten a reply, then wait
+		 * for it.
+		 */
+		if (!test_bit(CEPH_MDS_R_GOT_UNSAFE, &last->r_req_flags) &&
+		    !test_bit(CEPH_MDS_R_GOT_SAFE, &last->r_req_flags)) {
+			req = last;
+			ceph_mdsc_get_request(req);
+		}
+	}
+	spin_unlock(&ci->i_unsafe_lock);
+
+	if (req) {
+		dout("%s %p wait on tid %llu\n", __func__, inode,
+		     req ? req->r_tid : 0ULL);
+		ret = wait_for_completion_killable(&req->r_completion);
+		ceph_mdsc_put_request(req);
+	}
+	return ret;
+}
+
 /*
  * rmdir and unlink are differ only by the metadata op code
  */
@@ -1064,6 +1106,12 @@  static int ceph_unlink(struct inode *dir, struct dentry *dentry)
 			CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
 	} else
 		goto out;
+
+	/* Wait for any requests involving children to get a reply */
+	err = ceph_async_dirop_request_wait(dir);
+	if (err)
+		goto out;
+
 	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -1115,6 +1163,9 @@  static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	int op = CEPH_MDS_OP_RENAME;
 	int err;
 
+	dout("rename dir %p dentry %p to dir %p dentry %p\n",
+	     old_dir, old_dentry, new_dir, new_dentry);
+
 	if (flags)
 		return -EINVAL;
 
@@ -1131,8 +1182,14 @@  static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	    (!ceph_quota_is_same_realm(old_dir, new_dir)))
 		return -EXDEV;
 
-	dout("rename dir %p dentry %p to dir %p dentry %p\n",
-	     old_dir, old_dentry, new_dir, new_dentry);
+	err = ceph_async_dirop_request_wait(old_dir);
+	if (err)
+		return err;
+
+	err = ceph_async_dirop_request_wait(new_dir);
+	if (err)
+		return err;
+
 	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index f24d18f46715..f7e49907514e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -444,6 +444,10 @@  int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	     dir, dentry, dentry,
 	     d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);
 
+	err = ceph_async_dirop_request_wait(dir);
+	if (err)
+		return err;
+
 	if (dentry->d_name.len > NAME_MAX)
 		return -ENAMETOOLONG;
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 5c361dc1f47f..e97a6ce31a4e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1070,6 +1070,7 @@  extern int ceph_handle_snapdir(struct ceph_mds_request *req,
 			       struct dentry *dentry, int err);
 extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 					 struct dentry *dentry, int err);
+extern int ceph_async_dirop_request_wait(struct inode *inode);
 
 extern void __ceph_dentry_lease_touch(struct ceph_dentry_info *di);
 extern void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di);