@@ -36,3 +36,11 @@ config CEPH_FS_POSIX_ACL
groups beyond the owner/group/world scheme.
If you don't know what Access Control Lists are, say N
+
+config CEPH_LIB_IO_POLICY
+ bool "Use the ceph-specific blkcg policy to limit io reqs"
+ depends on CEPH_LIB
+ default n
+ help
+ If you say Y here, the blkcg policy will be inited when
+ libceph module is loaded
@@ -12,3 +12,4 @@ ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
+ceph-$(CONFIG_CEPH_LIB_IO_POLICY) += ceph_io_policy.o
new file mode 100644
@@ -0,0 +1,445 @@
+#include <linux/ceph/ceph_io_policy.h>
+#include <linux/slab.h>
+
+enum {
+ CEPH_MDS_META_OPS,
+ CEPH_MDS_META_OPS_IOPS,
+ CEPH_OSD_DATA_OPS,
+ CEPH_OSD_DATA_OPS_IOPS,
+ CEPH_OSD_DATA_OPS_BANDWIDTH,
+};
+
+static void put_token(struct token_bucket_throttle* ptbt, u64 tick_interval)
+{
+ struct token_bucket* ptb = NULL;
+ u64 tokens_to_put = 0;
+ int i = 0;
+
+ for (i = 0; i < ptbt->tb_num; i++) {
+ ptb = &ptbt->tb[i];
+
+ if (!ptb->max)
+ continue;
+
+ tokens_to_put = ptb->target_throughput * tick_interval / HZ;
+
+ if (ptb->remain + tokens_to_put >= ptb->max)
+ ptb->remain = ptb->max;
+ else
+ ptb->remain += tokens_to_put;
+ pr_debug("%s: put_token: token bucket remain: %lld\n", __func__, ptb->remain);
+ }
+}
+
+static bool should_wait(struct token_bucket_throttle* ptbt, struct queue_item* qitem)
+{
+ struct token_bucket* ptb = NULL;
+ int i = 0;
+
+ BUG_ON(ptbt->tb_num != qitem->tb_item_num);
+ for (i = 0; i < ptbt->tb_num; i++) {
+ ptb = &ptbt->tb[i];
+
+ if (!ptb->max)
+ continue;
+
+ if (ptb->remain < qitem->tokens_requested[i])
+ return true;
+ }
+ return false;
+}
+
+static void get_token(struct token_bucket_throttle* ptbt, struct queue_item* qitem)
+{
+ struct token_bucket* ptb = NULL;
+ int i = 0;
+ BUG_ON(should_wait(ptbt, qitem));
+
+ for (i = 0; i < ptbt->tb_num; i++) {
+ ptb = &ptbt->tb[i];
+ if (!ptb->max)
+ continue;
+ ptb->remain -= qitem->tokens_requested[i];
+ }
+}
+
+void schedule_token_bucket_throttle_tick(struct token_bucket_throttle* ptbt, u64 tick_interval)
+{
+ if (tick_interval)
+ schedule_delayed_work(&ptbt->tick_work, tick_interval);
+}
+EXPORT_SYMBOL(schedule_token_bucket_throttle_tick);
+
+void token_bucket_throttle_tick(struct work_struct* work)
+{
+ struct token_bucket_throttle* ptbt =
+ container_of(work, struct token_bucket_throttle, tick_work.work);
+ struct queue_item* req = NULL, *tmp = NULL;
+ LIST_HEAD(reqs_to_go);
+ u64 tick_interval = ptbt->tick_interval;
+
+ mutex_lock(&ptbt->bucket_lock);
+ put_token(ptbt, tick_interval);
+ if (!tick_interval)
+ pr_debug("%s: tick_interval set to 0, turning off the throttle, item: %p\n", __func__, req);
+
+ list_for_each_entry_safe(req, tmp, &ptbt->reqs_blocked, token_bucket_throttle_item) {
+ pr_debug("%s: waiting item: %p\n", __func__, req);
+ if (tick_interval) {
+ if (should_wait(ptbt, req))
+ break;
+ get_token(ptbt, req);
+ }
+ list_del_init(&req->token_bucket_throttle_item);
+ list_add_tail(&req->token_bucket_throttle_item, &reqs_to_go);
+ pr_debug("%s: tokens got for req: %p\n", __func__, req);
+ }
+ mutex_unlock(&ptbt->bucket_lock);
+
+ list_for_each_entry_safe(req, tmp, &reqs_to_go, token_bucket_throttle_item) {
+ pr_debug("%s: notifying req: %p, list head: %p\n", __func__, req, &reqs_to_go);
+ complete_all(&req->throttled);
+ list_del_init(&req->token_bucket_throttle_item);
+ }
+
+ if (tick_interval)
+ schedule_token_bucket_throttle_tick(ptbt, tick_interval);
+}
+EXPORT_SYMBOL(token_bucket_throttle_tick);
+
+int get_token_bucket_throttle(struct token_bucket_throttle* ptbt, struct queue_item* req)
+{
+ int ret = 0;
+ long timeleft = 0;
+
+ mutex_lock(&ptbt->bucket_lock);
+ if (should_wait(ptbt, req)) {
+ pr_debug("%s: wait for tokens, req: %p\n", __func__, req);
+ list_add_tail(&req->token_bucket_throttle_item, &ptbt->reqs_blocked);
+ mutex_unlock(&ptbt->bucket_lock);
+ timeleft = wait_for_completion_killable_timeout(&req->throttled, req->tbt_timeout ?: MAX_SCHEDULE_TIMEOUT);
+ if (timeleft > 0)
+ ret = 0;
+ else {
+ if (!timeleft)
+ ret = -EIO; /* timed out */
+ else {
+ /* killed */
+ pr_debug("%s: killed, req: %p\n", __func__, req);
+ ret = timeleft;
+ }
+ mutex_lock(&ptbt->bucket_lock);
+ if (!list_empty(&req->token_bucket_throttle_item))
+ list_del_init(&req->token_bucket_throttle_item);
+ mutex_unlock(&ptbt->bucket_lock);
+ }
+ } else {
+ pr_debug("%s: no need to wait for tokens, going ahead, req: %p\n", __func__, req);
+ get_token(ptbt, req);
+ mutex_unlock(&ptbt->bucket_lock);
+ }
+ return ret;
+}
+EXPORT_SYMBOL(get_token_bucket_throttle);
+
+int queue_item_init(struct queue_item* qitem, struct token_bucket_throttle* ptbt, int tb_item_num)
+{
+ qitem->tokens_requested = kzalloc(sizeof(*qitem->tokens_requested) * tb_item_num, GFP_KERNEL);
+ if (!qitem->tokens_requested)
+ return -ENOMEM;
+
+ qitem->tb_item_num = tb_item_num;
+ INIT_LIST_HEAD(&qitem->token_bucket_throttle_item);
+ init_completion(&qitem->throttled);
+ qitem->tbt_timeout = ptbt->tbt_timeout;
+
+ return 0;
+}
+EXPORT_SYMBOL(queue_item_init);
+
+void queue_item_free(struct queue_item* qitem)
+{
+ kfree(qitem->tokens_requested);
+}
+EXPORT_SYMBOL(queue_item_free);
+
+int token_bucket_throttle_init(struct token_bucket_throttle* ptbt,
+ int token_bucket_num)
+{
+ int i = 0;
+
+ INIT_LIST_HEAD(&ptbt->reqs_blocked);
+ mutex_init(&ptbt->bucket_lock);
+ ptbt->tb_num = token_bucket_num;
+ ptbt->tb = kzalloc(sizeof(*ptbt->tb) * ptbt->tb_num, GFP_KERNEL);
+ if (!ptbt->tb) {
+ return -ENOMEM;
+ }
+
+ for (i = 0; i < ptbt->tb_num; i++) {
+ ptbt->tb[i].target_throughput = 0;
+ ptbt->tb[i].max = 0;
+ }
+ ptbt->tick_interval = 0;
+ ptbt->tbt_timeout = 0;
+ INIT_DELAYED_WORK(&ptbt->tick_work, token_bucket_throttle_tick);
+
+ return 0;
+}
+EXPORT_SYMBOL(token_bucket_throttle_init);
+
+static int set_throttle_params(struct token_bucket_throttle* ptbt, char* param_list)
+{
+ char* options = strstrip(param_list);
+ char* val = NULL;
+ int res = 0;
+ unsigned long interval = 0, timeout = 0, last_interval = ptbt->tick_interval;
+
+ val = strsep(&options, ",");
+ if (!val)
+ return -EINVAL;
+
+ res = kstrtol(val, 0, &interval);
+ if (res)
+ return res;
+
+ val = strsep(&options, ",");
+ if (!val)
+ return -EINVAL;
+
+ res = kstrtol(val, 0, &timeout);
+ if (res)
+ return res;
+
+ if (last_interval && !interval) {
+ int i = 0;
+
+ for (i = 0; i<ptbt->tb_num; i++) {
+ if (ptbt->tb[i].max) {
+ /* all token bucket must be unset
+ * before turning off the throttle */
+ return -EINVAL;
+ }
+ }
+ }
+ ptbt->tick_interval = msecs_to_jiffies(interval);
+ ptbt->tbt_timeout = timeout;
+
+ if (ptbt->tick_interval && !last_interval) {
+ schedule_token_bucket_throttle_tick(ptbt, ptbt->tick_interval);
+ }
+
+ return 0;
+}
+
+static int set_tb_params(struct token_bucket_throttle* ptbt, int tb_idx, char* param_list)
+{
+ char* options = strstrip(param_list);
+ char* val = NULL;
+ int res = 0;
+ unsigned long throughput = 0, burst = 0;
+
+ val = strsep(&options, ",");
+ if (!val)
+ return -EINVAL;
+
+ res = kstrtol(val, 0, &throughput);
+ if (res)
+ return res;
+
+ val = strsep(&options, ",");
+ if (!val)
+ return -EINVAL;
+
+ res = kstrtol(val, 0, &burst);
+ if (res)
+ return res;
+
+ if (!(throughput && burst) && (throughput || burst)) {
+ /* either both or none of throughput and burst are set*/
+ return -EINVAL;
+ }
+ if (throughput && !ptbt->tick_interval) {
+ /* all token bucket must be unset
+ * before turning off the throttle */
+ return -EINVAL;
+ }
+ ptbt->tb[tb_idx].target_throughput = throughput;
+ ptbt->tb[tb_idx].max = burst;
+
+ return 0;
+}
+
+static ssize_t ceph_set_throttle_params(struct kernfs_open_file *of,
+ char *buf, size_t nbytes, loff_t off)
+{
+ int ret = 0;
+ struct blkcg* blkcg = css_to_blkcg(of_css(of));
+ struct ceph_group_data* cephgd_p = blkcg_to_cephgd(blkcg);
+ int index = of_cft(of)->private;
+
+ switch (index) {
+ case CEPH_MDS_META_OPS:
+ ret = set_throttle_params(&cephgd_p->meta_ops_throttle, buf);
+ break;
+ case CEPH_OSD_DATA_OPS:
+ ret = set_throttle_params(&cephgd_p->data_ops_throttle, buf);
+ break;
+ case CEPH_MDS_META_OPS_IOPS:
+ ret = set_tb_params(&cephgd_p->meta_ops_throttle,
+ META_OPS_IOPS_IDX, buf);
+ break;
+ case CEPH_OSD_DATA_OPS_IOPS:
+ ret = set_tb_params(&cephgd_p->data_ops_throttle,
+ DATA_OPS_IOPS_IDX, buf);
+ break;
+ case CEPH_OSD_DATA_OPS_BANDWIDTH:
+ ret = set_tb_params(&cephgd_p->data_ops_throttle,
+ DATA_OPS_BAND_IDX, buf);
+ break;
+ default:
+ BUG();
+ }
+
+ return ret ?: nbytes;
+}
+
+static int ceph_throttle_params_read(struct seq_file *sf, void *v)
+{
+ struct blkcg* blkcg = css_to_blkcg(seq_css(sf));
+ struct ceph_group_data* cephgd_p = blkcg_to_cephgd(blkcg);
+ int index = seq_cft(sf)->private;
+
+ switch (index) {
+ case CEPH_MDS_META_OPS:
+ seq_printf(sf, "%llu,%lu\n",
+ cephgd_p->meta_ops_throttle.tick_interval,
+ cephgd_p->meta_ops_throttle.tbt_timeout);
+ break;
+ case CEPH_OSD_DATA_OPS:
+ seq_printf(sf, "%llu,%lu\n",
+ cephgd_p->data_ops_throttle.tick_interval,
+ cephgd_p->data_ops_throttle.tbt_timeout);
+ break;
+ case CEPH_MDS_META_OPS_IOPS:
+ seq_printf(sf, "%llu,%llu\n",
+ cephgd_p->meta_ops_throttle.tb[META_OPS_IOPS_IDX].target_throughput,
+ cephgd_p->meta_ops_throttle.tb[META_OPS_IOPS_IDX].max);
+ break;
+ case CEPH_OSD_DATA_OPS_IOPS:
+ seq_printf(sf, "%llu,%llu\n",
+ cephgd_p->data_ops_throttle.tb[DATA_OPS_IOPS_IDX].target_throughput,
+ cephgd_p->data_ops_throttle.tb[DATA_OPS_IOPS_IDX].max);
+ break;
+ case CEPH_OSD_DATA_OPS_BANDWIDTH:
+ seq_printf(sf, "%llu,%llu\n",
+ cephgd_p->data_ops_throttle.tb[DATA_OPS_BAND_IDX].target_throughput,
+ cephgd_p->data_ops_throttle.tb[DATA_OPS_BAND_IDX].max);
+ break;
+ default:
+ BUG();
+ }
+
+ return 0;
+}
+
+static struct cftype cephgd_files[] = {
+ {
+ .name = "cephfs_meta_ops.iops",
+ .write = ceph_set_throttle_params,
+ .seq_show = ceph_throttle_params_read,
+ .private = CEPH_MDS_META_OPS_IOPS,
+ },
+ {
+ .name = "cephfs_meta_ops",
+ .write = ceph_set_throttle_params,
+ .seq_show = ceph_throttle_params_read,
+ .private = CEPH_MDS_META_OPS,
+ },
+ {
+ .name = "cephfs_data_ops.iops",
+ .write = ceph_set_throttle_params,
+ .seq_show = ceph_throttle_params_read,
+ .private = CEPH_OSD_DATA_OPS_IOPS,
+ },
+ {
+ .name = "cephfs_data_ops.bandwidth",
+ .write = ceph_set_throttle_params,
+ .seq_show = ceph_throttle_params_read,
+ .private = CEPH_OSD_DATA_OPS_BANDWIDTH,
+ },
+ {
+ .name = "cephfs_data_ops",
+ .write = ceph_set_throttle_params,
+ .seq_show = ceph_throttle_params_read,
+ .private = CEPH_OSD_DATA_OPS,
+ },
+ { }
+};
+
+static struct blkcg_policy_data * ceph_cpd_alloc(gfp_t gfp) {
+
+ struct ceph_group_data* cephgd_p = NULL;
+ struct blkcg_policy_data *ret = NULL;
+ int r = 0;
+
+ cephgd_p = kzalloc(sizeof(*cephgd_p), gfp);
+ if (!cephgd_p) {
+ ret = ERR_PTR(-ENOMEM);
+ goto err;
+ }
+
+ r = token_bucket_throttle_init(&cephgd_p->meta_ops_throttle,
+ META_OPS_TB_NUM);
+ if (r) {
+ ret = ERR_PTR(r);
+ goto err;
+ }
+
+ r = token_bucket_throttle_init(&cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (r) {
+ ret = ERR_PTR(r);
+ goto err;
+ }
+
+ return &cephgd_p->cpd;
+err:
+ return ret;
+}
+
+static void ceph_cpd_init(struct blkcg_policy_data *cpd) {
+}
+
+static void ceph_cpd_free(struct blkcg_policy_data *cpd) {
+ struct ceph_group_data* cephgd_p = cpd_to_cephgd(cpd);
+
+ cancel_delayed_work_sync(&cephgd_p->meta_ops_throttle.tick_work);
+ cancel_delayed_work_sync(&cephgd_p->data_ops_throttle.tick_work);
+
+ kfree(cephgd_p->meta_ops_throttle.tb);
+ kfree(cephgd_p->data_ops_throttle.tb);
+
+ kfree(cephgd_p);
+}
+
+struct blkcg_policy io_policy_ceph = {
+ .dfl_cftypes = cephgd_files,
+
+ .cpd_alloc_fn = ceph_cpd_alloc,
+ .cpd_init_fn = ceph_cpd_init,
+ .cpd_free_fn = ceph_cpd_free,
+};
+EXPORT_SYMBOL_GPL(io_policy_ceph);
+
+int ceph_io_policy_init()
+{
+ return blkcg_policy_register(&io_policy_ceph);
+};
+EXPORT_SYMBOL(ceph_io_policy_init);
+
+void ceph_io_policy_release()
+{
+ blkcg_policy_unregister(&io_policy_ceph);
+};
+EXPORT_SYMBOL(ceph_io_policy_release);
new file mode 100644
@@ -0,0 +1,74 @@
+#ifndef _CEPHFS_CGROUP_H
+#define _CEPHFS_CGROUP_H
+
+#include <linux/blk-cgroup.h>
+
+#define META_OPS_IOPS_IDX 0
+#define DATA_OPS_IOPS_IDX 0
+#define DATA_OPS_BAND_IDX 1
+#define META_OPS_TB_NUM 1
+#define DATA_OPS_TB_NUM 2
+
+/*
+ * token bucket throttle
+ */
+struct token_bucket {
+ u64 remain;
+ u64 max;
+ u64 target_throughput;
+};
+
+struct token_bucket_throttle {
+ struct token_bucket* tb;
+ u64 tick_interval;
+ int tb_num;
+ struct list_head reqs_blocked;
+ struct mutex bucket_lock;
+ struct delayed_work tick_work;
+ unsigned long tbt_timeout;
+};
+
+struct queue_item {
+ struct list_head token_bucket_throttle_item;
+ u64* tokens_requested;
+ int tb_item_num;
+ struct completion throttled;
+ unsigned long tbt_timeout;
+};
+
+struct ceph_group_data {
+ struct blkcg_policy_data cpd;
+
+ struct token_bucket_throttle meta_ops_throttle;
+ struct token_bucket_throttle data_ops_throttle;
+};
+
+extern struct blkcg_policy io_policy_ceph;
+
+static inline struct ceph_group_data *cpd_to_cephgd(struct blkcg_policy_data *cpd)
+{
+ return cpd ? container_of(cpd, struct ceph_group_data, cpd) : NULL;
+}
+
+static inline struct ceph_group_data* blkcg_to_cephgd(struct blkcg* blkcg)
+{
+ return cpd_to_cephgd(blkcg_to_cpd(blkcg, &io_policy_ceph));
+}
+
+extern void schedule_token_bucket_throttle_tick(struct token_bucket_throttle* ptbt, u64 tick_interval);
+
+extern void token_bucket_throttle_tick(struct work_struct* work);
+
+extern int get_token_bucket_throttle(struct token_bucket_throttle* ptbt, struct queue_item* req);
+
+extern int queue_item_init(struct queue_item* qitem, struct token_bucket_throttle* ptbt, int tb_item_num);
+
+extern int token_bucket_throttle_init(struct token_bucket_throttle* ptbt, int token_bucket_num);
+
+extern int ceph_io_policy_init(void);
+
+extern void ceph_io_policy_release(void);
+
+extern void queue_item_free(struct queue_item* qitem);
+
+#endif /*_CEPHFS_CGROUP_H*/
@@ -15,6 +15,9 @@
#include <linux/ceph/msgpool.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
struct ceph_msg;
struct ceph_snap_context;
@@ -193,6 +196,10 @@ struct ceph_osd_request {
int r_result;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ /* token bucket throttle item*/
+ struct queue_item qitem;
+#endif
struct ceph_osd_client *r_osdc;
struct kref r_kref;
bool r_mempool;
@@ -26,6 +26,9 @@
#include <linux/ceph/decode.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/auth.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
#include "crypto.h"
@@ -776,6 +779,12 @@ static int __init init_ceph_lib(void)
{
int ret = 0;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ ret = ceph_io_policy_init();
+ if (ret < 0)
+ goto out;
+#endif
+
ret = ceph_debugfs_init();
if (ret < 0)
goto out;
@@ -812,6 +821,10 @@ static void __exit exit_ceph_lib(void)
dout("exit_ceph_lib\n");
WARN_ON(!ceph_strings_empty());
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ ceph_io_policy_release();
+#endif
+
ceph_osdc_cleanup();
ceph_msgr_exit();
ceph_crypto_shutdown();