[V2] ceph: add mount option to limit caps count
diff mbox series

Message ID 20190116131217.3270-1-zyan@redhat.com
State New
Headers show
Series
  • [V2] ceph: add mount option to limit caps count
Related show

Commit Message

Yan, Zheng Jan. 16, 2019, 1:12 p.m. UTC
If number of caps exceed the limit, queue a work to trim unsed dentries
when unreserving caps. Trimming dentry releases references to associated
inode, which may evict inode and release caps.

By default, there is no limit for caps count.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
---
 Documentation/filesystems/ceph.txt |  4 ++
 fs/ceph/caps.c                     | 33 ++++++++---
 fs/ceph/mds_client.c               | 92 ++++++++++++++++++++++++++----
 fs/ceph/mds_client.h               |  6 ++
 fs/ceph/super.c                    | 12 +++-
 fs/ceph/super.h                    |  5 +-
 include/linux/ceph/types.h         |  1 +
 7 files changed, 131 insertions(+), 22 deletions(-)

Change since V1:
 - use worker to reclaim caps

Patch
diff mbox series

diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 1177052701e1..bc4145ee5dba 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -118,6 +118,10 @@  Mount Options
 	of a non-responsive Ceph file system.  The default is 30
 	seconds.
 
+  caps_max=X
+	Specify the maximum number of caps to hold. Unused caps are released
+	when number of caps exceeds the limit. The default is 0 (no limit)
+
   rbytes
 	When stat() is called on a directory, set st_size to 'rbytes',
 	the summation of file sizes over all files nested beneath that
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index da5b56e14cc7..9b5c7b0f3610 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -148,11 +148,17 @@  void ceph_caps_finalize(struct ceph_mds_client *mdsc)
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+			      struct ceph_mount_options *fsopt)
 {
 	spin_lock(&mdsc->caps_list_lock);
-	mdsc->caps_min_count += delta;
-	BUG_ON(mdsc->caps_min_count < 0);
+	mdsc->caps_min_count = fsopt->max_readdir;
+	if (mdsc->caps_min_count < 1024)
+		mdsc->caps_min_count = 1024;
+	mdsc->caps_use_max = fsopt->caps_max;
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_max < mdsc->caps_min_count)
+		mdsc->caps_use_max = mdsc->caps_min_count;
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
@@ -272,6 +278,7 @@  int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 	if (!err) {
 		BUG_ON(have + alloc != need);
 		ctx->count = need;
+		ctx->used = 0;
 	}
 
 	spin_lock(&mdsc->caps_list_lock);
@@ -295,13 +302,24 @@  int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 }
 
 void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
-			struct ceph_cap_reservation *ctx)
+			 struct ceph_cap_reservation *ctx)
 {
+	bool reclaim = false;
+	if (!ctx->count)
+		return;
+
 	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
 	spin_lock(&mdsc->caps_list_lock);
 	__ceph_unreserve_caps(mdsc, ctx->count);
 	ctx->count = 0;
+
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_count > mdsc->caps_use_max)
+		reclaim = true;
 	spin_unlock(&mdsc->caps_list_lock);
+
+	if (reclaim)
+		ceph_reclaim_caps_nr(mdsc, ctx->used);
 }
 
 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +364,7 @@  struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
 	BUG_ON(list_empty(&mdsc->caps_list));
 
 	ctx->count--;
+	ctx->used++;
 	mdsc->caps_reserve_count--;
 	mdsc->caps_use_count++;
 
@@ -500,12 +519,12 @@  static void __insert_cap_node(struct ceph_inode_info *ci,
 static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
 			       struct ceph_inode_info *ci)
 {
-	struct ceph_mount_options *ma = mdsc->fsc->mount_options;
+	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
 
 	ci->i_hold_caps_min = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_min * HZ);
