[1/2] ceph: add mount option to limit caps count
diff mbox series

Message ID 20190115080305.37000-1-zyan@redhat.com
State New
Headers show
Series
  • [1/2] ceph: add mount option to limit caps count
Related show

Commit Message

Yan, Zheng Jan. 15, 2019, 8:03 a.m. UTC
If number of caps exceed the limit, unsed dentries are trimmed by
s_shrink.scan_objects when unreserving caps, or by ceph_d_delete
when releasing dentry's last reference. Trimming dentry releases
references to associated inode, which may evict inode and release
caps.

By default, there is no limit for caps count.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
---
 Documentation/filesystems/ceph.txt |  4 +++
 fs/ceph/caps.c                     | 46 +++++++++++++++++++++++++-----
 fs/ceph/dir.c                      | 29 +++++++++++++++++++
 fs/ceph/mds_client.c               | 21 ++++++++------
 fs/ceph/mds_client.h               |  1 +
 fs/ceph/super.c                    | 12 ++++++--
 fs/ceph/super.h                    |  5 ++--
 include/linux/ceph/types.h         |  1 +
 8 files changed, 99 insertions(+), 20 deletions(-)

Comments

Jeff Layton Jan. 15, 2019, 2:52 p.m. UTC | #1
On Tue, 2019-01-15 at 16:03 +0800, Yan, Zheng wrote:
> If number of caps exceed the limit, unsed dentries are trimmed by
> s_shrink.scan_objects when unreserving caps, or by ceph_d_delete
> when releasing dentry's last reference. Trimming dentry releases
> references to associated inode, which may evict inode and release
> caps.
> 
> By default, there is no limit for caps count.
> 
> Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
> ---
>  Documentation/filesystems/ceph.txt |  4 +++
>  fs/ceph/caps.c                     | 46 +++++++++++++++++++++++++-----
>  fs/ceph/dir.c                      | 29 +++++++++++++++++++
>  fs/ceph/mds_client.c               | 21 ++++++++------
>  fs/ceph/mds_client.h               |  1 +
>  fs/ceph/super.c                    | 12 ++++++--
>  fs/ceph/super.h                    |  5 ++--
>  include/linux/ceph/types.h         |  1 +
>  8 files changed, 99 insertions(+), 20 deletions(-)
> 
> diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
> index 1177052701e1..bc4145ee5dba 100644
> --- a/Documentation/filesystems/ceph.txt
> +++ b/Documentation/filesystems/ceph.txt
> @@ -118,6 +118,10 @@ Mount Options
>  	of a non-responsive Ceph file system.  The default is 30
>  	seconds.
>  
> +  caps_max=X
> +	Specify the maximum number of caps to hold. Unused caps are released
> +	when number of caps exceeds the limit. The default is 0 (no limit)
> +
>    rbytes
>  	When stat() is called on a directory, set st_size to 'rbytes',
>  	the summation of file sizes over all files nested beneath that
> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
> index 0eaf1b48c431..ef57491157fc 100644
> --- a/fs/ceph/caps.c
> +++ b/fs/ceph/caps.c
> @@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc)
>  	spin_unlock(&mdsc->caps_list_lock);
>  }
>  
> -void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
> +void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
> +			      struct ceph_mount_options *fsopt)
>  {
>  	spin_lock(&mdsc->caps_list_lock);
> -	mdsc->caps_min_count += delta;
> -	BUG_ON(mdsc->caps_min_count < 0);
> +	mdsc->caps_min_count = fsopt->max_readdir;
> +	if (mdsc->caps_min_count < 1024)
> +		mdsc->caps_min_count = 1024;
> +	mdsc->caps_use_max = fsopt->caps_max;
> +	if (mdsc->caps_use_max > 0 &&
> +	    mdsc->caps_use_max < mdsc->caps_min_count)
> +		mdsc->caps_use_max = mdsc->caps_min_count;
>  	spin_unlock(&mdsc->caps_list_lock);
>  }
>  
> @@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
>  	if (!err) {
>  		BUG_ON(have + alloc != need);
>  		ctx->count = need;
> +		ctx->used = 0;
>  	}
>  
>  	spin_lock(&mdsc->caps_list_lock);
> @@ -294,14 +301,38 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
>  	return err;
>  }
>  
> +static void __shrink_inodes(struct super_block *sb, int nr)
> +{
> +	struct shrink_control sc = {
> +		.gfp_mask = GFP_KERNEL,
> +		.nr_to_scan = nr,
> +	};
> +	sb->s_shrink.scan_objects(&sb->s_shrink, &sc);
> +}
> +
>  void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
> -			struct ceph_cap_reservation *ctx)
> +			 struct ceph_cap_reservation *ctx)
>  {
> +	int nr_to_trim = 0;
> +
> +	if (ctx->count == 0)
> +		return;
> +
>  	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
>  	spin_lock(&mdsc->caps_list_lock);
>  	__ceph_unreserve_caps(mdsc, ctx->count);
>  	ctx->count = 0;
> +
> +	if (mdsc->caps_use_max > 0 &&
> +	    mdsc->caps_use_count > mdsc->caps_use_max) {
> +		nr_to_trim = mdsc->caps_use_count - mdsc->caps_use_max;
> +		/* trim a little more */
> +		nr_to_trim = min(nr_to_trim + 64, ctx->used);
> +	}
>  	spin_unlock(&mdsc->caps_list_lock);
> +
> +	if (nr_to_trim > 0)
> +		__shrink_inodes(mdsc->fsc->sb, nr_to_trim);
>  }
>  
>  struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
> @@ -346,6 +377,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
>  	BUG_ON(list_empty(&mdsc->caps_list));
>  
>  	ctx->count--;
> +	ctx->used++;
>  	mdsc->caps_reserve_count--;
>  	mdsc->caps_use_count++;
>  
> @@ -500,12 +532,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
>  static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
>  			       struct ceph_inode_info *ci)
>  {
> -	struct ceph_mount_options *ma = mdsc->fsc->mount_options;
> +	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
>  
>  	ci->i_hold_caps_min = round_jiffies(jiffies +
> -					    ma->caps_wanted_delay_min * HZ);
> +					    opt->caps_wanted_delay_min * HZ);
>  	ci->i_hold_caps_max = round_jiffies(jiffies +
> -					    ma->caps_wanted_delay_max * HZ);
> +					    opt->caps_wanted_delay_max * HZ);
>  	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
>  	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
>  }
> diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
> index 82928cea0209..1328aec876cb 100644
> --- a/fs/ceph/dir.c
> +++ b/fs/ceph/dir.c
> @@ -1308,6 +1308,34 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
>  	return valid;
>  }
>  
> +/*
> + * Delete unused dentry and associated inode when there are too many caps
> + *
> + * Called under dentry->d_lock.
> + */
> +static int ceph_d_delete(const struct dentry *dentry)
> +{
> +	struct ceph_mds_client *mdsc;
> +	int ret = 0;
> +
> +	if (d_really_is_negative(dentry))
> +		return 0;
> +
> +	if (!ceph_is_any_caps(d_inode(dentry)))
> +		return 0;
> +
> +	mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
> +	if (mdsc->caps_use_max == 0)
> +		return 0;
> +
> +	spin_lock(&mdsc->caps_list_lock);
> +	if (mdsc->caps_use_max > 0 &&
> +	    mdsc->caps_use_count > mdsc->caps_use_max)
> +		ret = 1;
> +	spin_unlock(&mdsc->caps_list_lock);
> +	return ret;
> +}
> +

