@@ -626,6 +626,92 @@ static int striped_read(struct inode *inode,
return ret;
}
+/*
+ * Sleep until an aggregated read op completes, the timeout elapses, or the
+ * task receives a fatal signal.
+ *
+ * Returns op->result once the op has completed, -ETIMEDOUT if the timer
+ * expired first, or the negative error from the killable wait otherwise.
+ */
+int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op)
+{
+ long left;
+
+ left = wait_for_completion_killable_timeout(&op->comp,
+					     ceph_timeout_jiffies(op->timeout));
+ if (left > 0)
+ return op->result; /* op finished; hand back its result */
+ if (left == 0)
+ return -ETIMEDOUT; /* timer expired before completion */
+ return left; /* fatal signal while waiting */
+}
+
+/*
+ * Look up an already-registered aggregated read op whose byte range fully
+ * contains [start, end].  On success, *ag_op is set to the matching op and
+ * true is returned; the caller must take its own kref before dropping
+ * aggregated_ops_lock (all callers in this patch hold that lock here).
+ *
+ * Two trees are consulted: the primary tree keyed on the plain range, and a
+ * "suppliment" (sic) tree keyed on the ULONG_MAX-mirrored range.
+ * NOTE(review): presumably the mirrored tree exists to keep ops that share a
+ * low endpoint distinguishable (see *repeated_low_endpoint) -- confirm with
+ * the author.
+ *
+ * NOTE(review): interval_tree_node.last is an inclusive endpoint, but callers
+ * pass end == off + len (an exclusive bound), so every registered interval is
+ * one byte wider than the data actually read -- confirm this is intended.
+ */
+bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
+ unsigned long start, unsigned long end,
+ bool* repeated_low_endpoint,
+ struct ceph_aggregated_read_op** ag_op)
+{
+ struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end);
+ bool positive_found = false, negative_found = false;
+ /* Walk every node overlapping [start, end] in the primary tree. */
+ while (node_p) {
+ if (node_p->start == start)
+ /* Another op starts at the same offset as ours. */
+ *repeated_low_endpoint = true;
+ /* Only a node that fully contains [start, end] can serve us. */
+ if (node_p->start <= start &&
+ node_p->last >= end) {
+ positive_found = true;
+ break;
+ }
+
+ node_p = interval_tree_iter_next(node_p, start, end);
+ }
+
+ dout("searched positive tree: found: %d\n", positive_found);
+
+ if (!positive_found) {
+ /* Same containment test, in ULONG_MAX-mirrored coordinates. */
+ node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment,
+ ULONG_MAX - end,
+ ULONG_MAX - start);
+ while (node_p) {
+ if (node_p->start <= ULONG_MAX - end &&
+ node_p->last >= ULONG_MAX - start) {
+ negative_found = true;
+ break;
+ }
+ node_p = interval_tree_iter_next(node_p,
+ ULONG_MAX - end,
+ ULONG_MAX - start);
+ }
+ }
+
+ dout("searched negative tree: found: %d\n", negative_found);
+
+ /* Map the winning tree node back to its owning op. */
+ if (positive_found)
+ *ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node);
+ else if (negative_found)
+ *ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node);
+
+ return positive_found || negative_found;
+}
+
+/*
+ * Insert an aggregated read op into one of the per-inode interval trees:
+ * the mirrored "suppliment" tree when @suppliment is set, the primary tree
+ * otherwise.  Callers in this patch hold aggregated_ops_lock.
+ */
+void register_aggregated_read_op(struct ceph_inode_info* cinode,
+ struct ceph_aggregated_read_op* ag_op,
+ bool suppliment)
+{
+ struct rb_root_cached *root = suppliment ?
+ &cinode->aggregated_read_ops_suppliment :
+ &cinode->aggregated_read_ops;
+ struct interval_tree_node *node = suppliment ?
+ &ag_op->neg_node : &ag_op->pos_node;
+
+ interval_tree_insert(node, root);
+}
+
+/*
+ * Remove an aggregated read op from the tree it was registered in;
+ * @suppliment must match the value passed to register_aggregated_read_op().
+ * Callers in this patch hold aggregated_ops_lock.
+ */
+void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
+ struct ceph_aggregated_read_op* ag_op,
+ bool suppliment)
+{
+ struct rb_root_cached *root = suppliment ?
+ &cinode->aggregated_read_ops_suppliment :
+ &cinode->aggregated_read_ops;
+ struct interval_tree_node *node = suppliment ?
+ &ag_op->neg_node : &ag_op->pos_node;
+
+ interval_tree_remove(node, root);
+}
+
+/*
+ * kref release callback: runs when the last reference to an aggregated read
+ * op is dropped.  Frees the page vector (if one was ever allocated) and then
+ * the op itself.
+ */
+void ceph_put_aggregated_read_op(struct kref* kref)
+{
+ struct ceph_aggregated_read_op *ag_op =
+ container_of(kref, struct ceph_aggregated_read_op, kref);
+
+ if (ag_op->num_pages)
+ ceph_release_page_vector(ag_op->pages, ag_op->num_pages);
+ kfree(ag_op);
+}
+
/*
* Completely synchronous read and write methods. Direct from __user
* buffer to osd, or directly to user pages (if O_DIRECT).
@@ -637,11 +723,15 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
+ struct ceph_inode_info* cinode = ceph_inode(inode);
struct page **pages;
u64 off = iocb->ki_pos;
int num_pages;
ssize_t ret;
size_t len = iov_iter_count(to);
+ bool found_previous_req = false;
+ bool repeated_low_endpoint = false;
+ struct ceph_aggregated_read_op *ag_op = NULL;
dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -676,24 +766,82 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
iov_iter_advance(to, 0);
}
ceph_put_page_vector(pages, num_pages, false);
+ } else if ((off + len) < (ULONG_MAX / 2)) {
+ /*
+ * Aggregated path: if another task already has an overlapping
+ * read in flight, wait for it and copy from its page vector
+ * instead of issuing a duplicate OSD read.
+ */
+ mutex_lock(&cinode->aggregated_ops_lock);
+ dout("ceph_sync_read: trying to find previous aggregated read op, off: %llu, len: %zu.\n", off, len);
+ found_previous_req = find_previous_aggregated_read_op(cinode, off,
+ off + len, &repeated_low_endpoint, &ag_op);
+ if (found_previous_req) {
+ /*
+ * Piggy-back on the in-flight op.  The kref is taken
+ * under aggregated_ops_lock while the op is still
+ * registered, so the owner cannot drop its last
+ * reference before we get ours.
+ */
+ dout("ceph_sync_read: found previous aggregated read op, off: %llu, len: %zu.\n", off, len);
+ kref_get(&ag_op->kref);
+ mutex_unlock(&cinode->aggregated_ops_lock);
+ ret = ceph_wait_for_aggregated_read_op(ag_op);
+ dout("ceph_sync_read: waited aggregated read op, off: %llu, len: %zu.\n", off, len);
+ } else {
+ dout("ceph_sync_read: no previous aggregated read op, off: %llu, len: %zu.\n", off, len);
+ ag_op = kzalloc(sizeof(*ag_op), GFP_KERNEL);
+ if (!ag_op) {
+ /* Fix: original dereferenced a NULL op on OOM. */
+ mutex_unlock(&cinode->aggregated_ops_lock);
+ return -ENOMEM;
+ }
+ kref_init(&ag_op->kref);
+ /* Register [off, off+len] and its ULONG_MAX mirror. */
+ ag_op->pos_node.start = off;
+ ag_op->pos_node.last = off + len;
+ ag_op->neg_node.start = ULONG_MAX - off - len;
+ ag_op->neg_node.last = ULONG_MAX - off;
+ init_completion(&ag_op->comp);
+ register_aggregated_read_op(cinode, ag_op, repeated_low_endpoint);
+ dout("ceph_sync_read: register new aggregated read op, off: %llu, len: %zu.\n", off, len);
+ mutex_unlock(&cinode->aggregated_ops_lock);
+
+ num_pages = calc_pages_for(off, len);
+ ag_op->pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+ if (IS_ERR(ag_op->pages)) {
+ /*
+ * Fix: the original returned here, leaking the op,
+ * leaving a dangling node in the interval tree and
+ * leaving any piggy-backed waiters blocked forever.
+ * Propagate the error through the op, wake waiters,
+ * unregister, and drop our reference.
+ */
+ ret = PTR_ERR(ag_op->pages);
+ ag_op->pages = NULL;
+ ag_op->result = ret;
+ complete_all(&ag_op->comp);
+ mutex_lock(&cinode->aggregated_ops_lock);
+ unregister_aggregated_read_op(cinode, ag_op, repeated_low_endpoint);
+ mutex_unlock(&cinode->aggregated_ops_lock);
+ kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
+ return ret;
+ }
+ ag_op->num_pages = num_pages;
+
+ ret = striped_read(inode, off, len, ag_op->pages, num_pages,
+ (off & ~PAGE_MASK), checkeof);
+ dout("ceph_sync_read: aggregated read op striped_readed, off: %llu, len: %zu, ret: %zd.\n", off, len, ret);
+ /* Publish the result before waking waiters. */
+ ag_op->result = ret;
+ complete_all(&ag_op->comp);
+ mutex_lock(&cinode->aggregated_ops_lock);
+ unregister_aggregated_read_op(cinode, ag_op, repeated_low_endpoint);
+ mutex_unlock(&cinode->aggregated_ops_lock);
+ dout("ceph_sync_read: unregistered aggregated read op, off: %llu, len: %zu, ret: %zd.\n", off, len, ret);
+ }
+ if (ret > 0) {
+ u64 op_off = ag_op->pos_node.start;
+ /*
+ * Fix: striped_read() stored byte @op_off at in-page
+ * offset (op_off & ~PAGE_MASK) of pages[0]; the
+ * original ignored that lead-in, so a waiter whose op
+ * started mid-page copied from the wrong page.  Also
+ * clamp to the bytes the op actually read past @off --
+ * ret counts from op_off, not from our own offset.
+ */
+ int l, k = ((off - op_off) + (op_off & ~PAGE_MASK)) >> PAGE_SHIFT;
+ size_t avail = (u64)ret > (off - op_off) ?
+ (size_t)ret - (off - op_off) : 0;
+ size_t left = min_t(size_t, len, avail);
+
+ while (left) {
+ size_t page_off = off & ~PAGE_MASK;
+ size_t copy = min_t(size_t, left,
+ PAGE_SIZE - page_off);
+ l = copy_page_to_iter(ag_op->pages[k++], page_off,
+ copy, to);
+ off += l;
+ left -= l;
+ if (l < copy)
+ break;
+ }
+ dout("finished copy_page_to_iter: off: %llu, left: %zu\n", off, left);
+ }
+ kref_put(&ag_op->kref, ceph_put_aggregated_read_op);
} else {
num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
-
+
ret = striped_read(inode, off, len, pages, num_pages,
- (off & ~PAGE_MASK), checkeof);
+ (off & ~PAGE_MASK), checkeof);
if (ret > 0) {
int l, k = 0;
size_t left = ret;
-
+
while (left) {
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t, left,
- PAGE_SIZE - page_off);
+ PAGE_SIZE - page_off);
l = copy_page_to_iter(pages[k++], page_off,
- copy, to);
+ copy, to);
off += l;
left -= l;
if (l < copy)
@@ -432,6 +432,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
spin_lock_init(&ci->i_ceph_lock);
mutex_init(&ci->getattrs_inflight_lock);
mutex_init(&ci->lookups_inflight_lock);
+ mutex_init(&ci->aggregated_ops_lock);
ci->i_version = 0;
ci->i_inline_version = 0;
@@ -465,6 +466,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
ci->i_caps = RB_ROOT;
ci->getattrs_inflight = RB_ROOT;
ci->lookups_inflight = RB_ROOT;
+ ci->aggregated_read_ops = RB_ROOT_CACHED;
+ ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED;
ci->i_auth_cap = NULL;
ci->i_dirty_caps = 0;
ci->i_flushing_caps = 0;
@@ -16,6 +16,7 @@
#include <linux/slab.h>
#include <linux/posix_acl.h>
#include <linux/refcount.h>
+#include <linux/interval_tree.h>
#include <linux/ceph/libceph.h>
@@ -285,6 +286,31 @@ struct ceph_inode_xattrs_info {
u64 version, index_version;
};
+/*
+ * One in-flight synchronous read that overlapping readers can piggy-back on
+ * instead of issuing their own OSD read.  Lives in the per-inode interval
+ * trees while the read is in flight; freed via kref when the last user
+ * (issuer or waiter) drops its reference.
+ */
+struct ceph_aggregated_read_op {
+ struct kref kref; /* held by the issuer and each piggy-backed waiter */
+ struct page** pages; /* destination page vector filled by striped_read() */
+ int num_pages; /* entries in @pages; 0 until the vector is allocated */
+ unsigned long timeout; /* wait timeout passed to ceph_timeout_jiffies();
+ * NOTE(review): never assigned in this patch, so
+ * kzalloc leaves it 0 -- verify the intended
+ * ceph_timeout_jiffies(0) behavior */
+ int result; /* bytes read or -errno; valid once @comp completes */
+ struct interval_tree_node pos_node, neg_node; /* [off, off+len] and its
+ * ULONG_MAX-mirrored twin */
+ struct completion comp; /* complete_all()'d when the read finishes */
+};
+
+/* kref release callback: frees the op's page vector and the op itself. */
+extern void ceph_put_aggregated_read_op(struct kref* kref);
+
+/*
+ * Find a registered op whose range fully contains [start, end]; returns true
+ * and sets *ag_op on success.  Call with aggregated_ops_lock held.
+ */
+extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode,
+ unsigned long start, unsigned long end,
+ bool* repeated_low_endpoint,
+ struct ceph_aggregated_read_op** ag_op);
+
+/* Insert the op into the primary tree, or the mirrored one if @suppliment. */
+extern void register_aggregated_read_op(struct ceph_inode_info* cinode,
+ struct ceph_aggregated_read_op* ag_op,
+ bool suppliment);
+
+/* Remove the op from whichever tree it was registered in. */
+extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode,
+ struct ceph_aggregated_read_op* ag_op,
+ bool suppliment);
+
/*
* Ceph inode.
*/
@@ -292,8 +318,9 @@ struct ceph_inode_info {
struct ceph_vino i_vino; /* ceph ino + snap */
spinlock_t i_ceph_lock;
- struct mutex getattrs_inflight_lock, lookups_inflight_lock;
+ struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock;
struct rb_root getattrs_inflight, lookups_inflight;
+ struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment;
u64 i_version;
u64 i_inline_version;