+					    opt->caps_wanted_delay_min * HZ);
 	ci->i_hold_caps_max = round_jiffies(jiffies +
-					    ma->caps_wanted_delay_max * HZ);
+					    opt->caps_wanted_delay_max * HZ);
 	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
 	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
 }
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 8fe7882f7b82..acbedc5861ec 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -58,6 +58,7 @@  struct ceph_reconnect_state {
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
 static void ceph_cap_release_work(struct work_struct *work);
+static void ceph_cap_reclaim_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -1932,7 +1933,7 @@  void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
  * caller holds session->s_cap_lock
  */
 void __ceph_queue_cap_release(struct ceph_mds_session *session,
-			      struct ceph_cap *cap)
+                              struct ceph_cap *cap)
 {
 	list_add_tail(&cap->session_caps, &session->s_cap_releases);
 	session->s_num_cap_releases++;
@@ -1941,6 +1942,66 @@  void __ceph_queue_cap_release(struct ceph_mds_session *session,
 		ceph_flush_cap_releases(session->s_mdsc, session);
 }
 
+static void ceph_cap_reclaim_work(struct work_struct *work)
+{
+	struct ceph_mds_client *mdsc =
+		container_of(work, struct ceph_mds_client, cap_reclaim_work);
+	struct super_block *sb = mdsc->fsc->sb;
+	struct shrink_control sc = {
+		.gfp_mask = GFP_KERNEL,
+	};
+	unsigned long nr_to_trim = 0;
+	unsigned long freeable, freed;
+
+	spin_lock(&mdsc->caps_list_lock);
+	if (mdsc->caps_use_max > 0 &&
+	    mdsc->caps_use_count > mdsc->caps_use_max)
+                nr_to_trim = mdsc->caps_use_count - mdsc->caps_use_max;
+	spin_unlock(&mdsc->caps_list_lock);
+
+	if (!nr_to_trim)
+		return;
+
+	sc.nr_to_scan = nr_to_trim;
+	freeable = sb->s_shrink.count_objects(&sb->s_shrink, &sc);
+	if (freeable == 0 || freeable == SHRINK_EMPTY)
+		return;
+
+	sc.nr_to_scan = min_t(unsigned long,
+			      CEPH_CAPS_PER_RELEASE * 2,
+			      min(nr_to_trim, freeable));
+	freed = sb->s_shrink.scan_objects(&sb->s_shrink, &sc);
+	if (freed == SHRINK_STOP)
+		return;
+
+	if (freed < nr_to_trim)
+		ceph_queue_cap_reclaim_work(mdsc);
+}
+
+void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
+{
+	if (mdsc->stopping)
+		return;
+
+        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
+                dout("caps reclaim work queued\n");
+        } else {
+                dout("failed to queue caps release work\n");
+        }
+}
+
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
+{
+	int val;
+	if (!nr)
+		return;
+	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
+	if (!(val % CEPH_CAPS_PER_RELEASE)) {
+		atomic_set(&mdsc->cap_reclaim_pending, 0);
+		ceph_queue_cap_reclaim_work(mdsc);
+	}
+}
+
 /*
  * requests
  */
@@ -2854,7 +2915,6 @@  static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
 				    req->r_op == CEPH_MDS_OP_LSSNAP))
 			ceph_readdir_prepopulate(req, req->r_session);
-		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
@@ -2863,12 +2923,18 @@  static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	if (realm)
 		ceph_put_snap_realm(mdsc, realm);
 
