Message ID | 20190225122924.25827-2-xxhdx1985126@gmail.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | [1/2] ceph: issue getattr/lookup reqs to MDSes in an aggregative pattern | expand |
On Mon, Feb 25, 2019 at 8:30 PM <xxhdx1985126@gmail.com> wrote: > > From: Xuehan Xu <xuxuehan@360.cn> > > As for now, concurrent threads may issue concurrent file read reqs > to MDSes, ignoring the fact that the requested file ranges of some > reqs may be included by previous issued reqs. This commit make those > reqs wait for the previous ones to finish, saving the overhead of > issuing them. > I think this change will cause a problem in the following scenario, with events in this order: (1) on client 1, process a issues a read; (2) on client 2, process b writes, then asks process c on client 1 to read; (3) on client 1, process c issues a read. > Signed-off-by: Xuehan Xu <xuxuehan@360.cn> > --- > fs/ceph/file.c | 207 ++++++++++++++++++++++++++++++++++++++++++------ > fs/ceph/inode.c | 3 + > fs/ceph/super.h | 29 ++++++- > 3 files changed, 215 insertions(+), 24 deletions(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 189df668b6a0..718ee163dce1 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -557,6 +557,92 @@ enum { > READ_INLINE = 3, > }; > > +int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op) > +{ > + long timeleft = wait_for_completion_killable_timeout(&op->comp, ceph_timeout_jiffies(op->timeout)); > + if (timeleft > 0) > + return op->result; > + else > + return timeleft ? 
timeleft : -ETIMEDOUT; > +} > + > +bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, > + unsigned long start, unsigned long end, > + bool* repeated_low_endpoint, > + struct ceph_aggregated_read_op** ag_op) > +{ > + struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end); > + bool positive_found = false, negative_found = false; > + while (node_p) { > + if (node_p->start == start) > + *repeated_low_endpoint = true; > + if (node_p->start <= start && > + node_p->last >= end) { > + positive_found = true; > + break; > + } > + > + node_p = interval_tree_iter_next(node_p, start, end); > + } > + > + dout("searched positive tree: found: %d\n", positive_found); > + > + if (!positive_found) { > + node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment, > + ULONG_MAX - end, > + ULONG_MAX - start); > + while (node_p) { > + if (node_p->start <= ULONG_MAX - end && > + node_p->last >= ULONG_MAX - start) { > + negative_found = true; > + break; > + } > + node_p = interval_tree_iter_next(node_p, > + ULONG_MAX - end, > + ULONG_MAX - start); > + } > + } > + > + dout("searched negative tree: found: %d\n", negative_found); > + > + if (positive_found) > + *ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node); > + else if (negative_found) > + *ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node); > + > + return positive_found || negative_found; > +} > + > +void register_aggregated_read_op(struct ceph_inode_info* cinode, > + struct ceph_aggregated_read_op* ag_op, > + bool suppliment) > +{ > + if (suppliment) { > + interval_tree_insert(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment); > + } else > + interval_tree_insert(&ag_op->pos_node, &cinode->aggregated_read_ops); > +} > + > +void unregister_aggregated_read_op(struct ceph_inode_info* cinode, > + struct ceph_aggregated_read_op* ag_op, > + bool suppliment) > +{ > + if (suppliment) > + 
interval_tree_remove(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment); > + else > + interval_tree_remove(&ag_op->pos_node, &cinode->aggregated_read_ops); > +} > + > +void ceph_put_aggregated_read_op(struct kref* kref) > +{ > + struct ceph_aggregated_read_op* ag_op = container_of(kref, > + struct ceph_aggregated_read_op, > + kref); > + if (ag_op->num_pages) > + ceph_release_page_vector(ag_op->pages, ag_op->num_pages); > + kfree(ag_op); > +} > + > /* > * Completely synchronous read and write methods. Direct from __user > * buffer to osd, or directly to user pages (if O_DIRECT). > @@ -575,9 +661,15 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, > struct ceph_inode_info *ci = ceph_inode(inode); > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_osd_client *osdc = &fsc->client->osdc; > - ssize_t ret; > - u64 off = iocb->ki_pos; > u64 len = iov_iter_count(to); > + struct page **pages; > + u64 off = iocb->ki_pos; > + int num_pages; > + ssize_t ret; > + bool found_previous_req = false; > + bool repeated_low_endpoint = false; > + bool first_round = true; > + struct ceph_aggregated_read_op *ag_op = NULL; > > dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, > (file->f_flags & O_DIRECT) ? 
"O_DIRECT" : ""); > @@ -595,24 +687,46 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, > return ret; > > ret = 0; > - while ((len = iov_iter_count(to)) > 0) { > + if (likely(!iov_iter_is_pipe(to)) && (off + len) < (ULONG_MAX / 2)) { > + mutex_lock(&ci->aggregated_ops_lock); > + dout("ceph_sync_read: trying to find previous aggregated read op, off: %lld, len: %lld.\n", off, len); > + found_previous_req = find_previous_aggregated_read_op(ci, off, > + off + len, &repeated_low_endpoint, &ag_op); > + if (found_previous_req) { > + dout("ceph_sync_read: found previous aggregated read op, off: %lld, len: %lld.\n", off, len); > + kref_get(&ag_op->kref); > + mutex_unlock(&ci->aggregated_ops_lock); > + ret = ceph_wait_for_aggregated_read_op(ag_op); > + dout("ceph_sync_read: waited aggregated read op, off: %lld, len: %lld.\n", off, len); > + } else { > + ag_op = kzalloc(sizeof(struct ceph_aggregated_read_op), GFP_KERNEL); > + kref_init(&ag_op->kref); > + ag_op->pos_node.start = off; > + ag_op->pos_node.last = off + len; > + ag_op->neg_node.start = ULONG_MAX - off - len; > + ag_op->neg_node.last = ULONG_MAX - off; > + init_completion(&ag_op->comp); > + register_aggregated_read_op(ci, ag_op, repeated_low_endpoint); > + dout("ceph_sync_read: register new aggregated read op, off: %lld, len: %lld.\n", off, len); > + mutex_unlock(&ci->aggregated_ops_lock); > + } > + } > + > + while (!found_previous_req && (len = iov_iter_count(to)) > 0) { > struct ceph_osd_request *req; > - struct page **pages; > - int num_pages; > size_t page_off; > u64 i_size; > bool more; > > req = ceph_osdc_new_request(osdc, &ci->i_layout, > - ci->i_vino, off, &len, 0, 1, > - CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, > - NULL, ci->i_truncate_seq, > - ci->i_truncate_size, false); > + ci->i_vino, off, &len, 0, 1, > + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, > + NULL, ci->i_truncate_seq, > + ci->i_truncate_size, false); > if (IS_ERR(req)) { > ret = PTR_ERR(req); > break; > } > - > more = len < 
iov_iter_count(to); > > if (unlikely(iov_iter_is_pipe(to))) { > @@ -630,34 +744,51 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, > more = false; > } > } else { > + int last_num_pages = num_pages; > num_pages = calc_pages_for(off, len); > page_off = off & ~PAGE_MASK; > - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); > - if (IS_ERR(pages)) { > - ceph_osdc_put_request(req); > - ret = PTR_ERR(pages); > - break; > + > + if (first_round) { > + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); > + if (IS_ERR(pages)) { > + ceph_osdc_put_request(req); > + ret = PTR_ERR(pages); > + dout("ceph_sync_read: aggregated read op got err, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); > + ag_op->result = ret; > + complete_all(&ag_op->comp); > + mutex_lock(&ci->aggregated_ops_lock); > + unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint); > + mutex_unlock(&ci->aggregated_ops_lock); > + dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); > + kref_put(&ag_op->kref, ceph_put_aggregated_read_op); > + break; > + } > + ag_op->pages = pages; > + ag_op->num_pages = num_pages; > + } else { > + pages += last_num_pages; > + if (page_off) > + pages--; > } > } > - > osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, > - false, false); > + false, false); > ret = ceph_osdc_start_request(osdc, req, false); > if (!ret) > ret = ceph_osdc_wait_request(osdc, req); > ceph_osdc_put_request(req); > - > + > i_size = i_size_read(inode); > dout("sync_read %llu~%llu got %zd i_size %llu%s\n", > - off, len, ret, i_size, (more ? " MORE" : "")); > - > + off, len, ret, i_size, (more ? 
" MORE" : "")); > + > if (ret == -ENOENT) > ret = 0; > if (ret >= 0 && ret < len && (off + ret < i_size)) { > int zlen = min(len - ret, i_size - off - ret); > int zoff = page_off + ret; > dout("sync_read zero gap %llu~%llu\n", > - off + ret, off + ret + zlen); > + off + ret, off + ret + zlen); > ceph_zero_page_vector_range(zoff, zlen, pages); > ret += zlen; > } > @@ -686,12 +817,42 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, > break; > } > } > - ceph_release_page_vector(pages, num_pages); > } > > - if (ret <= 0 || off >= i_size || !more) > + if (ret <= 0 || off >= i_size || !more) { > + dout("ceph_sync_read: aggregated read op striped_readed, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); > + ag_op->result = ret; > + complete_all(&ag_op->comp); > + mutex_lock(&ci->aggregated_ops_lock); > + unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint); > + mutex_unlock(&ci->aggregated_ops_lock); > + dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); > break; > + } > + first_round = false; > + } > + > + if (found_previous_req) { > + int idx = (off >> PAGE_SHIFT) - (ag_op->pos_node.start >> PAGE_SHIFT); > + size_t left = ret > 0 ? 
ret : 0, page_off; > + while (left > 0) { > + size_t len, copied; > + page_off = off & ~PAGE_MASK; > + len = min_t(size_t, left, PAGE_SIZE - page_off); > + dout("%s: copy pages after waiting, idx: %d, off: %lld, len: %ld," > + " ag_op: %p, ag_op->pages: %p, ag_op->num_pages: %d\n", > + __func__, idx, off, len, ag_op, ag_op->pages, ag_op->num_pages); > + copied = copy_page_to_iter(ag_op->pages[idx++], > + page_off, len, to); > + off += copied; > + left -= copied; > + if (copied < len) { > + ret = -EFAULT; > + break; > + } > + } > } > + kref_put(&ag_op->kref, ceph_put_aggregated_read_op); > > if (off > iocb->ki_pos) { > if (ret >= 0 && > diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c > index 8705f0645a24..1dfc0afa1dde 100644 > --- a/fs/ceph/inode.c > +++ b/fs/ceph/inode.c > @@ -432,6 +432,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) > spin_lock_init(&ci->i_ceph_lock); > mutex_init(&ci->getattrs_inflight_lock); > mutex_init(&ci->lookups_inflight_lock); > + mutex_init(&ci->aggregated_ops_lock); > > ci->i_version = 0; > ci->i_inline_version = 0; > @@ -465,6 +466,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) > ci->i_caps = RB_ROOT; > ci->getattrs_inflight = RB_ROOT; > ci->lookups_inflight = RB_ROOT; > + ci->aggregated_read_ops = RB_ROOT_CACHED; > + ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED; > ci->i_auth_cap = NULL; > ci->i_dirty_caps = 0; > ci->i_flushing_caps = 0; > diff --git a/fs/ceph/super.h b/fs/ceph/super.h > index abf761f2a122..a7ed8deaf836 100644 > --- a/fs/ceph/super.h > +++ b/fs/ceph/super.h > @@ -16,6 +16,7 @@ > #include <linux/slab.h> > #include <linux/posix_acl.h> > #include <linux/refcount.h> > +#include <linux/interval_tree.h> > > #include <linux/ceph/libceph.h> > > @@ -289,6 +290,31 @@ struct ceph_inode_xattrs_info { > u64 version, index_version; > }; > > +struct ceph_aggregated_read_op { > + struct kref kref; > + struct page** pages; > + int num_pages; > + unsigned long timeout; > + int result; > + struct 
interval_tree_node pos_node, neg_node; > + struct completion comp; > +}; > + > +extern void ceph_put_aggregated_read_op(struct kref* kref); > + > +extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, > + unsigned long start, unsigned long end, > + bool* repeated_low_endpoint, > + struct ceph_aggregated_read_op** ag_op); > + > +extern void register_aggregated_read_op(struct ceph_inode_info* cinode, > + struct ceph_aggregated_read_op* ag_op, > + bool suppliment); > + > +extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode, > + struct ceph_aggregated_read_op* ag_op, > + bool suppliment); > + > /* > * Ceph inode. > */ > @@ -296,8 +322,9 @@ struct ceph_inode_info { > struct ceph_vino i_vino; /* ceph ino + snap */ > > spinlock_t i_ceph_lock; > - struct mutex getattrs_inflight_lock, lookups_inflight_lock; > + struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock; > struct rb_root getattrs_inflight, lookups_inflight; > + struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment; > > u64 i_version; > u64 i_inline_version; > -- > 2.19.1 >
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 189df668b6a0..718ee163dce1 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -557,6 +557,92 @@ enum { READ_INLINE = 3, }; +int ceph_wait_for_aggregated_read_op (struct ceph_aggregated_read_op* op) +{ + long timeleft = wait_for_completion_killable_timeout(&op->comp, ceph_timeout_jiffies(op->timeout)); + if (timeleft > 0) + return op->result; + else + return timeleft ? timeleft : -ETIMEDOUT; +} + +bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, + unsigned long start, unsigned long end, + bool* repeated_low_endpoint, + struct ceph_aggregated_read_op** ag_op) +{ + struct interval_tree_node* node_p = interval_tree_iter_first(&cinode->aggregated_read_ops, start, end); + bool positive_found = false, negative_found = false; + while (node_p) { + if (node_p->start == start) + *repeated_low_endpoint = true; + if (node_p->start <= start && + node_p->last >= end) { + positive_found = true; + break; + } + + node_p = interval_tree_iter_next(node_p, start, end); + } + + dout("searched positive tree: found: %d\n", positive_found); + + if (!positive_found) { + node_p = interval_tree_iter_first(&cinode->aggregated_read_ops_suppliment, + ULONG_MAX - end, + ULONG_MAX - start); + while (node_p) { + if (node_p->start <= ULONG_MAX - end && + node_p->last >= ULONG_MAX - start) { + negative_found = true; + break; + } + node_p = interval_tree_iter_next(node_p, + ULONG_MAX - end, + ULONG_MAX - start); + } + } + + dout("searched negative tree: found: %d\n", negative_found); + + if (positive_found) + *ag_op = container_of(node_p, struct ceph_aggregated_read_op, pos_node); + else if (negative_found) + *ag_op = container_of(node_p, struct ceph_aggregated_read_op, neg_node); + + return positive_found || negative_found; +} + +void register_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment) +{ + if (suppliment) { + interval_tree_insert(&ag_op->neg_node, 
&cinode->aggregated_read_ops_suppliment); + } else + interval_tree_insert(&ag_op->pos_node, &cinode->aggregated_read_ops); +} + +void unregister_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment) +{ + if (suppliment) + interval_tree_remove(&ag_op->neg_node, &cinode->aggregated_read_ops_suppliment); + else + interval_tree_remove(&ag_op->pos_node, &cinode->aggregated_read_ops); +} + +void ceph_put_aggregated_read_op(struct kref* kref) +{ + struct ceph_aggregated_read_op* ag_op = container_of(kref, + struct ceph_aggregated_read_op, + kref); + if (ag_op->num_pages) + ceph_release_page_vector(ag_op->pages, ag_op->num_pages); + kfree(ag_op); +} + /* * Completely synchronous read and write methods. Direct from __user * buffer to osd, or directly to user pages (if O_DIRECT). @@ -575,9 +661,15 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_osd_client *osdc = &fsc->client->osdc; - ssize_t ret; - u64 off = iocb->ki_pos; u64 len = iov_iter_count(to); + struct page **pages; + u64 off = iocb->ki_pos; + int num_pages; + ssize_t ret; + bool found_previous_req = false; + bool repeated_low_endpoint = false; + bool first_round = true; + struct ceph_aggregated_read_op *ag_op = NULL; dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, (file->f_flags & O_DIRECT) ? 
"O_DIRECT" : ""); @@ -595,24 +687,46 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, return ret; ret = 0; - while ((len = iov_iter_count(to)) > 0) { + if (likely(!iov_iter_is_pipe(to)) && (off + len) < (ULONG_MAX / 2)) { + mutex_lock(&ci->aggregated_ops_lock); + dout("ceph_sync_read: trying to find previous aggregated read op, off: %lld, len: %lld.\n", off, len); + found_previous_req = find_previous_aggregated_read_op(ci, off, + off + len, &repeated_low_endpoint, &ag_op); + if (found_previous_req) { + dout("ceph_sync_read: found previous aggregated read op, off: %lld, len: %lld.\n", off, len); + kref_get(&ag_op->kref); + mutex_unlock(&ci->aggregated_ops_lock); + ret = ceph_wait_for_aggregated_read_op(ag_op); + dout("ceph_sync_read: waited aggregated read op, off: %lld, len: %lld.\n", off, len); + } else { + ag_op = kzalloc(sizeof(struct ceph_aggregated_read_op), GFP_KERNEL); + kref_init(&ag_op->kref); + ag_op->pos_node.start = off; + ag_op->pos_node.last = off + len; + ag_op->neg_node.start = ULONG_MAX - off - len; + ag_op->neg_node.last = ULONG_MAX - off; + init_completion(&ag_op->comp); + register_aggregated_read_op(ci, ag_op, repeated_low_endpoint); + dout("ceph_sync_read: register new aggregated read op, off: %lld, len: %lld.\n", off, len); + mutex_unlock(&ci->aggregated_ops_lock); + } + } + + while (!found_previous_req && (len = iov_iter_count(to)) > 0) { struct ceph_osd_request *req; - struct page **pages; - int num_pages; size_t page_off; u64 i_size; bool more; req = ceph_osdc_new_request(osdc, &ci->i_layout, - ci->i_vino, off, &len, 0, 1, - CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, - NULL, ci->i_truncate_seq, - ci->i_truncate_size, false); + ci->i_vino, off, &len, 0, 1, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); if (IS_ERR(req)) { ret = PTR_ERR(req); break; } - more = len < iov_iter_count(to); if (unlikely(iov_iter_is_pipe(to))) { @@ -630,34 +744,51 @@ static ssize_t 
ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, more = false; } } else { + int last_num_pages = num_pages; num_pages = calc_pages_for(off, len); page_off = off & ~PAGE_MASK; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) { - ceph_osdc_put_request(req); - ret = PTR_ERR(pages); - break; + + if (first_round) { + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ceph_osdc_put_request(req); + ret = PTR_ERR(pages); + dout("ceph_sync_read: aggregated read op got err, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); + ag_op->result = ret; + complete_all(&ag_op->comp); + mutex_lock(&ci->aggregated_ops_lock); + unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint); + mutex_unlock(&ci->aggregated_ops_lock); + dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); + kref_put(&ag_op->kref, ceph_put_aggregated_read_op); + break; + } + ag_op->pages = pages; + ag_op->num_pages = num_pages; + } else { + pages += last_num_pages; + if (page_off) + pages--; } } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, - false, false); + false, false); ret = ceph_osdc_start_request(osdc, req, false); if (!ret) ret = ceph_osdc_wait_request(osdc, req); ceph_osdc_put_request(req); - + i_size = i_size_read(inode); dout("sync_read %llu~%llu got %zd i_size %llu%s\n", - off, len, ret, i_size, (more ? " MORE" : "")); - + off, len, ret, i_size, (more ? 
" MORE" : "")); + if (ret == -ENOENT) ret = 0; if (ret >= 0 && ret < len && (off + ret < i_size)) { int zlen = min(len - ret, i_size - off - ret); int zoff = page_off + ret; dout("sync_read zero gap %llu~%llu\n", - off + ret, off + ret + zlen); + off + ret, off + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); ret += zlen; } @@ -686,12 +817,42 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, break; } } - ceph_release_page_vector(pages, num_pages); } - if (ret <= 0 || off >= i_size || !more) + if (ret <= 0 || off >= i_size || !more) { + dout("ceph_sync_read: aggregated read op striped_readed, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); + ag_op->result = ret; + complete_all(&ag_op->comp); + mutex_lock(&ci->aggregated_ops_lock); + unregister_aggregated_read_op(ci, ag_op, repeated_low_endpoint); + mutex_unlock(&ci->aggregated_ops_lock); + dout("ceph_sync_read: unregistered aggregated read op, off: %lld, len: %lld, ret: %ld.\n", off, len, ret); break; + } + first_round = false; + } + + if (found_previous_req) { + int idx = (off >> PAGE_SHIFT) - (ag_op->pos_node.start >> PAGE_SHIFT); + size_t left = ret > 0 ? 
ret : 0, page_off; + while (left > 0) { + size_t len, copied; + page_off = off & ~PAGE_MASK; + len = min_t(size_t, left, PAGE_SIZE - page_off); + dout("%s: copy pages after waiting, idx: %d, off: %lld, len: %ld," + " ag_op: %p, ag_op->pages: %p, ag_op->num_pages: %d\n", + __func__, idx, off, len, ag_op, ag_op->pages, ag_op->num_pages); + copied = copy_page_to_iter(ag_op->pages[idx++], + page_off, len, to); + off += copied; + left -= copied; + if (copied < len) { + ret = -EFAULT; + break; + } + } } + kref_put(&ag_op->kref, ceph_put_aggregated_read_op); if (off > iocb->ki_pos) { if (ret >= 0 && diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 8705f0645a24..1dfc0afa1dde 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -432,6 +432,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) spin_lock_init(&ci->i_ceph_lock); mutex_init(&ci->getattrs_inflight_lock); mutex_init(&ci->lookups_inflight_lock); + mutex_init(&ci->aggregated_ops_lock); ci->i_version = 0; ci->i_inline_version = 0; @@ -465,6 +466,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_caps = RB_ROOT; ci->getattrs_inflight = RB_ROOT; ci->lookups_inflight = RB_ROOT; + ci->aggregated_read_ops = RB_ROOT_CACHED; + ci->aggregated_read_ops_suppliment = RB_ROOT_CACHED; ci->i_auth_cap = NULL; ci->i_dirty_caps = 0; ci->i_flushing_caps = 0; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index abf761f2a122..a7ed8deaf836 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -16,6 +16,7 @@ #include <linux/slab.h> #include <linux/posix_acl.h> #include <linux/refcount.h> +#include <linux/interval_tree.h> #include <linux/ceph/libceph.h> @@ -289,6 +290,31 @@ struct ceph_inode_xattrs_info { u64 version, index_version; }; +struct ceph_aggregated_read_op { + struct kref kref; + struct page** pages; + int num_pages; + unsigned long timeout; + int result; + struct interval_tree_node pos_node, neg_node; + struct completion comp; +}; + +extern void ceph_put_aggregated_read_op(struct kref* kref); + 
+extern bool find_previous_aggregated_read_op(struct ceph_inode_info* cinode, + unsigned long start, unsigned long end, + bool* repeated_low_endpoint, + struct ceph_aggregated_read_op** ag_op); + +extern void register_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment); + +extern void unregister_aggregated_read_op(struct ceph_inode_info* cinode, + struct ceph_aggregated_read_op* ag_op, + bool suppliment); + /* * Ceph inode. */ @@ -296,8 +322,9 @@ struct ceph_inode_info { struct ceph_vino i_vino; /* ceph ino + snap */ spinlock_t i_ceph_lock; - struct mutex getattrs_inflight_lock, lookups_inflight_lock; + struct mutex getattrs_inflight_lock, lookups_inflight_lock, aggregated_ops_lock; struct rb_root getattrs_inflight, lookups_inflight; + struct rb_root_cached aggregated_read_ops, aggregated_read_ops_suppliment; u64 i_version; u64 i_inline_version;