I wonder if the above is really desirable? If we just happen to be over
the limit at the time this is called, then we'll end up deleting the
most-recently-used dentry.

Would it be better to leave off this part and just kick the shrinker if
it looks like we're over the limit? We might end up exceeding the cap
limit a bit, but presumably that should be corrected soon afterward once
the shrinker kicks in.

>  /*
>   * Release our ceph_dentry_info.
>   */
> @@ -1531,6 +1559,7 @@ const struct inode_operations ceph_snapdir_iops = {
>  
>  const struct dentry_operations ceph_dentry_ops = {
>  	.d_revalidate = ceph_d_revalidate,
> +	.d_delete = ceph_d_delete,
>  	.d_release = ceph_d_release,
>  	.d_prune = ceph_d_prune,
>  	.d_init = ceph_d_init,
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index c2a453473b69..c9f1b1e8fa03 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -2811,7 +2811,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
>  		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
>  				    req->r_op == CEPH_MDS_OP_LSSNAP))
>  			ceph_readdir_prepopulate(req, req->r_session);
> -		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
>  	}
>  	current->journal_info = NULL;
>  	mutex_unlock(&req->r_fill_mutex);
> @@ -2820,12 +2819,18 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
>  	if (realm)
>  		ceph_put_snap_realm(mdsc, realm);
>  
> -	if (err == 0 && req->r_target_inode &&
> -	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
> -		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
> -		spin_lock(&ci->i_unsafe_lock);
> -		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
> -		spin_unlock(&ci->i_unsafe_lock);
> +	if (err == 0) {
> +		if (req->r_target_inode &&
> +		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
> +			struct ceph_inode_info *ci =
> +				ceph_inode(req->r_target_inode);
> +			spin_lock(&ci->i_unsafe_lock);
> +			list_add_tail(&req->r_unsafe_target_item,
> +				      &ci->i_unsafe_iops);
> +			spin_unlock(&ci->i_unsafe_lock);
> +		}
> +
> +		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
>  	}
>  out_err:
>  	mutex_lock(&mdsc->mutex);
> @@ -4016,7 +4021,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
>  	INIT_LIST_HEAD(&mdsc->dentry_lru);
>  
>  	ceph_caps_init(mdsc);
> -	ceph_adjust_min_caps(mdsc, fsc->min_caps);
> +	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
>  
>  	spin_lock_init(&mdsc->snapid_map_lock);
>  	mdsc->snapid_map_tree = RB_ROOT;
> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
> index af3b25e59e90..94fe2312c092 100644
> --- a/fs/ceph/mds_client.h
> +++ b/fs/ceph/mds_client.h
> @@ -393,6 +393,7 @@ struct ceph_mds_client {
>  						unreserved) */
>  	int		caps_total_count;    /* total caps allocated */
>  	int		caps_use_count;      /* in use */
> +	int		caps_use_max;	     /* max used caps */
>  	int		caps_reserve_count;  /* unused, reserved */
>  	int		caps_avail_count;    /* unused, unreserved */
>  	int		caps_min_count;      /* keep at least this many
> diff --git a/fs/ceph/super.c b/fs/ceph/super.c
> index da2cd8e89062..93404e3c89db 100644
> --- a/fs/ceph/super.c
> +++ b/fs/ceph/super.c
> @@ -133,6 +133,7 @@ enum {
>  	Opt_rasize,
>  	Opt_caps_wanted_delay_min,
>  	Opt_caps_wanted_delay_max,
> +	Opt_caps_max,
>  	Opt_readdir_max_entries,
>  	Opt_readdir_max_bytes,
>  	Opt_congestion_kb,
> @@ -175,6 +176,7 @@ static match_table_t fsopt_tokens = {
>  	{Opt_rasize, "rasize=%d"},
>  	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
>  	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
> +	{Opt_caps_max, "caps_max=%d"},
>  	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
>  	{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
>  	{Opt_congestion_kb, "write_congestion_kb=%d"},
> @@ -286,6 +288,11 @@ static int parse_fsopt_token(char *c, void *private)
>  			return -EINVAL;
>  		fsopt->caps_wanted_delay_max = intval;
>  		break;
> +	case Opt_caps_max:
> +		if (intval < 0)
> +			return -EINVAL;
> +		fsopt->caps_max = intval;
> +		break;
>  	case Opt_readdir_max_entries:
>  		if (intval < 1)
>  			return -EINVAL;
> @@ -576,6 +583,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
>  		seq_printf(m, ",rasize=%d", fsopt->rasize);
>  	if (fsopt->congestion_kb != default_congestion_kb())
>  		seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
> +	if (fsopt->caps_max)
> +		seq_printf(m, ",caps_max=%d", fsopt->caps_max);
>  	if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
>  		seq_printf(m, ",caps_wanted_delay_min=%d",
>  			 fsopt->caps_wanted_delay_min);
> @@ -680,9 +689,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
>  	if (!fsc->wb_pagevec_pool)
>  		goto fail_trunc_wq;
>  
> -	/* caps */
> -	fsc->min_caps = fsopt->max_readdir;
> -
>  	return fsc;
>  
>  fail_trunc_wq:
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index 7cec46513aa3..631b46e824a8 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -79,6 +79,7 @@ struct ceph_mount_options {
>  	int rasize;           /* max readahead */
>  	int congestion_kb;    /* max writeback in flight */
>  	int caps_wanted_delay_min, caps_wanted_delay_max;
> +	int caps_max;
>  	int max_readdir;       /* max readdir result (entires) */
>  	int max_readdir_bytes; /* max readdir result (bytes) */
>  
> @@ -100,7 +101,6 @@ struct ceph_fs_client {
>  	struct ceph_client *client;
>  
>  	unsigned long mount_state;
> -	int min_caps;                  /* min caps i added */
>  	loff_t max_file_size;
>  
>  	struct ceph_mds_client *mdsc;
> @@ -661,7 +661,8 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
>  
>  extern void ceph_caps_init(struct ceph_mds_client *mdsc);
>  extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
> -extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
> +extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
> +				     struct ceph_mount_options *fsopt);
>  extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
>  			     struct ceph_cap_reservation *ctx, int need);
>  extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
> diff --git a/include/linux/ceph/types.h b/include/linux/ceph/types.h
> index 27cd973d3881..bd3d532902d7 100644
> --- a/include/linux/ceph/types.h
> +++ b/include/linux/ceph/types.h
> @@ -24,6 +24,7 @@ struct ceph_vino {
>  /* context for the caps reservation mechanism */
>  struct ceph_cap_reservation {
>  	int count;
> +	int used;
>  };
>  
>

Patch
diff mbox series

diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 1177052701e1..bc4145ee5dba 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -118,6 +118,10 @@  Mount Options
 	of a non-responsive Ceph file system.  The default is 30
 	seconds.
 
+  caps_max=X
+	Specify the maximum number of caps to hold. Unused caps are released
+	when number of caps exceeds the limit. The default is 0 (no limit)
+
   rbytes
 	When stat() is called on a directory, set st_size to 'rbytes',
 	the summation of file sizes over all files nested beneath that
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 0eaf1b48c431..ef57491157fc 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -148,11 +148,17 @@  void ceph_caps_finalize(struct ceph_mds_client *mdsc)
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+			      struct ceph_mount_options *fsopt)
 {
 	spin_lock(&mdsc->caps_list_lock);
-	mdsc->caps_min_count += delta;
-	BUG_ON(mdsc->caps_min_count < 0);
+	mdsc->caps_min_count = fsopt->max_readdir;
+	if (mdsc->caps_min_count < 1024)
+		mdsc->caps_min_count = 1024;
+	mdsc->caps_use_max = fsopt->caps_max;
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_max < mdsc->caps_min_count)
+		mdsc->caps_use_max = mdsc->caps_min_count;
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
@@ -272,6 +278,7 @@  int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 	if (!err) {
 		BUG_ON(have + alloc != need);
 		ctx->count = need;
+		ctx->used = 0;
 	}
 
 	spin_lock(&mdsc->caps_list_lock);
@@ -294,14 +301,38 @@  int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 	return err;
 }
 
