diff mbox series

[2/2] ceph: aggregate ceph_sync_read requests

Message ID 20181028135742.24668-2-xxhdx1985126@gmail.com (mailing list archive)
State New, archived
Headers show
Series [1/2] ceph: issue getattr/lookup reqs to MDSes in an aggregative pattern | expand

Commit Message

Xuehan Xu Oct. 28, 2018, 1:57 p.m. UTC
From: Xuehan Xu <xuxuehan@360.cn>

As for now, concurrent threads may issue concurrent file read reqs
to MDSes, ignoring the fact that the requested file ranges of some
reqs may be included by previous issued reqs. This commit make those
reqs wait for the previous ones to finish, saving the overhead of
issuing them.

Signed-off-by: Xuehan Xu <xuxuehan@360.cn>
---
 fs/ceph/file.c  | 158 ++++++++++++++++++++++++++++++++++++++++++++++--
 fs/ceph/inode.c |   3 +
 fs/ceph/super.h |  29 ++++++++-
 3 files changed, 184 insertions(+), 6 deletions(-)
diff mbox series

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index ad0bed99b1d5..fb83c037a40f 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -626,6 +626,92 @@  static int striped_read(struct inode *inode,
 	return ret;
 }
 
+int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op)
+{
+	long timeleft = wait_for_completion_killable_timeout(&op->comp, ceph_timeout_jiffies(op->timeout));
+	if (timeleft > 0)
+		return op->result;
+	else
+		return timeleft ? timeleft : -ETIMEDOUT;
+}
+
+bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, 
+				      unsigned long start, unsigned long end, 
+				      bool* repeated_low_endpoint, 
+				      struct ceph_aggregated_read_op** ag_op)
+{
+	struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end);
+	bool positive_found = false, negative_found = false;
+	while (node_p) {
+		if (node_p->start == start)
+			*repeated_low_endpoint = true;
+		if (node_p->start <= start &&
+				node_p->last >= end) {
+			positive_found = true;
+			break;
+		}
+
+		node_p = interval_tree_iter_next(node_p, start, end);
+	}
+
+	dout("searched positive tree: found: %d\n", positive_found);
+
+	if (!positive_found) {
+		node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment, 
+				ULONG_MAX - end, 
+				ULONG_MAX - start);
+		while (node_p) {
+			if (node_p->start <= ULONG_MAX - end &&
+					node_p->last >= ULONG_MAX - start) {
+				negative_found = true;
+				break;
+			}
+			node_p = interval_tree_iter_next(node_p,
+					ULONG_MAX - end,
+					ULONG_MAX - start);
+		}
+	}
+	
+	dout("searched negative tree: found: %d\n", negative_found);
+
+	if (positive_found)
+		*ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node);
+	else if (negative_found)
+		*ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node);
+
+	return positive_found || negative_found;
+}
+
+void register_aggregated_read_op(struct ceph_inode_info* cinode,
+				 struct ceph_aggregated_read_op* ag_op,
+				 bool suppliment)
+{
+	if (suppliment) {
+		interval_tree_insert(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment);
+	} else
+		interval_tree_insert(&ag_op->pos_node, &cinode->aggregated_read_ops);
+}
+
+void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
+				   struct ceph_aggregated_read_op* ag_op,
+				   bool suppliment)
+{
+	if (suppliment)
+		interval_tree_remove(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment);
+	else
+		interval_tree_remove(&ag_op->pos_node, &cinode->aggregated_read_ops);
+}
+
+void ceph_put_aggregated_read_op(struct kref* kref)
+{
+	struct ceph_aggregated_read_op* ag_op = container_of(kref, 
+			struct ceph_aggregated_read_op,
+			kref);
+	if (ag_op->num_pages)
+		ceph_release_page_vector(ag_op->pages, ag_op->num_pages);
+	kfree(ag_op);
+}
+
 /*
  * Completely synchronous read and write methods.  Direct from __user
  * buffer to osd, or directly to user pages (if O_DIRECT).
@@ -637,11 +723,15 @@  static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
+	struct ceph_inode_info* cinode = ceph_inode(inode);
 	struct page **pages;
 	u64 off = iocb->ki_pos;
 	int num_pages;
 	ssize_t ret;
 	size_t len = iov_iter_count(to);
+	bool found_previous_req = false;
+	bool repeated_low_endpoint = false;
+	struct ceph_aggregated_read_op *ag_op = NULL;
 
 	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -676,24 +766,82 @@  static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 			iov_iter_advance(to, 0);
 		}
 		ceph_put_page_vector(pages, num_pages, false);
+	} else if ((off + len) < (ULONG_MAX / 2)) {
+		mutex_lock(&cinode->aggregated_ops_lock);
+		dout("ceph_sync_read: trying to find previous aggregated read op, off: %lld, len: %ld.\n", off, len);
+		found_previous_req = find_previous_aggregated_read_op(cinode, off, 
+				off + len, &repeated_low_endpoint, &ag_op);
+		if (found_previous_req) {
+			dout("ceph_sync_read: found previous aggregated read op, off: %lld, len: %ld.\n", off, len);
+			kref_get(&ag_op->kref);
+			mutex_unlock(&cinode->aggregated_ops_lock);
+			ret = ceph_wait_for_aggregated_read_op(ag_op);
+			dout("ceph_sync_read: waited aggregated read op, off: %lld, len: %ld.\n", off, len);
+		} else {
+			dout("ceph_sync_read: no previous aggregated read op, off: %lld, len: %ld.\n", off, len);
+			ag_op = kzalloc(sizeof(struct ceph_aggregated_read_op), GFP_KERNEL);
+			kref_init(&ag_op->kref);
+			ag_op->pos_node.start = off;
+			ag_op->pos_node.last = off + len;
+			ag_op->neg_node.start = ULONG_MAX - off - len;
+			ag_op->neg_node.last = ULONG_MAX - off;
+			init_completion(&ag_op->comp);
+			register_aggregated_read_op(cinode, ag_op, repeated_low_endpoint);
+			dout("ceph_sync_read: register new aggregated read op, off: %lld, len: %ld.\n", off, len);
+			mutex_unlock(&cinode->aggregated_ops_lock);
+
+			num_pages = calc_pages_for(off, len);
+			ag_op->pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+			if (IS_ERR(ag_op->pages))
+				return PTR_ERR(ag_op->pages);
+			ag_op->num_pages = num_pages;
+
+			ret = striped_read(inode, off, len, ag_op->pages, num_pages,
+     					(off & ~PAGE_MASK), checkeof);
+			dout("ceph_sync_read: aggregated read op striped_readed, off: %lld, len: %ld, ret: %ld.\n", off, len, ret);
+			ag_op->result = ret;
+			complete_all(&ag_op->comp);
+			mutex_lock(&cinode->aggregated_ops_lock);
+			unregister_aggregated_read_op(cinode, ag_op, repeated_low_endpoint);
+			mutex_unlock(&cinode->aggregated_ops_lock);
+			dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %ld, ret: %ld.\n", off, len, ret);
+		}
+		if (ret > 0) {
+			int l, k = (off - ag_op->pos_node.start) >> PAGE_SHIFT;
+			size_t left = min_t(size_t, len, ret);
+
+			while (left) {
+				size_t page_off = off & ~PAGE_MASK;
+				size_t copy = min_t(size_t, left,
+						    PAGE_SIZE - page_off);
+				l = copy_page_to_iter(ag_op->pages[k++], page_off,
+						      copy, to);
+				off += l;
+				left -= l;
+				if (l < copy)
+					break;
+			}
+			dout("finished copy_page_to_iter: off: %lld, len: %ld\n", off, left);
+		}
+		kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
 	} else {
 		num_pages = calc_pages_for(off, len);
 		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 		if (IS_ERR(pages))
 			return PTR_ERR(pages);
-
+		
 		ret = striped_read(inode, off, len, pages, num_pages,
-				   (off & ~PAGE_MASK), checkeof);
+				(off & ~PAGE_MASK), checkeof);
 		if (ret > 0) {
 			int l, k = 0;
 			size_t left = ret;
-
+			
 			while (left) {
 				size_t page_off = off & ~PAGE_MASK;
 				size_t copy = min_t(size_t, left,
-						    PAGE_SIZE - page_off);
+						PAGE_SIZE - page_off);
 				l = copy_page_to_iter(pages[k++], page_off,
-						      copy, to);
+						copy, to);
 				off += l;
 				left -= l;
 				if (l < copy)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index c51e2f186139..21b9bac2d8bb 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -432,6 +432,7 @@  struct inode *ceph_alloc_inode(struct super_block *sb)
 	spin_lock_init(&ci->i_ceph_lock);
 	mutex_init(&ci->getattrs_inflight_lock);
 	mutex_init(&ci->lookups_inflight_lock);
+	mutex_init(&ci->aggregated_ops_lock);
 
 	ci->i_version = 0;
 	ci->i_inline_version = 0;
@@ -465,6 +466,8 @@  struct inode *ceph_alloc_inode(struct super_block *sb)
 	ci->i_caps = RB_ROOT;
 	ci->getattrs_inflight = RB_ROOT;
 	ci->lookups_inflight = RB_ROOT;
+	ci->aggregated_read_ops = RB_ROOT_CACHED;
+	ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED;
 	ci->i_auth_cap = NULL;
 	ci->i_dirty_caps = 0;
 	ci->i_flushing_caps = 0;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index d39234049e88..811f2ab83331 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -16,6 +16,7 @@ 
 #include <linux/slab.h>
 #include <linux/posix_acl.h>
 #include <linux/refcount.h>
+#include <linux/interval_tree.h>
 
 #include <linux/ceph/libceph.h>
 
@@ -285,6 +286,31 @@  struct ceph_inode_xattrs_info {
 	u64 version, index_version;
 };
 
+struct ceph_aggregated_read_op {
+	struct kref kref;
+	struct page** pages;
+	int num_pages;
+	unsigned long timeout;
+	int result;
+	struct interval_tree_node pos_node, neg_node;
+	struct completion comp;
+};
+
+extern void ceph_put_aggregated_read_op(struct kref* kref);
+
+extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
+					     unsigned long start, unsigned long end,
+					     bool* repeated_low_endpoint,
+					     struct ceph_aggregated_read_op** ag_op);
+
+extern void register_aggregated_read_op(struct ceph_inode_info* cinode,
+					struct ceph_aggregated_read_op* ag_op,
+					bool suppliment);
+
+extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
+					  struct ceph_aggregated_read_op* ag_op,
+					  bool suppliment);
+
 /*
  * Ceph inode.
  */
@@ -292,8 +318,9 @@  struct ceph_inode_info {
 	struct ceph_vino i_vino;   /* ceph ino + snap */
 
 	spinlock_t i_ceph_lock;
-	struct mutex getattrs_inflight_lock, lookups_inflight_lock;
+	struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock;
 	struct rb_root getattrs_inflight, lookups_inflight;
+	struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment;
 
 	u64 i_version;
 	u64 i_inline_version;