-	if (err == 0 && req->r_target_inode &&
-	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
-		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
-		spin_lock(&ci->i_unsafe_lock);
-		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
-		spin_unlock(&ci->i_unsafe_lock);
+	if (err == 0) {
+		if (req->r_target_inode &&
+		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+			struct ceph_inode_info *ci =
+				ceph_inode(req->r_target_inode);
+			spin_lock(&ci->i_unsafe_lock);
+			list_add_tail(&req->r_unsafe_target_item,
+				      &ci->i_unsafe_iops);
+			spin_unlock(&ci->i_unsafe_lock);
+		}
+
+		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 out_err:
 	mutex_lock(&mdsc->mutex);
@@ -3955,6 +4021,9 @@  static void delayed_work(struct work_struct *work)
 	int renew_caps;
 
 	dout("mdsc delayed_work\n");
+
+	ceph_queue_cap_reclaim_work(mdsc);
+
 	ceph_check_delayed_caps(mdsc);
 
 	ceph_trim_snapid_map(mdsc);
@@ -4055,11 +4124,14 @@  int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	mdsc->num_cap_flushing = 0;
 	spin_lock_init(&mdsc->cap_dirty_lock);
 	init_waitqueue_head(&mdsc->cap_flushing_wq);
+	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+	atomic_set(&mdsc->cap_reclaim_pending, 0);
+
 	spin_lock_init(&mdsc->dentry_lru_lock);
 	INIT_LIST_HEAD(&mdsc->dentry_lru);
 
 	ceph_caps_init(mdsc);
-	ceph_adjust_min_caps(mdsc, fsc->min_caps);
+	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
 
 	spin_lock_init(&mdsc->snapid_map_lock);
 	mdsc->snapid_map_tree = RB_ROOT;
@@ -4259,9 +4331,9 @@  void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 	mutex_unlock(&mdsc->mutex);
 
 	ceph_cleanup_snapid_map(mdsc);
-
 	ceph_cleanup_empty_realms(mdsc);
 
+	cancel_work_sync(&mdsc->cap_reclaim_work);
 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
 
 	dout("stopped\n");
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 2147ecd0c9e5..8db89465de7b 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -378,6 +378,9 @@  struct ceph_mds_client {
 	spinlock_t        cap_dirty_lock;   /* protects above items */
 	wait_queue_head_t cap_flushing_wq;
 
+	struct work_struct cap_reclaim_work;
+	atomic_t	   cap_reclaim_pending;
+
 	/*
 	 * Cap reservations
 	 *
@@ -394,6 +397,7 @@  struct ceph_mds_client {
 						unreserved) */
 	int		caps_total_count;    /* total caps allocated */
 	int		caps_use_count;      /* in use */
+	int		caps_use_max;	     /* max used caps */
 	int		caps_reserve_count;  /* unused, reserved */
 	int		caps_avail_count;    /* unused, unreserved */
 	int		caps_min_count;      /* keep at least this many
@@ -462,6 +466,8 @@  extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
 				    struct ceph_cap *cap);
 extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
 				    struct ceph_mds_session *session);
+extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
+extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 200836bcf542..6d5bb2f74612 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -133,6 +133,7 @@  enum {
 	Opt_rasize,
 	Opt_caps_wanted_delay_min,
 	Opt_caps_wanted_delay_max,
+	Opt_caps_max,
 	Opt_readdir_max_entries,
 	Opt_readdir_max_bytes,
 	Opt_congestion_kb,
@@ -175,6 +176,7 @@  static match_table_t fsopt_tokens = {
 	{Opt_rasize, "rasize=%d"},
 	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
 	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
+	{Opt_caps_max, "caps_max=%d"},
 	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
 	{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
 	{Opt_congestion_kb, "write_congestion_kb=%d"},
@@ -286,6 +288,11 @@  static int parse_fsopt_token(char *c, void *private)
 			return -EINVAL;
 		fsopt->caps_wanted_delay_max = intval;
 		break;
+	case Opt_caps_max:
+		if (intval < 0)
+			return -EINVAL;
+		fsopt->caps_max = intval;
+		break;
 	case Opt_readdir_max_entries:
 		if (intval < 1)
 			return -EINVAL;
@@ -576,6 +583,8 @@  static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_printf(m, ",rasize=%d", fsopt->rasize);
 	if (fsopt->congestion_kb != default_congestion_kb())
 		seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
+	if (fsopt->caps_max)
+		seq_printf(m, ",caps_max=%d", fsopt->caps_max);
 	if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
 		seq_printf(m, ",caps_wanted_delay_min=%d",
 			 fsopt->caps_wanted_delay_min);
@@ -683,9 +692,6 @@  static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	if (!fsc->wb_pagevec_pool)
 		goto fail_cap_wq;
 
-	/* caps */
-	fsc->min_caps = fsopt->max_readdir;
-
 	return fsc;
 
 fail_cap_wq:
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 007406266382..ba5b7675ed58 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -79,6 +79,7 @@  struct ceph_mount_options {
 	int rasize;           /* max readahead */
 	int congestion_kb;    /* max writeback in flight */
 	int caps_wanted_delay_min, caps_wanted_delay_max;
+	int caps_max;
 	int max_readdir;       /* max readdir result (entires) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
 
@@ -100,7 +101,6 @@  struct ceph_fs_client {
 	struct ceph_client *client;
 
 	unsigned long mount_state;
-	int min_caps;                  /* min caps i added */
 	loff_t max_file_size;
 
 	struct ceph_mds_client *mdsc;
@@ -663,7 +663,8 @@  extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
 
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
-extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+				     struct ceph_mount_options *fsopt);
 extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 			     struct ceph_cap_reservation *ctx, int need);
 extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
diff --git a/include/linux/ceph/types.h b/include/linux/ceph/types.h
index 27cd973d3881..bd3d532902d7 100644
--- a/include/linux/ceph/types.h
+++ b/include/linux/ceph/types.h
@@ -24,6 +24,7 @@  struct ceph_vino {
 /* context for the caps reservation mechanism */
 struct ceph_cap_reservation {
 	int count;
+	int used;
 };