+static void __shrink_inodes(struct super_block *sb, int nr)
+{
+	struct shrink_control sc = {
+		.gfp_mask = GFP_KERNEL,
+		.nr_to_scan = nr,
+	};
+	sb->s_shrink.scan_objects(&sb->s_shrink, &sc);
+}
+
 void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
-			struct ceph_cap_reservation *ctx)
+			 struct ceph_cap_reservation *ctx)
 {
+	int nr_to_trim = 0;
+
+	if (ctx->count == 0)
+		return;
+
 	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
 	spin_lock(&mdsc->caps_list_lock);
 	__ceph_unreserve_caps(mdsc, ctx->count);
 	ctx->count = 0;
+
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_count > mdsc->caps_use_max) {
+		nr_to_trim = mdsc->caps_use_count - mdsc->caps_use_max;
+		/* trim a little more */
+		nr_to_trim = min(nr_to_trim + 64, ctx->used);
+	}
 	spin_unlock(&mdsc->caps_list_lock);
+
+	if (nr_to_trim > 0)
+		__shrink_inodes(mdsc->fsc->sb, nr_to_trim);
 }
 
 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +377,7 @@  struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
 	BUG_ON(list_empty(&mdsc->caps_list));
 
 	ctx->count--;
+	ctx->used++;
 	mdsc->caps_reserve_count--;
 	mdsc->caps_use_count++;
 
