diff mbox series

[v4] ceph: drop the messages from MDS when unmounting

Message ID 20230327030508.310588-1-xiubli@redhat.com (mailing list archive)
State New, archived
Headers show
Series [v4] ceph: drop the messages from MDS when unmounting | expand

Commit Message

Xiubo Li March 27, 2023, 3:05 a.m. UTC
From: Xiubo Li <xiubli@redhat.com>

When unmounting and all the dirty buffer will be flushed and after
the last osd request is finished the last reference of the i_count
will be released. Then it will flush the dirty cap/snap to MDSs,
and the unmounting won't wait the possible acks, which will ihode
the inodes when updating the metadata locally but makes no sense
any more, of this. This will make the evict_inodes() to skip these
inodes.

If encrypt is enabled the kernel generate a warning when removing
the encrypt keys when the skipped inodes still hold the keyring:

WARNING: CPU: 4 PID: 168846 at fs/crypto/keyring.c:242 fscrypt_destroy_keyring+0x7e/0xd0
CPU: 4 PID: 168846 Comm: umount Tainted: G S  6.1.0-rc5-ceph-g72ead199864c #1
Hardware name: Supermicro SYS-5018R-WR/X10SRW-F, BIOS 2.0 12/17/2015
RIP: 0010:fscrypt_destroy_keyring+0x7e/0xd0
RSP: 0018:ffffc9000b277e28 EFLAGS: 00010202
RAX: 0000000000000002 RBX: ffff88810d52ac00 RCX: ffff88810b56aa00
RDX: 0000000080000000 RSI: ffffffff822f3a09 RDI: ffff888108f59000
RBP: ffff8881d394fb88 R08: 0000000000000028 R09: 0000000000000000
R10: 0000000000000001 R11: 11ff4fe6834fcd91 R12: ffff8881d394fc40
R13: ffff888108f59000 R14: ffff8881d394f800 R15: 0000000000000000
FS:  00007fd83f6f1080(0000) GS:ffff88885fd00000(0000) knlGS:0000000000000000
CS:  0010 DS: 0000 ES: 0000 CR0: 0000000080050033
CR2: 00007f918d417000 CR3: 000000017f89a005 CR4: 00000000003706e0
DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
DR3: 0000000000000000 DR6: 00000000fffe0ff0 DR7: 0000000000000400
Call Trace:
<TASK>
generic_shutdown_super+0x47/0x120
kill_anon_super+0x14/0x30
ceph_kill_sb+0x36/0x90 [ceph]
deactivate_locked_super+0x29/0x60
cleanup_mnt+0xb8/0x140
task_work_run+0x67/0xb0
exit_to_user_mode_prepare+0x23d/0x240
syscall_exit_to_user_mode+0x25/0x60
do_syscall_64+0x40/0x80
entry_SYSCALL_64_after_hwframe+0x63/0xcd
RIP: 0033:0x7fd83dc39e9b

Later the kernel will crash when iput() the inodes and dereferencing
the "sb->s_master_keys", which has been released by the
generic_shutdown_super().

URL: https://tracker.ceph.com/issues/58126
URL: https://tracker.ceph.com/issues/59162
Signed-off-by: Xiubo Li <xiubli@redhat.com>
---

Only updated the 2/2 patch since series V3:

Changed in V4:
- Always resend the session close requests to MDS when receives
  new messages just before dropping them.
- Even receives a corrupted message we should increase the s_seq
  and resend the session close requests.


 fs/ceph/caps.c       |  6 ++++-
 fs/ceph/mds_client.c | 14 ++++++++---
 fs/ceph/mds_client.h | 11 ++++++++-
 fs/ceph/quota.c      |  9 ++++---
 fs/ceph/snap.c       | 10 ++++----
 fs/ceph/super.c      | 57 ++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/super.h      |  3 +++
 7 files changed, 96 insertions(+), 14 deletions(-)

Comments

Luis Henriques March 29, 2023, 2:41 p.m. UTC | #1
Hi!

I finally got to have a look at this patch because I found that the
version you sent along with fscrypt v17 is a bit different from what's in
the testing branch :-)

xiubli@redhat.com writes:

