@@ -185,6 +185,7 @@ static int ceph_releasepage(struct page *page, gfp_t g)
struct ceph_fscache_req {
struct fscache_io_request fscache_req;
+ struct ceph_snap_context *snapc;
refcount_t ref;
};
@@ -376,77 +377,6 @@ static int ceph_readpage(struct file *filp, struct page *page)
return err;
}
-/* read a single page, without unlocking it. */
-static int ceph_do_readpage(struct file *filp, struct page *page)
-{
- struct inode *inode = file_inode(filp);
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_osd_client *osdc = &fsc->client->osdc;
- struct ceph_osd_request *req;
- struct ceph_vino vino = ceph_vino(inode);
- int err = 0;
- u64 off = page_offset(page);
- u64 len = PAGE_SIZE;
-
- if (off >= i_size_read(inode)) {
- zero_user_segment(page, 0, PAGE_SIZE);
- SetPageUptodate(page);
- return 0;
- }
-
- if (ci->i_inline_version != CEPH_INLINE_NONE) {
- /*
- * Uptodate inline data should have been added
- * into page cache while getting Fcr caps.
- */
- if (off == 0)
- return -EINVAL;
- zero_user_segment(page, 0, PAGE_SIZE);
- SetPageUptodate(page);
- return 0;
- }
-
- dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
- vino.ino, vino.snap, filp, off, len, page, page->index);
- req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
- CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
- ci->i_truncate_seq, ci->i_truncate_size,
- false);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
-
- err = ceph_osdc_start_request(osdc, req, false);
- if (!err)
- err = ceph_osdc_wait_request(osdc, req);
-
- ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
- req->r_end_latency, err);
-
- ceph_osdc_put_request(req);
- dout("readpage result %d\n", err);
-
- if (err == -ENOENT)
- err = 0;
- if (err < 0) {
- SetPageError(page);
- if (err == -EBLACKLISTED)
- fsc->blacklisted = true;
- goto out;
- }
- if (err < PAGE_SIZE)
- /* zero fill remainder of page */
- zero_user_segment(page, err, PAGE_SIZE);
- else
- flush_dcache_page(page);
-
- SetPageUptodate(page);
-out:
- return err < 0 ? err : 0;
-}
-
/*
* Finish an async read(ahead) op.
*/
@@ -1473,6 +1403,30 @@ ceph_find_incompatible(struct inode *inode, struct page *page)
return NULL;
}
+static int ceph_fsreq_is_req_valid(struct fscache_io_request *fsreq)
+{
+ struct ceph_snap_context *snapc;
+ struct ceph_fscache_req *req = container_of(fsreq, struct ceph_fscache_req, fscache_req);
+
+ snapc = ceph_find_incompatible(fsreq->mapping->host, fsreq->no_unlock_page);
+ if (snapc) {
+ if (IS_ERR(snapc))
+ return PTR_ERR(snapc);
+ req->snapc = snapc;
+ return -EAGAIN;
+ }
+ return 0;
+}
+
+const struct fscache_io_request_ops ceph_read_for_write_fsreq_ops = {
+ .issue_op = ceph_fsreq_issue_op,
+ .reshape = ceph_fsreq_reshape,
+ .is_req_valid = ceph_fsreq_is_req_valid,
+ .done = ceph_fsreq_done,
+ .get = ceph_fsreq_get,
+ .put = ceph_fsreq_put,
+};
+
/*
* We are only allowed to write into/dirty the page if the page is
* clean, or already dirty within the same snap context.
@@ -1483,76 +1437,131 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_snap_context *snapc;
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct fscache_cookie *cookie = ceph_fscache_cookie(ci);
struct page *page = NULL;
pgoff_t index = pos >> PAGE_SHIFT;
- loff_t page_off = pos & PAGE_MASK;
int pos_in_page = pos & ~PAGE_MASK;
- int end_in_page = pos_in_page + len;
- loff_t i_size;
int r;
-refind:
- /* get a page */
- page = grab_cache_page_write_begin(mapping, index, 0);
- if (!page)
- return -ENOMEM;
- dout("write_begin file %p inode %p page %p %d~%d\n", file,
- inode, page, (int)pos, (int)len);
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ /*
+ * In principle, we should never get here, as the inode should have been uninlined
+ * before we're allowed to write to the page (in write_iter or page_mkwrite).
+ */
+ WARN_ONCE(1, "ceph: write_begin called on still-inlined inode!\n");
- for (;;) {
- snapc = ceph_find_incompatible(inode, page);
- if (snapc) {
- if (IS_ERR(snapc)) {
- r = PTR_ERR(snapc);
- break;
- }
- unlock_page(page);
- ceph_queue_writeback(inode);
- r = wait_event_killable(ci->i_cap_wq,
- context_is_writeable_or_written(inode, snapc));
- ceph_put_snap_context(snapc);
- put_page(page);
- goto refind;
+ /*
+ * Uptodate inline data should have been added
+ * into page cache while getting Fcr caps.
+ */
+ if (index == 0) {
+ r = -EINVAL;
+ goto out;
}
- if (PageUptodate(page)) {
- dout(" page %p already uptodate\n", page);
- break;
+ page = grab_cache_page_write_begin(mapping, index, 0);
+ if (!page)
+ return -ENOMEM;
+
+ zero_user_segment(page, 0, PAGE_SIZE);
+ SetPageUptodate(page);
+ r = 0;
+ goto out;
+ }
+
+ do {
+ struct ceph_fscache_req *req;
+ struct ceph_snap_context *snapc = NULL;
+
+ page = pagecache_get_page(mapping, index, FGP_WRITE, 0);
+ if (page) {
+ r = 0;
+ if (PageUptodate(page)) {
+ lock_page(page);
+ if (PageUptodate(page))
+ goto out;
+ unlock_page(page);
+ }
}
- /* full page? */
- if (pos_in_page == 0 && len == PAGE_SIZE)
- break;
+ /*
+ * In some cases we don't need to read at all:
+ * - full page write
+ * - write that lies completely beyond EOF
+	 * - write that covers the page from start to EOF or beyond it
+ */
+ if ((pos_in_page == 0 && len == PAGE_SIZE) ||
+ (pos >= i_size_read(inode)) ||
+ (pos_in_page == 0 && (pos + len) >= i_size_read(inode))) {
+ if (!page) {
+ page = grab_cache_page_write_begin(mapping, index, 0);
+ if (!page) {
+ r = -ENOMEM;
+ break;
+ }
+ } else {
+ lock_page(page);
+ }
+
+ snapc = ceph_find_incompatible(inode, page);
+ if (!snapc) {
+ zero_user_segments(page, 0, pos_in_page,
+ pos_in_page + len, PAGE_SIZE);
+ r = 0;
+ goto out;
+ }
+
+ unlock_page(page);
- /* past end of file? */
- i_size = i_size_read(inode);
- if (page_off >= i_size ||
- (pos_in_page == 0 && (pos+len) >= i_size &&
- end_in_page - pos_in_page != PAGE_SIZE)) {
- dout(" zeroing %p 0 - %d and %d - %d\n",
- page, pos_in_page, end_in_page, (int)PAGE_SIZE);
- zero_user_segments(page,
- 0, pos_in_page,
- end_in_page, PAGE_SIZE);
+ if (IS_ERR(snapc)) {
+ r = PTR_ERR(snapc);
+ goto out;
+ }
+ goto flush_incompat;
+ }
+
+ req = ceph_fsreq_alloc();
+ if (!req) {
+ unlock_page(page);
+ r = -ENOMEM;
break;
}
- /* we need to read it. */
- r = ceph_do_readpage(file, page);
- if (r) {
- if (r == -EINPROGRESS)
- continue;
+ /*
+ * Do the read. If we find out that we need to wait on writeback, then kick that
+ * off, wait for it and then resubmit the read.
+ */
+ fscache_init_io_request(&req->fscache_req, cookie, &ceph_read_for_write_fsreq_ops);
+ req->fscache_req.mapping = inode->i_mapping;
+
+ r = fscache_read_helper_for_write(&req->fscache_req, &page, index,
+ fsc->mount_options->rsize >> PAGE_SHIFT, 0);
+ if (r != -EAGAIN) {
+ if (r == 0)
+ r = wait_on_bit(&req->fscache_req.flags,
+ FSCACHE_IO_READ_IN_PROGRESS, TASK_KILLABLE);
+ ceph_fsreq_put(&req->fscache_req);
break;
}
- }
+ BUG_ON(!req->snapc);
+ snapc = ceph_get_snap_context(req->snapc);
+ ceph_fsreq_put(&req->fscache_req);
+flush_incompat:
+ put_page(page);
+ page = NULL;
+ ceph_queue_writeback(inode);
+ r = wait_event_killable(ci->i_cap_wq,
+ context_is_writeable_or_written(inode, snapc));
+ ceph_put_snap_context(snapc);
+ } while (r == 0);
+out:
if (r < 0) {
- if (page) {
- unlock_page(page);
+ if (page)
put_page(page);
- }
} else {
+ WARN_ON_ONCE(!PageLocked(page));
*pagep = page;
}
return r;
Plug write_begin into the read helper routine. This requires adding a
new is_req_valid op that we can use to vet whether there is an
incompatible snap context that needs to be flushed before we can fill
the page.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 fs/ceph/addr.c | 251 +++++++++++++++++++++++++------------------------
 1 file changed, 130 insertions(+), 121 deletions(-)