@@ -500,12 +532,12 @@  static void __insert_cap_node(struct ceph_inode_info *ci,
 static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
 			       struct ceph_inode_info *ci)
 {
-	struct ceph_mount_options *ma = mdsc->fsc->mount_options;
+	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
 
 	ci->i_hold_caps_min = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_min * HZ);
+					    opt->caps_wanted_delay_min * HZ);
 	ci->i_hold_caps_max = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_max * HZ);
+					    opt->caps_wanted_delay_max * HZ);
 	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
 	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
 }
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 82928cea0209..1328aec876cb 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1308,6 +1308,34 @@  static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 	return valid;
 }
 
+/*
+ * Delete unused dentry and associated inode when there are too many caps
+ *
+ * Called under dentry->d_lock.
+ */
+static int ceph_d_delete(const struct dentry *dentry)
+{
+	struct ceph_mds_client *mdsc;
+	int ret = 0;
+
+	if (d_really_is_negative(dentry))
+		return 0;
+
+	if (!ceph_is_any_caps(d_inode(dentry)))
+		return 0;
+
+	mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	if (mdsc->caps_use_max == 0)
+		return 0;
+
+	spin_lock(&mdsc->caps_list_lock);
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_count > mdsc->caps_use_max)
+		ret = 1;
+	spin_unlock(&mdsc->caps_list_lock);
+	return ret;
+}
+
 /*
  * Release our ceph_dentry_info.
  */