> From: Xiubo Li <xiubli@redhat.com>
>
> When unmounting and all the dirty buffer will be flushed and after
> the last osd request is finished the last reference of the i_count
> will be released. Then it will flush the dirty cap/snap to MDSs,
> and the unmounting won't wait the possible acks, which will ihode
> the inodes when updating the metadata locally but makes no sense
> any more, of this. This will make the evict_inodes() to skip these
> inodes.
>
> If encrypt is enabled the kernel generate a warning when removing
> the encrypt keys when the skipped inodes still hold the keyring:
>
> WARNING: CPU: 4 PID: 168846 at fs/crypto/keyring.c:242 fscrypt_destroy_keyring+0x7e/0xd0
> CPU: 4 PID: 168846 Comm: umount Tainted: G S  6.1.0-rc5-ceph-g72ead199864c #1
> Hardware name: Supermicro SYS-5018R-WR/X10SRW-F, BIOS 2.0 12/17/2015
> RIP: 0010:fscrypt_destroy_keyring+0x7e/0xd0
> RSP: 0018:ffffc9000b277e28 EFLAGS: 00010202
> RAX: 0000000000000002 RBX: ffff88810d52ac00 RCX: ffff88810b56aa00
> RDX: 0000000080000000 RSI: ffffffff822f3a09 RDI: ffff888108f59000
> RBP: ffff8881d394fb88 R08: 0000000000000028 R09: 0000000000000000
> R10: 0000000000000001 R11: 11ff4fe6834fcd91 R12: ffff8881d394fc40
> R13: ffff888108f59000 R14: ffff8881d394f800 R15: 0000000000000000
> FS:  00007fd83f6f1080(0000) GS:ffff88885fd00000(0000) knlGS:0000000000000000
> CS:  0010 DS: 0000 ES: 0000 CR0: 0000000080050033
> CR2: 00007f918d417000 CR3: 000000017f89a005 CR4: 00000000003706e0
> DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
> DR3: 0000000000000000 DR6: 00000000fffe0ff0 DR7: 0000000000000400
> Call Trace:
> <TASK>
> generic_shutdown_super+0x47/0x120
> kill_anon_super+0x14/0x30
> ceph_kill_sb+0x36/0x90 [ceph]
> deactivate_locked_super+0x29/0x60
> cleanup_mnt+0xb8/0x140
> task_work_run+0x67/0xb0
> exit_to_user_mode_prepare+0x23d/0x240
> syscall_exit_to_user_mode+0x25/0x60
> do_syscall_64+0x40/0x80
> entry_SYSCALL_64_after_hwframe+0x63/0xcd
> RIP: 0033:0x7fd83dc39e9b
>
> Later the kernel will crash when iput() the inodes and dereferencing
> the "sb->s_master_keys", which has been released by the
> generic_shutdown_super().
>
> URL: https://tracker.ceph.com/issues/58126
> URL: https://tracker.ceph.com/issues/59162
> Signed-off-by: Xiubo Li <xiubli@redhat.com>
> ---
>
> Only updated the 2/2 patch since series V3:
>
> Changed in V4:
> - Always resend the session close requests to MDS when receives
>   new messages just before dropping them.
> - Even receives a corrupted message we should increase the s_seq
>   and resend the session close requests.
>
>
>  fs/ceph/caps.c       |  6 ++++-
>  fs/ceph/mds_client.c | 14 ++++++++---
>  fs/ceph/mds_client.h | 11 ++++++++-
>  fs/ceph/quota.c      |  9 ++++---
>  fs/ceph/snap.c       | 10 ++++----
>  fs/ceph/super.c      | 57 ++++++++++++++++++++++++++++++++++++++++++++
>  fs/ceph/super.h      |  3 +++
>  7 files changed, 96 insertions(+), 14 deletions(-)
>
> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
> index 6379c0070492..e1bb6d9c16f8 100644
> --- a/fs/ceph/caps.c
> +++ b/fs/ceph/caps.c
> @@ -4222,6 +4222,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  
>  	dout("handle_caps from mds%d\n", session->s_mds);
>  
> +	if (!ceph_inc_stopping_blocker(mdsc, session))
> +		return;
> +
>  	/* decode */
>  	end = msg->front.iov_base + msg->front.iov_len;
>  	if (msg->front.iov_len < sizeof(*h))
> @@ -4323,7 +4326,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  	     vino.snap, inode);
>  
>  	mutex_lock(&session->s_mutex);
> -	inc_session_sequence(session);
>  	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
>  	     (unsigned)seq);
>  
> @@ -4435,6 +4437,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  done_unlocked:
>  	iput(inode);
>  out:
> +	ceph_dec_stopping_blocker(mdsc);
> +
>  	ceph_put_string(extra_info.pool_ns);
>  
>  	/* Defer closing the sessions after s_mutex lock being released */
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index 85d639f75ea1..21ca41e5f68b 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -4877,6 +4877,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
>  
>  	dout("handle_lease from mds%d\n", mds);
>  
> +	if (!ceph_inc_stopping_blocker(mdsc, session))
> +		return;
> +
>  	/* decode */
>  	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
>  		goto bad;
> @@ -4895,8 +4898,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
>  	     dname.len, dname.name);
>  
>  	mutex_lock(&session->s_mutex);
> -	inc_session_sequence(session);
> -
>  	if (!inode) {
>  		dout("handle_lease no inode %llx\n", vino.ino);
>  		goto release;
> @@ -4958,9 +4959,13 @@ static void handle_lease(struct ceph_mds_client *mdsc,
>  out:
>  	mutex_unlock(&session->s_mutex);
>  	iput(inode);
> +
> +	ceph_dec_stopping_blocker(mdsc);
>  	return;
>  
>  bad:
> +	ceph_dec_stopping_blocker(mdsc);
> +
>  	pr_err("corrupt lease message\n");
>  	ceph_msg_dump(msg);
>  }
> @@ -5156,6 +5161,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
>  	}
>  
>  	init_completion(&mdsc->safe_umount_waiters);
> +	spin_lock_init(&mdsc->stopping_lock);
> +	atomic_set(&mdsc->stopping_blockers, 0);
> +	init_completion(&mdsc->stopping_waiter);
>  	init_waitqueue_head(&mdsc->session_close_wq);
>  	INIT_LIST_HEAD(&mdsc->waiting_for_map);
>  	mdsc->quotarealms_inodes = RB_ROOT;
> @@ -5270,7 +5278,7 @@ void send_flush_mdlog(struct ceph_mds_session *s)
>  void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
>  {
>  	dout("pre_umount\n");
> -	mdsc->stopping = 1;
> +	mdsc->stopping = CEPH_MDSC_STOPPING_BEGAIN;
>  
>  	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
>  	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
> index 81a1f9a4ac3b..5bf32701c84c 100644
> --- a/fs/ceph/mds_client.h
> +++ b/fs/ceph/mds_client.h
> @@ -398,6 +398,11 @@ struct cap_wait {
>  	int			want;
>  };
>  
> +enum {
> +	CEPH_MDSC_STOPPING_BEGAIN = 1,

Typo: I think you want to define this as "CEPH_MDSC_STOPPING_BEGIN"
instead, right?

> +	CEPH_MDSC_STOPPING_FLUSHED = 2,
> +};
> +
>  /*
>   * mds client state
>   */
> @@ -414,7 +419,11 @@ struct ceph_mds_client {
>  	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
>  	atomic_t		num_sessions;
>  	int                     max_sessions;  /* len of sessions array */
> -	int                     stopping;      /* true if shutting down */
> +
> +	spinlock_t              stopping_lock;  /* protect snap_empty */
> +	int                     stopping;      /* the stage of shutting down */
> +	atomic_t                stopping_blockers;
> +	struct completion	stopping_waiter;
>  
>  	atomic64_t		quotarealms_count; /* # realms with quota */
>  	/*
> diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
> index 64592adfe48f..37b062783717 100644
> --- a/fs/ceph/quota.c
> +++ b/fs/ceph/quota.c
> @@ -47,6 +47,9 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
>  	struct inode *inode;
>  	struct ceph_inode_info *ci;
>  
> +	if (!ceph_inc_stopping_blocker(mdsc, session))
> +		return;
> +
>  	if (msg->front.iov_len < sizeof(*h)) {
>  		pr_err("%s corrupt message mds%d len %d\n", __func__,
>  		       session->s_mds, (int)msg->front.iov_len);
> @@ -54,11 +57,6 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
>  		return;
>  	}
>  
> -	/* increment msg sequence number */
> -	mutex_lock(&session->s_mutex);
> -	inc_session_sequence(session);
> -	mutex_unlock(&session->s_mutex);
> -
>  	/* lookup inode */
>  	vino.ino = le64_to_cpu(h->ino);
>  	vino.snap = CEPH_NOSNAP;
> @@ -78,6 +76,7 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
>  	spin_unlock(&ci->i_ceph_lock);
>  
>  	iput(inode);
> +	ceph_dec_stopping_blocker(mdsc);

There are two 'return' statements in this function where function
ceph_dec_stopping_blocker() isn't being called.

>  }
>  
>  static struct ceph_quotarealm_inode *
> diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
> index aa8e0657fc03..23b31600ee3c 100644
> --- a/fs/ceph/snap.c
> +++ b/fs/ceph/snap.c
> @@ -1011,6 +1011,9 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
>  	int locked_rwsem = 0;
>  	bool close_sessions = false;
>  
> +	if (!ceph_inc_stopping_blocker(mdsc, session))
> +		return;
> +
>  	/* decode */
>  	if (msg->front.iov_len < sizeof(*h))
>  		goto bad;
> @@ -1026,10 +1029,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
>  	dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
>  	     mds, ceph_snap_op_name(op), split, trace_len);
>  
> -	mutex_lock(&session->s_mutex);
> -	inc_session_sequence(session);
> -	mutex_unlock(&session->s_mutex);
> -
>  	down_write(&mdsc->snap_rwsem);
>  	locked_rwsem = 1;
>  
> @@ -1134,12 +1133,15 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
>  	up_write(&mdsc->snap_rwsem);
>  
>  	flush_snaps(mdsc);
> +	ceph_dec_stopping_blocker(mdsc);
>  	return;
>  
>  bad:
>  	pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
>  	ceph_msg_dump(msg);
>  out:
> +	ceph_dec_stopping_blocker(mdsc);
> +
>  	if (locked_rwsem)
>  		up_write(&mdsc->snap_rwsem);
>  
> diff --git a/fs/ceph/super.c b/fs/ceph/super.c
> index 4b0a070d5c6d..f5ddd4abc0ab 100644
> --- a/fs/ceph/super.c
> +++ b/fs/ceph/super.c
> @@ -1467,15 +1467,72 @@ static int ceph_init_fs_context(struct fs_context *fc)
>  	return -ENOMEM;
>  }
>  
> +/*
> + * Return true if mdsc successfully increase blocker counter,
> + * or false if the mdsc is in stopping and flushed state.
> + */
> +bool ceph_inc_stopping_blocker(struct ceph_mds_client *mdsc,
> +			       struct ceph_mds_session *session)
> +{
> +	mutex_lock(&session->s_mutex);
> +	inc_session_sequence(session);
> +	mutex_unlock(&session->s_mutex);
> +
> +	spin_lock(&mdsc->stopping_lock);
> +	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) {
> +		spin_unlock(&mdsc->stopping_lock);
> +		return false;
> +	}
> +	atomic_inc(&mdsc->stopping_blockers);
> +	spin_unlock(&mdsc->stopping_lock);
> +	return true;
> +}
> +
> +void ceph_dec_stopping_blocker(struct ceph_mds_client *mdsc)
> +{
> +	spin_lock(&mdsc->stopping_lock);
> +	if (!atomic_dec_return(&mdsc->stopping_blockers) &&
> +	    mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
> +		complete_all(&mdsc->stopping_waiter);
> +	spin_unlock(&mdsc->stopping_lock);
> +}
> +
>  static void ceph_kill_sb(struct super_block *s)
>  {
>  	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
> +	bool wait;
>  
>  	dout("kill_sb %p\n", s);
>  
>  	ceph_mdsc_pre_umount(fsc->mdsc);
>  	flush_fs_workqueues(fsc);
>  
> +	/*
> +	 * Though the kill_anon_super() will finally trigger the
> +	 * sync_filesystem() anyway, we still need to do it here and
> +	 * then bump the stage of shutdown. This will allow us to
> +	 * drop any further message, which will increase the inodes'
> +	 * i_count reference counters but makes no sense any more,
> +	 * from MDSs.
> +	 *
> +	 * Without this when evicting the inodes it may fail in the
> +	 * kill_anon_super(), which will trigger a warning when
> +	 * destroying the fscrypt keyring and then possibly trigger
> +	 * a further crash in ceph module when the iput() tries to
> +	 * evict the inodes later.
> +	 */
> +	sync_filesystem(s);
> +
> +	spin_lock(&fsc->mdsc->stopping_lock);
> +	fsc->mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
> +	wait = !!atomic_read(&fsc->mdsc->stopping_blockers);
> +	spin_unlock(&fsc->mdsc->stopping_lock);
> +
> +	while (wait || atomic_read(&fsc->mdsc->stopping_blockers)) {
> +		wait = false;
> +		wait_for_completion(&fsc->mdsc->stopping_waiter);
> +	}

This is a odd construct :-) I'd rather see it like this:

	if (wait) {
		while (atomic_read(&fsc->mdsc->stopping_blockers))
			wait_for_completion(&fsc->mdsc->stopping_waiter);
	}

But... even with my suggestion, can't we deadlock here if we get preempted
just before the wait_for_completion() and ceph_dec_stopping_blocker() is
executed?

Cheers,
Xiubo Li March 30, 2023, 1:33 a.m. UTC | #2
On 29/03/2023 22:41, Luís Henriques wrote:
> Hi!
>
> I finally got to have a look at this patch because I found that the
> version you sent along with fscrypt v17 is a bit different from what's in
> the testing branch :-)

