@@ -118,6 +118,10 @@ Mount Options
of a non-responsive Ceph file system. The default is 30
seconds.
+ caps_max=X
+ Specify the maximum number of caps to hold. Unused caps are released
+ when number of caps exceeds the limit. The default is 0 (no limit)
+
rbytes
When stat() is called on a directory, set st_size to 'rbytes',
the summation of file sizes over all files nested beneath that
@@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc)
spin_unlock(&mdsc->caps_list_lock);
}
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+ struct ceph_mount_options *fsopt)
{
spin_lock(&mdsc->caps_list_lock);
- mdsc->caps_min_count += delta;
- BUG_ON(mdsc->caps_min_count < 0);
+ mdsc->caps_min_count = fsopt->max_readdir;
+ if (mdsc->caps_min_count < 1024)
+ mdsc->caps_min_count = 1024;
+ mdsc->caps_use_max = fsopt->caps_max;
+ if (mdsc->caps_use_max > 0 &&
+ mdsc->caps_use_max < mdsc->caps_min_count)
+ mdsc->caps_use_max = mdsc->caps_min_count;
spin_unlock(&mdsc->caps_list_lock);
}
@@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
if (!err) {
BUG_ON(have + alloc != need);
ctx->count = need;
+ ctx->used = 0;
}
spin_lock(&mdsc->caps_list_lock);
@@ -295,13 +302,24 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
}
void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
- struct ceph_cap_reservation *ctx)
+ struct ceph_cap_reservation *ctx)
{
+ bool reclaim = false;
+ if (!ctx->count)
+ return;
+
dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
spin_lock(&mdsc->caps_list_lock);
__ceph_unreserve_caps(mdsc, ctx->count);
ctx->count = 0;
+
+ if (mdsc->caps_use_max > 0 &&
+ mdsc->caps_use_count > mdsc->caps_use_max)
+ reclaim = true;
spin_unlock(&mdsc->caps_list_lock);
+
+ if (reclaim)
+ ceph_reclaim_caps_nr(mdsc, ctx->used);
}
struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +364,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
BUG_ON(list_empty(&mdsc->caps_list));
ctx->count--;
+ ctx->used++;
mdsc->caps_reserve_count--;
mdsc->caps_use_count++;
@@ -500,12 +519,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
struct ceph_inode_info *ci)
{
- struct ceph_mount_options *ma = mdsc->fsc->mount_options;
+ struct ceph_mount_options *opt = mdsc->fsc->mount_options;
ci->i_hold_caps_min = round_jiffies(jiffies +
- ma->caps_wanted_delay_min * HZ);
+ opt->caps_wanted_delay_min * HZ);
ci->i_hold_caps_max = round_jiffies(jiffies +
- ma->caps_wanted_delay_max * HZ);
+ opt->caps_wanted_delay_max * HZ);
dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}
@@ -58,6 +58,7 @@ struct ceph_reconnect_state {
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
+static void ceph_cap_reclaim_work(struct work_struct *work);
static const struct ceph_connection_operations mds_con_ops;
@@ -1932,7 +1933,7 @@ void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
* caller holds session->s_cap_lock
*/
void __ceph_queue_cap_release(struct ceph_mds_session *session,
- struct ceph_cap *cap)
+ struct ceph_cap *cap)
{
list_add_tail(&cap->session_caps, &session->s_cap_releases);
session->s_num_cap_releases++;
@@ -1941,6 +1942,66 @@ void __ceph_queue_cap_release(struct ceph_mds_session *session,
ceph_flush_cap_releases(session->s_mdsc, session);
}
+static void ceph_cap_reclaim_work(struct work_struct *work)
+{
+ struct ceph_mds_client *mdsc =
+ container_of(work, struct ceph_mds_client, cap_reclaim_work);
+ struct super_block *sb = mdsc->fsc->sb;
+ struct shrink_control sc = {
+ .gfp_mask = GFP_KERNEL,
+ };
+ unsigned long nr_to_trim = 0;
+ unsigned long freeable, freed;
+
+ spin_lock(&mdsc->caps_list_lock);
+ if (mdsc->caps_use_max > 0 &&
+ mdsc->caps_use_count > mdsc->caps_use_max)
+ nr_to_trim = mdsc->caps_use_count - mdsc->caps_use_max;
+ spin_unlock(&mdsc->caps_list_lock);
+
+ if (!nr_to_trim)
+ return;
+
+ sc.nr_to_scan = nr_to_trim;
+ freeable = sb->s_shrink.count_objects(&sb->s_shrink, &sc);
+ if (freeable == 0 || freeable == SHRINK_EMPTY)
+ return;
+
+ sc.nr_to_scan = min_t(unsigned long,
+ CEPH_CAPS_PER_RELEASE * 2,
+ min(nr_to_trim, freeable));
+ freed = sb->s_shrink.scan_objects(&sb->s_shrink, &sc);
+ if (freed == SHRINK_STOP)
+ return;
+
+ if (freed < nr_to_trim)
+ ceph_queue_cap_reclaim_work(mdsc);
+}
+
+void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
+{
+ if (mdsc->stopping)
+ return;
+
+ if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
+ dout("caps reclaim work queued\n");
+ } else {
+ dout("failed to queue caps release work\n");
+ }
+}
+
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
+{
+ int val;
+ if (!nr)
+ return;
+ val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
+ if (!(val % CEPH_CAPS_PER_RELEASE)) {
+ atomic_set(&mdsc->cap_reclaim_pending, 0);
+ ceph_queue_cap_reclaim_work(mdsc);
+ }
+}
+
/*
* requests
*/
@@ -2854,7 +2915,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
req->r_op == CEPH_MDS_OP_LSSNAP))
ceph_readdir_prepopulate(req, req->r_session);
- ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
current->journal_info = NULL;
mutex_unlock(&req->r_fill_mutex);
@@ -2863,12 +2923,18 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
if (realm)
ceph_put_snap_realm(mdsc, realm);
- if (err == 0 && req->r_target_inode &&
- test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
- struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
- spin_lock(&ci->i_unsafe_lock);
- list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
- spin_unlock(&ci->i_unsafe_lock);
+ if (err == 0) {
+ if (req->r_target_inode &&
+ test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+ struct ceph_inode_info *ci =
+ ceph_inode(req->r_target_inode);
+ spin_lock(&ci->i_unsafe_lock);
+ list_add_tail(&req->r_unsafe_target_item,
+ &ci->i_unsafe_iops);
+ spin_unlock(&ci->i_unsafe_lock);
+ }
+
+ ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
}
out_err:
mutex_lock(&mdsc->mutex);
@@ -3955,6 +4021,9 @@ static void delayed_work(struct work_struct *work)
int renew_caps;
dout("mdsc delayed_work\n");
+
+ ceph_queue_cap_reclaim_work(mdsc);
+
ceph_check_delayed_caps(mdsc);
ceph_trim_snapid_map(mdsc);
@@ -4055,11 +4124,14 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
mdsc->num_cap_flushing = 0;
spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq);
+ INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+ atomic_set(&mdsc->cap_reclaim_pending, 0);
+
spin_lock_init(&mdsc->dentry_lru_lock);
INIT_LIST_HEAD(&mdsc->dentry_lru);
ceph_caps_init(mdsc);
- ceph_adjust_min_caps(mdsc, fsc->min_caps);
+ ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
spin_lock_init(&mdsc->snapid_map_lock);
mdsc->snapid_map_tree = RB_ROOT;
@@ -4259,9 +4331,9 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
mutex_unlock(&mdsc->mutex);
ceph_cleanup_snapid_map(mdsc);
-
ceph_cleanup_empty_realms(mdsc);
+ cancel_work_sync(&mdsc->cap_reclaim_work);
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
dout("stopped\n");
@@ -378,6 +378,9 @@ struct ceph_mds_client {
spinlock_t cap_dirty_lock; /* protects above items */
wait_queue_head_t cap_flushing_wq;
+ struct work_struct cap_reclaim_work;
+ atomic_t cap_reclaim_pending;
+
/*
* Cap reservations
*
@@ -394,6 +397,7 @@ struct ceph_mds_client {
unreserved) */
int caps_total_count; /* total caps allocated */
int caps_use_count; /* in use */
+ int caps_use_max; /* max used caps */
int caps_reserve_count; /* unused, reserved */
int caps_avail_count; /* unused, unreserved */
int caps_min_count; /* keep at least this many
@@ -462,6 +466,8 @@ extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
struct ceph_cap *cap);
extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
+extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
+extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
@@ -133,6 +133,7 @@ enum {
Opt_rasize,
Opt_caps_wanted_delay_min,
Opt_caps_wanted_delay_max,
+ Opt_caps_max,
Opt_readdir_max_entries,
Opt_readdir_max_bytes,
Opt_congestion_kb,
@@ -175,6 +176,7 @@ static match_table_t fsopt_tokens = {
{Opt_rasize, "rasize=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
+ {Opt_caps_max, "caps_max=%d"},
{Opt_readdir_max_entries, "readdir_max_entries=%d"},
{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
{Opt_congestion_kb, "write_congestion_kb=%d"},
@@ -286,6 +288,11 @@ static int parse_fsopt_token(char *c, void *private)
return -EINVAL;
fsopt->caps_wanted_delay_max = intval;
break;
+ case Opt_caps_max:
+ if (intval < 0)
+ return -EINVAL;
+ fsopt->caps_max = intval;
+ break;
case Opt_readdir_max_entries:
if (intval < 1)
return -EINVAL;
@@ -576,6 +583,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
seq_printf(m, ",rasize=%d", fsopt->rasize);
if (fsopt->congestion_kb != default_congestion_kb())
seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
+ if (fsopt->caps_max)
+ seq_printf(m, ",caps_max=%d", fsopt->caps_max);
if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
seq_printf(m, ",caps_wanted_delay_min=%d",
fsopt->caps_wanted_delay_min);
@@ -683,9 +692,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
if (!fsc->wb_pagevec_pool)
goto fail_cap_wq;
- /* caps */
- fsc->min_caps = fsopt->max_readdir;
-
return fsc;
fail_cap_wq:
@@ -79,6 +79,7 @@ struct ceph_mount_options {
int rasize; /* max readahead */
int congestion_kb; /* max writeback in flight */
int caps_wanted_delay_min, caps_wanted_delay_max;
+ int caps_max;
int max_readdir; /* max readdir result (entires) */
int max_readdir_bytes; /* max readdir result (bytes) */
@@ -100,7 +101,6 @@ struct ceph_fs_client {
struct ceph_client *client;
unsigned long mount_state;
- int min_caps; /* min caps i added */
loff_t max_file_size;
struct ceph_mds_client *mdsc;
@@ -663,7 +663,8 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
extern void ceph_caps_init(struct ceph_mds_client *mdsc);
extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
-extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+ struct ceph_mount_options *fsopt);
extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
struct ceph_cap_reservation *ctx, int need);
extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
@@ -24,6 +24,7 @@ struct ceph_vino {
/* context for the caps reservation mechanism */
struct ceph_cap_reservation {
int count;
+ int used;
};
If number of caps exceed the limit, queue a work to trim unsed dentries when unreserving caps. Trimming dentry releases references to associated inode, which may evict inode and release caps. By default, there is no limit for caps count. Signed-off-by: "Yan, Zheng" <zyan@redhat.com> --- Documentation/filesystems/ceph.txt | 4 ++ fs/ceph/caps.c | 33 ++++++++--- fs/ceph/mds_client.c | 92 ++++++++++++++++++++++++++---- fs/ceph/mds_client.h | 6 ++ fs/ceph/super.c | 12 +++- fs/ceph/super.h | 5 +- include/linux/ceph/types.h | 1 + 7 files changed, 131 insertions(+), 22 deletions(-) Change since V1: - use worker to reclaim caps