@@ -193,6 +193,27 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
int err = 0;
u64 off = page_offset(page);
u64 len = PAGE_SIZE;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+
+ err = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (err < 0)
+ return err;
+
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+
+ err = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ queue_item_free(&qitem);
+ if (err < 0)
+ return err;
+#endif
if (off >= i_size_read(inode)) {
zero_user_segment(page, 0, PAGE_SIZE);
@@ -317,6 +338,16 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
int nr_pages = 0;
int got = 0;
int ret = 0;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+
+ ret = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (ret < 0)
+ return ret;
+#endif
if (!rw_ctx) {
/* caller of readpages does not hold buffer and read caps
@@ -401,6 +432,17 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
req->r_callback = finish_read;
req->r_inode = inode;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+ ret = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (ret < 0)
+ goto out_pages;
+#endif
+
dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
ret = ceph_osdc_start_request(osdc, req, false);
if (ret < 0)
@@ -411,6 +453,9 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
* So we can drop our cap refs. */
if (got)
ceph_put_cap_refs(ci, got);
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
return nr_pages;
@@ -423,6 +468,10 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
out_put:
ceph_osdc_put_request(req);
out:
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
+
if (got)
ceph_put_cap_refs(ci, got);
return ret;
@@ -580,6 +629,36 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
loff_t page_off = page_offset(page);
int err, len = PAGE_SIZE;
struct ceph_writeback_ctl ceph_wbc;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p = NULL;
+ struct queue_item qitem;
+
+#ifdef CONFIG_CGROUP_WRITEBACK
+ if (wbc->wb)
+ cephgd_p = blkcg_to_cephgd(css_to_blkcg(wbc->wb->blkcg_css));
+ else
+ cephgd_p = blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+#else
+ cephgd_p = blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+#endif
+
+ err = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (err < 0)
+ return err;
+
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+
+ err = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ queue_item_free(&qitem);
+ if (err < 0)
+ return err;
+
+#endif
dout("writepage %p idx %lu\n", page, page->index);
@@ -801,6 +880,24 @@ static int ceph_writepages_start(struct address_space *mapping,
struct ceph_writeback_ctl ceph_wbc;
bool should_loop, range_whole = false;
bool done = false;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p = NULL;
+ struct queue_item qitem;
+
+#ifdef CONFIG_CGROUP_WRITEBACK
+ if (wbc->wb)
+ cephgd_p = blkcg_to_cephgd(css_to_blkcg(wbc->wb->blkcg_css));
+ else
+ cephgd_p = blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+#else
+ cephgd_p = blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+#endif
+
+ rc = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (rc < 0)
+ return rc;
+#endif
dout("writepages_start %p (mode=%s)\n", inode,
wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
@@ -1132,6 +1229,21 @@ static int ceph_writepages_start(struct address_space *mapping,
}
req->r_mtime = inode->i_mtime;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = req->r_num_ops;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = 0;
+
+ for (i = 0; i < req->r_num_ops; i++)
+ if (req->r_ops[i].op == CEPH_OSD_OP_WRITE)
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] +=
+ req->r_ops[i].extent.length;
+
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+ rc = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ BUG_ON(rc);
+#endif
rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
BUG_ON(rc);
req = NULL;
@@ -1191,6 +1303,9 @@ static int ceph_writepages_start(struct address_space *mapping,
mapping->writeback_index = index;
out:
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
ceph_osdc_put_request(req);
ceph_put_snap_context(last_snapc);
dout("writepages dend - startone, rc = %d\n", rc);
@@ -1671,6 +1786,16 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
u64 len, inline_version;
int err = 0;
bool from_pagecache = false;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+
+ err = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (err < 0)
+ goto out;
+#endif
spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version;
@@ -1731,6 +1856,20 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
}
req->r_mtime = inode->i_mtime;
+
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = 0;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+
+ err = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (err < 0) {
+ goto out_put;
+ }
+#endif
+
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
err = ceph_osdc_wait_request(&fsc->client->osdc, req);
@@ -1772,6 +1911,19 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
goto out_put;
}
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 3;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+
+ err = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (err < 0) {
+ goto out_put;
+ }
+#endif
+
req->r_mtime = inode->i_mtime;
err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
if (!err)
@@ -1781,6 +1933,10 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
if (err == -ECANCELED)
err = 0;
out:
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
+
if (page && page != locked_page) {
if (from_pagecache) {
unlock_page(page);
@@ -10,6 +10,9 @@
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
#include "super.h"
#include "mds_client.h"
@@ -579,6 +582,11 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
ssize_t ret;
u64 off = iocb->ki_pos;
u64 len = iov_iter_count(to);
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+#endif
dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -597,6 +605,12 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
return ret;
ret = 0;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ ret = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (ret < 0)
+ return ret;
+#endif
while ((len = iov_iter_count(to)) > 0) {
struct ceph_osd_request *req;
struct page **pages;
@@ -642,6 +656,19 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
}
}
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+ ret = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (ret < 0) {
+ ceph_osdc_put_request(req);
+ break;
+ }
+#endif
+
osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
false, false);
ret = ceph_osdc_start_request(osdc, req, false);
@@ -703,6 +730,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
iocb->ki_pos = off;
}
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
+
dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
return ret;
}
@@ -923,6 +954,17 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
loff_t pos = iocb->ki_pos;
bool write = iov_iter_rw(iter) == WRITE;
bool should_dirty = !write && iter_is_iovec(iter);
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+
+ ret = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (ret < 0)
+ return ret;
+
+#endif
if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
return -EROFS;
@@ -978,6 +1020,20 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
ret = len;
break;
}
+
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+ ret = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (ret < 0) {
+ ceph_osdc_put_request(req);
+ break;
+ }
+#endif
+
if (len != size)
osd_req_op_extent_update(req, 0, len);
@@ -1099,6 +1155,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
ret = pos - iocb->ki_pos;
iocb->ki_pos = pos;
}
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
return ret;
}
@@ -1128,6 +1187,11 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
bool check_caps = false;
struct timespec64 mtime = current_time(inode);
size_t count = iov_iter_count(from);
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+#endif
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
return -EROFS;
@@ -1148,6 +1212,13 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ ret = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (ret < 0)
+ return ret;
+#endif
+
while ((len = iov_iter_count(from)) > 0) {
size_t left;
int n;
@@ -1192,6 +1263,17 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
goto out;
}
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = len;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+ ret = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (ret < 0)
+ goto out;
+#endif
+
req->r_inode = inode;
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
@@ -1226,6 +1308,11 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
ret = written;
iocb->ki_pos = pos;
}
+
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
+
return ret;
}
@@ -1648,6 +1735,16 @@ static int ceph_zero_partial_object(struct inode *inode,
int ret = 0;
loff_t zero = 0;
int op;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+ struct queue_item qitem;
+
+ ret = queue_item_init(&qitem, &cephgd_p->data_ops_throttle,
+ DATA_OPS_TB_NUM);
+ if (ret < 0)
+ goto out;
+#endif
if (!length) {
op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
@@ -1656,6 +1753,16 @@ static int ceph_zero_partial_object(struct inode *inode,
op = CEPH_OSD_OP_ZERO;
}
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ qitem.tokens_requested[DATA_OPS_IOPS_IDX] = 1;
+ qitem.tokens_requested[DATA_OPS_BAND_IDX] = *length;
+ dout("%s: tokens_requested: %p, tb_item_num: %d\n", __func__,
+ qitem.tokens_requested, qitem.tb_item_num);
+ ret = get_token_bucket_throttle(&cephgd_p->data_ops_throttle,
+ &qitem);
+ if (ret < 0)
+ goto out;
+#endif
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode),
offset, length,
@@ -1677,6 +1784,9 @@ static int ceph_zero_partial_object(struct inode *inode,
ceph_osdc_put_request(req);
out:
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_free(&qitem);
+#endif
return ret;
}
@@ -19,6 +19,9 @@
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
@@ -683,6 +686,9 @@ void ceph_mdsc_release_request(struct kref *kref)
struct ceph_mds_request *req = container_of(kref,
struct ceph_mds_request,
r_kref);
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ kfree(req->qitem.tokens_requested);
+#endif
destroy_reply_info(&req->r_reply_info);
if (req->r_request)
ceph_msg_put(req->r_request);
@@ -2030,6 +2036,10 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
struct timespec64 ts;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+#endif
if (!req)
return ERR_PTR(-ENOMEM);
@@ -2053,6 +2063,10 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
req->r_op = op;
req->r_direct_mode = mode;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ queue_item_init(&req->qitem, &cephgd_p->meta_ops_throttle,
+ META_OPS_TB_NUM);
+#endif
return req;
}
@@ -2703,13 +2717,25 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
struct ceph_mds_request *req)
{
int err;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ struct ceph_group_data* cephgd_p =
+ blkcg_to_cephgd(css_to_blkcg(blkcg_css()));
+#endif
dout("do_request on %p\n", req);
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ req->qitem.tokens_requested[META_OPS_IOPS_IDX] = 1;
+ err = get_token_bucket_throttle(&cephgd_p->meta_ops_throttle,
+ &req->qitem);
+ if (err)
+ goto out;
+#endif
/* issue */
err = ceph_mdsc_submit_request(mdsc, dir, req);
if (!err)
err = ceph_mdsc_wait_request(mdsc, req);
+out:
dout("do_request %p done, result %d\n", req, err);
return err;
}
@@ -15,6 +15,9 @@
#include <linux/ceph/messenger.h>
#include <linux/ceph/mdsmap.h>
#include <linux/ceph/auth.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
/* The first 8 bits are reserved for old ceph releases */
#define CEPHFS_FEATURE_MIMIC 8
@@ -284,6 +287,10 @@ struct ceph_mds_request {
/* unsafe requests that modify the target inode */
struct list_head r_unsafe_target_item;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+	/* requests that are blocked by the token bucket throttle */
+ struct queue_item qitem;
+#endif
struct ceph_mds_session *r_session;
int r_attempts; /* resend attempts */
@@ -25,6 +25,9 @@
#include <linux/ceph/mon_client.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+#include <linux/ceph/ceph_io_policy.h>
+#endif
/*
* Ceph superblock operations
@@ -1174,6 +1177,12 @@ static int __init init_ceph(void)
if (ret)
goto out;
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ ret = ceph_io_policy_init();
+ if (ret < 0)
+ goto out;
+#endif
+
ceph_flock_init();
ceph_xattr_init();
ret = register_filesystem(&ceph_fs_type);
@@ -1194,6 +1203,9 @@ static int __init init_ceph(void)
static void __exit exit_ceph(void)
{
dout("exit_ceph\n");
+#ifdef CONFIG_CEPH_LIB_IO_POLICY
+ ceph_io_policy_release();
+#endif
unregister_filesystem(&ceph_fs_type);
ceph_xattr_exit();
destroy_caches();
@@ -26,9 +26,6 @@
#include <linux/ceph/decode.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/auth.h>
-#ifdef CONFIG_CEPH_LIB_IO_POLICY
-#include <linux/ceph/ceph_io_policy.h>
-#endif
#include "crypto.h"
@@ -779,12 +776,6 @@ static int __init init_ceph_lib(void)
{
int ret = 0;
-#ifdef CONFIG_CEPH_LIB_IO_POLICY
- ret = ceph_io_policy_init();
- if (ret < 0)
- goto out;
-#endif
-
ret = ceph_debugfs_init();
if (ret < 0)
goto out;
@@ -821,10 +812,6 @@ static void __exit exit_ceph_lib(void)
dout("exit_ceph_lib\n");
WARN_ON(!ceph_strings_empty());
-#ifdef CONFIG_CEPH_LIB_IO_POLICY
- ceph_io_policy_release();
-#endif
-
ceph_osdc_cleanup();
ceph_msgr_exit();
ceph_crypto_shutdown();