Yeah, after v17 I updated this and force pushed to the testing branch to 
run the test.

> xiubli@redhat.com writes:
>
>> From: Xiubo Li <xiubli@redhat.com>
>>
>> When unmounting and all the dirty buffer will be flushed and after
>> the last osd request is finished the last reference of the i_count
>> will be released. Then it will flush the dirty cap/snap to MDSs,
>> and the unmounting won't wait the possible acks, which will ihode
>> the inodes when updating the metadata locally but makes no sense
>> any more, of this. This will make the evict_inodes() to skip these
>> inodes.
>>
>> If encrypt is enabled the kernel generate a warning when removing
>> the encrypt keys when the skipped inodes still hold the keyring:
>>
>> WARNING: CPU: 4 PID: 168846 at fs/crypto/keyring.c:242 fscrypt_destroy_keyring+0x7e/0xd0
>> CPU: 4 PID: 168846 Comm: umount Tainted: G S  6.1.0-rc5-ceph-g72ead199864c #1
>> Hardware name: Supermicro SYS-5018R-WR/X10SRW-F, BIOS 2.0 12/17/2015
>> RIP: 0010:fscrypt_destroy_keyring+0x7e/0xd0
>> RSP: 0018:ffffc9000b277e28 EFLAGS: 00010202
>> RAX: 0000000000000002 RBX: ffff88810d52ac00 RCX: ffff88810b56aa00
>> RDX: 0000000080000000 RSI: ffffffff822f3a09 RDI: ffff888108f59000
>> RBP: ffff8881d394fb88 R08: 0000000000000028 R09: 0000000000000000
>> R10: 0000000000000001 R11: 11ff4fe6834fcd91 R12: ffff8881d394fc40
>> R13: ffff888108f59000 R14: ffff8881d394f800 R15: 0000000000000000
>> FS:  00007fd83f6f1080(0000) GS:ffff88885fd00000(0000) knlGS:0000000000000000
>> CS:  0010 DS: 0000 ES: 0000 CR0: 0000000080050033
>> CR2: 00007f918d417000 CR3: 000000017f89a005 CR4: 00000000003706e0
>> DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
>> DR3: 0000000000000000 DR6: 00000000fffe0ff0 DR7: 0000000000000400
>> Call Trace:
>> <TASK>
>> generic_shutdown_super+0x47/0x120
>> kill_anon_super+0x14/0x30
>> ceph_kill_sb+0x36/0x90 [ceph]
>> deactivate_locked_super+0x29/0x60
>> cleanup_mnt+0xb8/0x140
>> task_work_run+0x67/0xb0
>> exit_to_user_mode_prepare+0x23d/0x240
>> syscall_exit_to_user_mode+0x25/0x60
>> do_syscall_64+0x40/0x80
>> entry_SYSCALL_64_after_hwframe+0x63/0xcd
>> RIP: 0033:0x7fd83dc39e9b
>>
>> Later the kernel will crash when iput() the inodes and dereferencing
>> the "sb->s_master_keys", which has been released by the
>> generic_shutdown_super().
>>
>> URL: https://tracker.ceph.com/issues/58126
>> URL: https://tracker.ceph.com/issues/59162
>> Signed-off-by: Xiubo Li <xiubli@redhat.com>
>> ---
>>
>> Only updated the 2/2 patch since series V3:
>>
>> Changed in V4:
>> - Always resend the session close requests to MDS when receives
>>    new messages just before dropping them.
>> - Even receives a corrupted message we should increase the s_seq
>>    and resend the session close requests.
>>
>>
>>   fs/ceph/caps.c       |  6 ++++-
>>   fs/ceph/mds_client.c | 14 ++++++++---
>>   fs/ceph/mds_client.h | 11 ++++++++-
>>   fs/ceph/quota.c      |  9 ++++---
>>   fs/ceph/snap.c       | 10 ++++----
>>   fs/ceph/super.c      | 57 ++++++++++++++++++++++++++++++++++++++++++++
>>   fs/ceph/super.h      |  3 +++
>>   7 files changed, 96 insertions(+), 14 deletions(-)
>>
>> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
>> index 6379c0070492..e1bb6d9c16f8 100644
>> --- a/fs/ceph/caps.c
>> +++ b/fs/ceph/caps.c
>> @@ -4222,6 +4222,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>>   
>>   	dout("handle_caps from mds%d\n", session->s_mds);
>>   
>> +	if (!ceph_inc_stopping_blocker(mdsc, session))
>> +		return;
>> +
>>   	/* decode */
>>   	end = msg->front.iov_base + msg->front.iov_len;
>>   	if (msg->front.iov_len < sizeof(*h))
>> @@ -4323,7 +4326,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>>   	     vino.snap, inode);
>>   
>>   	mutex_lock(&session->s_mutex);
>> -	inc_session_sequence(session);
>>   	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
>>   	     (unsigned)seq);
>>   
>> @@ -4435,6 +4437,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>>   done_unlocked:
>>   	iput(inode);
>>   out:
>> +	ceph_dec_stopping_blocker(mdsc);
>> +
>>   	ceph_put_string(extra_info.pool_ns);
>>   
>>   	/* Defer closing the sessions after s_mutex lock being released */
>> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
>> index 85d639f75ea1..21ca41e5f68b 100644
>> --- a/fs/ceph/mds_client.c
>> +++ b/fs/ceph/mds_client.c
>> @@ -4877,6 +4877,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
>>   
>>   	dout("handle_lease from mds%d\n", mds);
>>   
>> +	if (!ceph_inc_stopping_blocker(mdsc, session))
>> +		return;
>> +
>>   	/* decode */
>>   	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
>>   		goto bad;
>> @@ -4895,8 +4898,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
>>   	     dname.len, dname.name);
>>   
>>   	mutex_lock(&session->s_mutex);
>> -	inc_session_sequence(session);
>> -
>>   	if (!inode) {
>>   		dout("handle_lease no inode %llx\n", vino.ino);
>>   		goto release;
>> @@ -4958,9 +4959,13 @@ static void handle_lease(struct ceph_mds_client *mdsc,
>>   out:
>>   	mutex_unlock(&session->s_mutex);
>>   	iput(inode);
>> +
>> +	ceph_dec_stopping_blocker(mdsc);
>>   	return;
>>   
>>   bad:
>> +	ceph_dec_stopping_blocker(mdsc);
>> +
>>   	pr_err("corrupt lease message\n");
>>   	ceph_msg_dump(msg);
>>   }
>> @@ -5156,6 +5161,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
>>   	}
>>   
>>   	init_completion(&mdsc->safe_umount_waiters);
>> +	spin_lock_init(&mdsc->stopping_lock);
>> +	atomic_set(&mdsc->stopping_blockers, 0);
>> +	init_completion(&mdsc->stopping_waiter);
>>   	init_waitqueue_head(&mdsc->session_close_wq);
>>   	INIT_LIST_HEAD(&mdsc->waiting_for_map);
>>   	mdsc->quotarealms_inodes = RB_ROOT;
>> @@ -5270,7 +5278,7 @@ void send_flush_mdlog(struct ceph_mds_session *s)
>>   void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
>>   {
>>   	dout("pre_umount\n");
>> -	mdsc->stopping = 1;
>> +	mdsc->stopping = CEPH_MDSC_STOPPING_BEGAIN;
>>   
>>   	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
>>   	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
>> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
>> index 81a1f9a4ac3b..5bf32701c84c 100644
>> --- a/fs/ceph/mds_client.h
>> +++ b/fs/ceph/mds_client.h
>> @@ -398,6 +398,11 @@ struct cap_wait {
>>   	int			want;
>>   };
>>   
>> +enum {
>> +	CEPH_MDSC_STOPPING_BEGAIN = 1,
> Typo: I think you want to define this as "CEPH_MDSC_STOPPING_BEGIN"
> instead, right?

Yeah, good catch.

>> +	CEPH_MDSC_STOPPING_FLUSHED = 2,
>> +};
>> +
>>   /*
>>    * mds client state
>>    */
>> @@ -414,7 +419,11 @@ struct ceph_mds_client {
>>   	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
>>   	atomic_t		num_sessions;
>>   	int                     max_sessions;  /* len of sessions array */
>> -	int                     stopping;      /* true if shutting down */
>> +
>> +	spinlock_t              stopping_lock;  /* protect snap_empty */
>> +	int                     stopping;      /* the stage of shutting down */
>> +	atomic_t                stopping_blockers;
>> +	struct completion	stopping_waiter;
>>   
>>   	atomic64_t		quotarealms_count; /* # realms with quota */
>>   	/*
>> diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
>> index 64592adfe48f..37b062783717 100644
>> --- a/fs/ceph/quota.c
>> +++ b/fs/ceph/quota.c
>> @@ -47,6 +47,9 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
>>   	struct inode *inode;
>>   	struct ceph_inode_info *ci;
>>   
>> +	if (!ceph_inc_stopping_blocker(mdsc, session))
>> +		return;
>> +
>>   	if (msg->front.iov_len < sizeof(*h)) {
>>   		pr_err("%s corrupt message mds%d len %d\n", __func__,
>>   		       session->s_mds, (int)msg->front.iov_len);
>> @@ -54,11 +57,6 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
>>   		return;
>>   	}
>>   
>> -	/* increment msg sequence number */
>> -	mutex_lock(&session->s_mutex);
>> -	inc_session_sequence(session);
>> -	mutex_unlock(&session->s_mutex);
>> -
>>   	/* lookup inode */
>>   	vino.ino = le64_to_cpu(h->ino);
>>   	vino.snap = CEPH_NOSNAP;
>> @@ -78,6 +76,7 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
>>   	spin_unlock(&ci->i_ceph_lock);
>>   
>>   	iput(inode);
>> +	ceph_dec_stopping_blocker(mdsc);
> There are two 'return' statements in this function where function
> ceph_dec_stopping_blocker() isn't being called.
Good catch.
>>   }
>>   
>>   static struct ceph_quotarealm_inode *
>> diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
>> index aa8e0657fc03..23b31600ee3c 100644
>> --- a/fs/ceph/snap.c
>> +++ b/fs/ceph/snap.c
>> @@ -1011,6 +1011,9 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
>>   	int locked_rwsem = 0;
>>   	bool close_sessions = false;
>>   
>> +	if (!ceph_inc_stopping_blocker(mdsc, session))
>> +		return;
>> +
>>   	/* decode */
>>   	if (msg->front.iov_len < sizeof(*h))
>>   		goto bad;
>> @@ -1026,10 +1029,6 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
>>   	dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
>>   	     mds, ceph_snap_op_name(op), split, trace_len);
>>   
>> -	mutex_lock(&session->s_mutex);
>> -	inc_session_sequence(session);
>> -	mutex_unlock(&session->s_mutex);
>> -
>>   	down_write(&mdsc->snap_rwsem);
>>   	locked_rwsem = 1;
>>   
>> @@ -1134,12 +1133,15 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
>>   	up_write(&mdsc->snap_rwsem);
>>   
>>   	flush_snaps(mdsc);
>> +	ceph_dec_stopping_blocker(mdsc);
>>   	return;
>>   
>>   bad:
>>   	pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
>>   	ceph_msg_dump(msg);
>>   out:
>> +	ceph_dec_stopping_blocker(mdsc);
>> +
>>   	if (locked_rwsem)
>>   		up_write(&mdsc->snap_rwsem);
>>   
>> diff --git a/fs/ceph/super.c b/fs/ceph/super.c
>> index 4b0a070d5c6d..f5ddd4abc0ab 100644
>> --- a/fs/ceph/super.c
>> +++ b/fs/ceph/super.c
>> @@ -1467,15 +1467,72 @@ static int ceph_init_fs_context(struct fs_context *fc)
>>   	return -ENOMEM;
>>   }
>>   
>> +/*
>> + * Return true if mdsc successfully increase blocker counter,
>> + * or false if the mdsc is in stopping and flushed state.
>> + */
>> +bool ceph_inc_stopping_blocker(struct ceph_mds_client *mdsc,
>> +			       struct ceph_mds_session *session)
>> +{
>> +	mutex_lock(&session->s_mutex);
>> +	inc_session_sequence(session);
>> +	mutex_unlock(&session->s_mutex);
>> +
>> +	spin_lock(&mdsc->stopping_lock);
>> +	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) {
>> +		spin_unlock(&mdsc->stopping_lock);
>> +		return false;
>> +	}
>> +	atomic_inc(&mdsc->stopping_blockers);
>> +	spin_unlock(&mdsc->stopping_lock);
>> +	return true;
>> +}
>> +
>> +void ceph_dec_stopping_blocker(struct ceph_mds_client *mdsc)
>> +{
>> +	spin_lock(&mdsc->stopping_lock);
>> +	if (!atomic_dec_return(&mdsc->stopping_blockers) &&
>> +	    mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
>> +		complete_all(&mdsc->stopping_waiter);
>> +	spin_unlock(&mdsc->stopping_lock);
>> +}
>> +
>>   static void ceph_kill_sb(struct super_block *s)
>>   {
>>   	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
>> +	bool wait;
>>   
>>   	dout("kill_sb %p\n", s);
>>   
>>   	ceph_mdsc_pre_umount(fsc->mdsc);
>>   	flush_fs_workqueues(fsc);
>>   
>> +	/*
>> +	 * Though the kill_anon_super() will finally trigger the
>> +	 * sync_filesystem() anyway, we still need to do it here and
>> +	 * then bump the stage of shutdown. This will allow us to
>> +	 * drop any further message, which will increase the inodes'
>> +	 * i_count reference counters but makes no sense any more,
>> +	 * from MDSs.
>> +	 *
>> +	 * Without this when evicting the inodes it may fail in the
>> +	 * kill_anon_super(), which will trigger a warning when
>> +	 * destroying the fscrypt keyring and then possibly trigger
>> +	 * a further crash in ceph module when the iput() tries to
>> +	 * evict the inodes later.
>> +	 */
>> +	sync_filesystem(s);
>> +
>> +	spin_lock(&fsc->mdsc->stopping_lock);
>> +	fsc->mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
>> +	wait = !!atomic_read(&fsc->mdsc->stopping_blockers);
>> +	spin_unlock(&fsc->mdsc->stopping_lock);
>> +
>> +	while (wait || atomic_read(&fsc->mdsc->stopping_blockers)) {
>> +		wait = false;
>> +		wait_for_completion(&fsc->mdsc->stopping_waiter);
>> +	}
> This is a odd construct :-) I'd rather see it like this:
>
> 	if (wait) {
> 		while (atomic_read(&fsc->mdsc->stopping_blockers))
> 			wait_for_completion(&fsc->mdsc->stopping_waiter);
> 	}
This also looks good to me. As I remembered when reading the kernel code 
I just saw another place in VFS is also doing like this and I just 
followed it.
> But... even with my suggestion, can't we deadlock here if we get preempted
> just before the wait_for_completion() and ceph_dec_stopping_blocker() is
> executed?

Good question. Actually it won't.

The 'x->done' member in the completion will be initialized as 0, and the 
complete_all() will set it to 'UINT_MAX'. And only when 'x->done == 0' 
will the wait_for_completion() go to sleep and wait.

Thanks

- Xiubo

>
> Cheers,
Luis Henriques March 30, 2023, 8:59 a.m. UTC | #3
Xiubo Li <xiubli@redhat.com> writes:
> On 29/03/2023 22:41, Luís Henriques wrote:
<...>
> This also looks good to me. As I remembered when reading the kernel code I just
> saw another place in VFS is also doing like this and I just followed it.
>> But... even with my suggestion, can't we deadlock here if we get preempted
>> just before the wait_for_completion() and ceph_dec_stopping_blocker() is
>> executed?
>
> Good question. Actually it won't.

Yeah, you're right.  Doh!

> The 'x->done' member in the completion will be initialized as 0, and the
> complete_all() will set it to 'UINT_MAX'. And only when 'x->done == 0' will the
> wait_for_completion() go to sleep and wait.

Cheers,
diff mbox series

Patch

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6379c0070492..e1bb6d9c16f8 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -4222,6 +4222,9 @@  void ceph_handle_caps(struct ceph_mds_session *session,
 
 	dout("handle_caps from mds%d\n", session->s_mds);
 
+	if (!ceph_inc_stopping_blocker(mdsc, session))
+		return;
+
 	/* decode */
 	end = msg->front.iov_base + msg->front.iov_len;
 	if (msg->front.iov_len < sizeof(*h))
@@ -4323,7 +4326,6 @@  void ceph_handle_caps(struct ceph_mds_session *session,
 	     vino.snap, inode);
 
 	mutex_lock(&session->s_mutex);
-	inc_session_sequence(session);
 	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
 	     (unsigned)seq);
 
@@ -4435,6 +4437,8 @@  void ceph_handle_caps(struct ceph_mds_session *session,
 done_unlocked:
 	iput(inode);
 out:
+	ceph_dec_stopping_blocker(mdsc);
+
 	ceph_put_string(extra_info.pool_ns);
 
 	/* Defer closing the sessions after s_mutex lock being released */
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 85d639f75ea1..21ca41e5f68b 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -4877,6 +4877,9 @@  static void handle_lease(struct ceph_mds_client *mdsc,
 
 	dout("handle_lease from mds%d\n", mds);
 
+	if (!ceph_inc_stopping_blocker(mdsc, session))
+		return;
+
 	/* decode */
 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
 		goto bad;
@@ -4895,8 +4898,6 @@  static void handle_lease(struct ceph_mds_client *mdsc,
 	     dname.len, dname.name);
 
 	mutex_lock(&session->s_mutex);
-	inc_session_sequence(session);
-
 	if (!inode) {
 		dout("handle_lease no inode %llx\n", vino.ino);
 		goto release;
@@ -4958,9 +4959,13 @@  static void handle_lease(struct ceph_mds_client *mdsc,
 out:
 	mutex_unlock(&session->s_mutex);
 	iput(inode);
+
+	ceph_dec_stopping_blocker(mdsc);
 	return;
 
 bad:
+	ceph_dec_stopping_blocker(mdsc);
+
 	pr_err("corrupt lease message\n");
 	ceph_msg_dump(msg);
 }
@@ -5156,6 +5161,9 @@  int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	}
 
 	init_completion(&mdsc->safe_umount_waiters);
+	spin_lock_init(&mdsc->stopping_lock);
+	atomic_set(&mdsc->stopping_blockers, 0);
+	init_completion(&mdsc->stopping_waiter);
 	init_waitqueue_head(&mdsc->session_close_wq);
 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
 	mdsc->quotarealms_inodes = RB_ROOT;
@@ -5270,7 +5278,7 @@  void send_flush_mdlog(struct ceph_mds_session *s)
 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
 {
 	dout("pre_umount\n");
-	mdsc->stopping = 1;
+	mdsc->stopping = CEPH_MDSC_STOPPING_BEGAIN;
 
 	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
 	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 81a1f9a4ac3b..5bf32701c84c 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -398,6 +398,11 @@  struct cap_wait {
 	int			want;
 };
 
+enum {
+	CEPH_MDSC_STOPPING_BEGAIN = 1,
+	CEPH_MDSC_STOPPING_FLUSHED = 2,
+};
+
 /*
  * mds client state
  */
@@ -414,7 +419,11 @@  struct ceph_mds_client {
 	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
 	atomic_t		num_sessions;
 	int                     max_sessions;  /* len of sessions array */
-	int                     stopping;      /* true if shutting down */
+
+	spinlock_t              stopping_lock;  /* protect snap_empty */
+	int                     stopping;      /* the stage of shutting down */
+	atomic_t                stopping_blockers;
+	struct completion	stopping_waiter;
 
 	atomic64_t		quotarealms_count; /* # realms with quota */
 	/*
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index 64592adfe48f..37b062783717 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -47,6 +47,9 @@  void ceph_handle_quota(struct ceph_mds_client *mdsc,
 	struct inode *inode;
 	struct ceph_inode_info *ci;
 
+	if (!ceph_inc_stopping_blocker(mdsc, session))
+		return;
+
 	if (msg->front.iov_len < sizeof(*h)) {
 		pr_err("%s corrupt message mds%d len %d\n", __func__,
 		       session->s_mds, (int)msg->front.iov_len);
@@ -54,11 +57,6 @@  void ceph_handle_quota(struct ceph_mds_client *mdsc,
 		return;
 	}
 
-	/* increment msg sequence number */
-	mutex_lock(&session->s_mutex);
-	inc_session_sequence(session);
-	mutex_unlock(&session->s_mutex);
-
 	/* lookup inode */
 	vino.ino = le64_to_cpu(h->ino);
 	vino.snap = CEPH_NOSNAP;
@@ -78,6 +76,7 @@  void ceph_handle_quota(struct ceph_mds_client *mdsc,
 	spin_unlock(&ci->i_ceph_lock);
 
 	iput(inode);
+	ceph_dec_stopping_blocker(mdsc);
 }
 
 static struct ceph_quotarealm_inode *
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index aa8e0657fc03..23b31600ee3c 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -1011,6 +1011,9 @@  void ceph_handle_snap(struct ceph_mds_client *mdsc,
 	int locked_rwsem = 0;
 	bool close_sessions = false;
 
+	if (!ceph_inc_stopping_blocker(mdsc, session))
+		return;
+
 	/* decode */
 	if (msg->front.iov_len < sizeof(*h))
 		goto bad;
@@ -1026,10 +1029,6 @@  void ceph_handle_snap(struct ceph_mds_client *mdsc,
 	dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
 	     mds, ceph_snap_op_name(op), split, trace_len);
 
-	mutex_lock(&session->s_mutex);
-	inc_session_sequence(session);
-	mutex_unlock(&session->s_mutex);
-
 	down_write(&mdsc->snap_rwsem);
 	locked_rwsem = 1;
 
@@ -1134,12 +1133,15 @@  void ceph_handle_snap(struct ceph_mds_client *mdsc,
 	up_write(&mdsc->snap_rwsem);
 
 	flush_snaps(mdsc);
+	ceph_dec_stopping_blocker(mdsc);
 	return;
 
 bad:
 	pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
 	ceph_msg_dump(msg);
 out:
+	ceph_dec_stopping_blocker(mdsc);
+
 	if (locked_rwsem)
 		up_write(&mdsc->snap_rwsem);
 
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 4b0a070d5c6d..f5ddd4abc0ab 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -1467,15 +1467,72 @@  static int ceph_init_fs_context(struct fs_context *fc)
 	return -ENOMEM;
 }
 
+/*
+ * Return true if mdsc successfully increase blocker counter,
+ * or false if the mdsc is in stopping and flushed state.
+ */
+bool ceph_inc_stopping_blocker(struct ceph_mds_client *mdsc,
+			       struct ceph_mds_session *session)
+{
+	mutex_lock(&session->s_mutex);
+	inc_session_sequence(session);
+	mutex_unlock(&session->s_mutex);
+
+	spin_lock(&mdsc->stopping_lock);
+	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED) {
+		spin_unlock(&mdsc->stopping_lock);
+		return false;
+	}
+	atomic_inc(&mdsc->stopping_blockers);
+	spin_unlock(&mdsc->stopping_lock);
+	return true;
+}
+
+void ceph_dec_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+	spin_lock(&mdsc->stopping_lock);
+	if (!atomic_dec_return(&mdsc->stopping_blockers) &&
+	    mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
+		complete_all(&mdsc->stopping_waiter);
+	spin_unlock(&mdsc->stopping_lock);
+}
+
 static void ceph_kill_sb(struct super_block *s)
 {
 	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
+	bool wait;
 
 	dout("kill_sb %p\n", s);
 
 	ceph_mdsc_pre_umount(fsc->mdsc);
 	flush_fs_workqueues(fsc);
 
+	/*
+	 * Though the kill_anon_super() will finally trigger the
+	 * sync_filesystem() anyway, we still need to do it here and
+	 * then bump the stage of shutdown. This will allow us to
+	 * drop any further message, which will increase the inodes'
+	 * i_count reference counters but makes no sense any more,
+	 * from MDSs.
+	 *
+	 * Without this when evicting the inodes it may fail in the
+	 * kill_anon_super(), which will trigger a warning when
+	 * destroying the fscrypt keyring and then possibly trigger
+	 * a further crash in ceph module when the iput() tries to
+	 * evict the inodes later.
+	 */
+	sync_filesystem(s);
+
+	spin_lock(&fsc->mdsc->stopping_lock);
+	fsc->mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
+	wait = !!atomic_read(&fsc->mdsc->stopping_blockers);
+	spin_unlock(&fsc->mdsc->stopping_lock);
+
+	while (wait || atomic_read(&fsc->mdsc->stopping_blockers)) {
+		wait = false;
+		wait_for_completion(&fsc->mdsc->stopping_waiter);
+	}
+
 	kill_anon_super(s);
 
 	fsc->client->extra_mon_dispatch = NULL;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index a785e5cb9b40..8a9afa81a76f 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1398,4 +1398,7 @@  extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
 				     struct kstatfs *buf);
 extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
 
+bool ceph_inc_stopping_blocker(struct ceph_mds_client *mdsc,
+			       struct ceph_mds_session *session);
+void ceph_dec_stopping_blocker(struct ceph_mds_client *mdsc);
 #endif /* _FS_CEPH_SUPER_H */