@@ -1531,6 +1559,7 @@  const struct inode_operations ceph_snapdir_iops = {
 
 const struct dentry_operations ceph_dentry_ops = {
 	.d_revalidate = ceph_d_revalidate,
+	.d_delete = ceph_d_delete,
 	.d_release = ceph_d_release,
 	.d_prune = ceph_d_prune,
 	.d_init = ceph_d_init,
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c2a453473b69..c9f1b1e8fa03 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2811,7 +2811,6 @@  static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
 				    req->r_op == CEPH_MDS_OP_LSSNAP))
 			ceph_readdir_prepopulate(req, req->r_session);
-		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
@@ -2820,12 +2819,18 @@  static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	if (realm)
 		ceph_put_snap_realm(mdsc, realm);
 
-	if (err == 0 && req->r_target_inode &&
-	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
-		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
-		spin_lock(&ci->i_unsafe_lock);
-		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
-		spin_unlock(&ci->i_unsafe_lock);
+	if (err == 0) {
+		if (req->r_target_inode &&
+		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+			struct ceph_inode_info *ci =
+				ceph_inode(req->r_target_inode);
+			spin_lock(&ci->i_unsafe_lock);
+			list_add_tail(&req->r_unsafe_target_item,
+				      &ci->i_unsafe_iops);
+			spin_unlock(&ci->i_unsafe_lock);
+		}
+
+		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 out_err:
 	mutex_lock(&mdsc->mutex);
@@ -4016,7 +4021,7 @@  int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	INIT_LIST_HEAD(&mdsc->dentry_lru);
 
 	ceph_caps_init(mdsc);
-	ceph_adjust_min_caps(mdsc, fsc->min_caps);
+	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
 
 	spin_lock_init(&mdsc->snapid_map_lock);
 	mdsc->snapid_map_tree = RB_ROOT;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index af3b25e59e90..94fe2312c092 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -393,6 +393,7 @@  struct ceph_mds_client {
 						unreserved) */
 	int		caps_total_count;    /* total caps allocated */
 	int		caps_use_count;      /* in use */
+	int		caps_use_max;	     /* max used caps */
 	int		caps_reserve_count;  /* unused, reserved */
 	int		caps_avail_count;    /* unused, unreserved */
 	int		caps_min_count;      /* keep at least this many
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index da2cd8e89062..93404e3c89db 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -133,6 +133,7 @@  enum {
 	Opt_rasize,
 	Opt_caps_wanted_delay_min,
 	Opt_caps_wanted_delay_max,
+	Opt_caps_max,
 	Opt_readdir_max_entries,
 	Opt_readdir_max_bytes,
 	Opt_congestion_kb,
@@ -175,6 +176,7 @@  static match_table_t fsopt_tokens = {
 	{Opt_rasize, "rasize=%d"},
 	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
 	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
+	{Opt_caps_max, "caps_max=%d"},
 	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
 	{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
 	{Opt_congestion_kb, "write_congestion_kb=%d"},
@@ -286,6 +288,11 @@  static int parse_fsopt_token(char *c, void *private)
 			return -EINVAL;
 		fsopt->caps_wanted_delay_max = intval;
 		break;
+	case Opt_caps_max:
+		if (intval < 0)
+			return -EINVAL;
+		fsopt->caps_max = intval;
+		break;
 	case Opt_readdir_max_entries:
 		if (intval < 1)
 			return -EINVAL;
@@ -576,6 +583,8 @@  static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_printf(m, ",rasize=%d", fsopt->rasize);
 	if (fsopt->congestion_kb != default_congestion_kb())
 		seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
+	if (fsopt->caps_max)
+		seq_printf(m, ",caps_max=%d", fsopt->caps_max);
 	if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
 		seq_printf(m, ",caps_wanted_delay_min=%d",
 			 fsopt->caps_wanted_delay_min);
@@ -680,9 +689,6 @@  static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	if (!fsc->wb_pagevec_pool)
 		goto fail_trunc_wq;
 
-	/* caps */
-	fsc->min_caps = fsopt->max_readdir;
-
 	return fsc;
 
 fail_trunc_wq:
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 7cec46513aa3..631b46e824a8 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -79,6 +79,7 @@  struct ceph_mount_options {
 	int rasize;           /* max readahead */
 	int congestion_kb;    /* max writeback in flight */
 	int caps_wanted_delay_min, caps_wanted_delay_max;
+	int caps_max;
 	int max_readdir;       /* max readdir result (entires) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
 
@@ -100,7 +101,6 @@  struct ceph_fs_client {
 	struct ceph_client *client;
 
 	unsigned long mount_state;
-	int min_caps;                  /* min caps i added */
 	loff_t max_file_size;
 
 	struct ceph_mds_client *mdsc;
@@ -661,7 +661,8 @@  extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
 
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
-extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+				     struct ceph_mount_options *fsopt);
 extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 			     struct ceph_cap_reservation *ctx, int need);
 extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
diff --git a/include/linux/ceph/types.h b/include/linux/ceph/types.h
index 27cd973d3881..bd3d532902d7 100644
--- a/include/linux/ceph/types.h
+++ b/include/linux/ceph/types.h
@@ -24,6 +24,7 @@  struct ceph_vino {
 /* context for the caps reservation mechanism */
 struct ceph_cap_reservation {
 	int count;
+	int used;
 };