@@ -1395,6 +1395,245 @@ int ceph_process_folio_batch(struct address_space *mapping,
return rc;
}
+/*
+ * Compact @fbatch in place: slide the remaining (non-NULL) folio
+ * pointers down to the front of the array and shrink ->nr to match.
+ * Slots already consumed are expected to have been NULLed by the
+ * caller before this is invoked.
+ */
+static inline
+void ceph_shift_unused_folios_left(struct folio_batch *fbatch)
+{
+ unsigned j, n = 0;
+
+ /* shift unused page to beginning of fbatch */
+ for (j = 0; j < folio_batch_count(fbatch); j++) {
+ if (!fbatch->folios[j])
+ continue;
+
+ if (n < j) {
+ fbatch->folios[n] = fbatch->folios[j];
+ }
+
+ n++;
+ }
+
+ fbatch->nr = n;
+}
+
+/*
+ * Build one or more OSD write requests covering the locked pages
+ * collected in @ceph_wbc->pages and start them.  When the pages span
+ * more extents than one request can carry, the tail pages are copied
+ * into a fresh array and we loop via new_request.  On success the
+ * request(s) own the pages array (freed by writepages_finish).
+ * Returns 0, or -EIO if the OSD client is shutting down.
+ */
+static
+int ceph_submit_write(struct address_space *mapping,
+ struct writeback_control *wbc,
+ struct ceph_writeback_ctl *ceph_wbc)
+{
+ struct inode *inode = mapping->host;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+ struct ceph_client *cl = fsc->client;
+ struct ceph_vino vino = ceph_vino(inode);
+ struct ceph_osd_request *req = NULL;
+ struct page *page = NULL;
+ bool caching = ceph_is_cache_enabled(inode);
+ u64 offset;
+ u64 len;
+ unsigned i;
+
+new_request:
+ offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]);
+ len = ceph_wbc->wsize;
+
+ req = ceph_osdc_new_request(&fsc->client->osdc,
+ &ci->i_layout, vino,
+ offset, &len, 0, ceph_wbc->num_ops,
+ CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+ ceph_wbc->snapc, ceph_wbc->truncate_seq,
+ ceph_wbc->truncate_size, false);
+ if (IS_ERR(req)) {
+ /*
+ * Retry with the op count capped at CEPH_OSD_SLAB_OPS;
+ * last arg true presumably selects a mempool-backed
+ * allocation that cannot fail -- TODO confirm.
+ */
+ req = ceph_osdc_new_request(&fsc->client->osdc,
+ &ci->i_layout, vino,
+ offset, &len, 0,
+ min(ceph_wbc->num_ops,
+ CEPH_OSD_SLAB_OPS),
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE,
+ ceph_wbc->snapc,
+ ceph_wbc->truncate_seq,
+ ceph_wbc->truncate_size,
+ true);
+ BUG_ON(IS_ERR(req));
+ }
+
+ /* the returned extent must reach the end of the last locked page */
+ page = ceph_wbc->pages[ceph_wbc->locked_pages - 1];
+ BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset);
+
+ if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
+ /*
+ * OSD client is stopping: redirty and unlock every folio
+ * still held in the batch and every locked page, then bail.
+ */
+ for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) {
+ struct folio *folio = ceph_wbc->fbatch.folios[i];
+
+ if (!folio)
+ continue;
+
+ page = &folio->page;
+ redirty_page_for_writepage(wbc, page);
+ unlock_page(page);
+ }
+
+ for (i = 0; i < ceph_wbc->locked_pages; i++) {
+ page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
+
+ if (!page)
+ continue;
+
+ redirty_page_for_writepage(wbc, page);
+ unlock_page(page);
+ }
+
+ ceph_osdc_put_request(req);
+ return -EIO;
+ }
+
+ req->r_callback = writepages_finish;
+ req->r_inode = inode;
+
+ /* Format the osd request message and submit the write */
+ len = 0;
+ ceph_wbc->data_pages = ceph_wbc->pages;
+ ceph_wbc->op_idx = 0;
+ for (i = 0; i < ceph_wbc->locked_pages; i++) {
+ u64 cur_offset;
+
+ page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
+ cur_offset = page_offset(page);
+
+ /*
+ * Discontinuity in page range? Ceph can handle that by just passing
+ * multiple extents in the write op.
+ */
+ if (offset + len != cur_offset) {
+ /* If it's full, stop here */
+ if (ceph_wbc->op_idx + 1 == req->r_num_ops)
+ break;
+
+ /* Kick off an fscache write with what we have so far. */
+ ceph_fscache_write_to_cache(inode, offset, len, caching);
+
+ /* Start a new extent */
+ osd_req_op_extent_dup_last(req, ceph_wbc->op_idx,
+ cur_offset - offset);
+
+ doutc(cl, "got pages at %llu~%llu\n", offset, len);
+
+ osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
+ ceph_wbc->data_pages,
+ len, 0,
+ ceph_wbc->from_pool,
+ false);
+ osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
+
+ len = 0;
+ offset = cur_offset;
+ ceph_wbc->data_pages = ceph_wbc->pages + i;
+ ceph_wbc->op_idx++;
+ }
+
+ set_page_writeback(page);
+
+ if (caching)
+ ceph_set_page_fscache(page);
+
+ len += thp_size(page);
+ }
+
+ ceph_fscache_write_to_cache(inode, offset, len, caching);
+
+ if (ceph_wbc->size_stable) {
+ len = min(len, ceph_wbc->i_size - offset);
+ } else if (i == ceph_wbc->locked_pages) {
+ /* writepages_finish() clears writeback pages
+ * according to the data length, so make sure
+ * data length covers all locked pages */
+ u64 min_len = len + 1 - thp_size(page);
+ len = get_writepages_data_length(inode,
+ ceph_wbc->pages[i - 1],
+ offset);
+ len = max(len, min_len);
+ }
+
+ if (IS_ENCRYPTED(inode))
+ len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);
+
+ doutc(cl, "got pages at %llu~%llu\n", offset, len);
+
+ if (IS_ENCRYPTED(inode) &&
+ ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) {
+ pr_warn_client(cl,
+ "bad encrypted write offset=%lld len=%llu\n",
+ offset, len);
+ }
+
+ osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
+ ceph_wbc->data_pages, len,
+ 0, ceph_wbc->from_pool, false);
+ osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
+
+ BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops);
+
+ ceph_wbc->from_pool = false;
+ if (i < ceph_wbc->locked_pages) {
+ /*
+ * Not all locked pages fit in this request: carve off the
+ * tail into a fresh array so another request can be built.
+ */
+ BUG_ON(ceph_wbc->num_ops <= req->r_num_ops);
+ ceph_wbc->num_ops -= req->r_num_ops;
+ ceph_wbc->locked_pages -= i;
+
+ /* allocate new pages array for next request */
+ ceph_wbc->data_pages = ceph_wbc->pages;
+ __ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages);
+ memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i,
+ ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
+ memset(ceph_wbc->data_pages + i, 0,
+ ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
+ } else {
+ BUG_ON(ceph_wbc->num_ops != req->r_num_ops);
+ /* request message now owns the pages array */
+ ceph_wbc->pages = NULL;
+ }
+
+ req->r_mtime = inode_get_mtime(inode);
+ ceph_osdc_start_request(&fsc->client->osdc, req);
+ req = NULL;
+
+ wbc->nr_to_write -= i;
+ if (ceph_wbc->pages)
+ goto new_request;
+
+ return 0;
+}
+
+/*
+ * For an integrity (WB_SYNC_ALL-style) writeback pass that scanned the
+ * whole file, wait for in-flight writeback on every page that belongs
+ * to the current snap context before dirty pages of the next snapc are
+ * written.  No-op for WB_SYNC_NONE, partial scans, or when a head
+ * snapc is set.
+ */
+static
+void ceph_wait_until_current_writes_complete(struct address_space *mapping,
+ struct writeback_control *wbc,
+ struct ceph_writeback_ctl *ceph_wbc)
+{
+ struct page *page;
+ unsigned i, nr;
+
+ if (wbc->sync_mode != WB_SYNC_NONE &&
+ ceph_wbc->start_index == 0 && /* all dirty pages were checked */
+ !ceph_wbc->head_snapc) {
+ ceph_wbc->index = 0;
+
+ while ((ceph_wbc->index <= ceph_wbc->end) &&
+ (nr = filemap_get_folios_tag(mapping,
+ &ceph_wbc->index,
+ (pgoff_t)-1,
+ PAGECACHE_TAG_WRITEBACK,
+ &ceph_wbc->fbatch))) {
+ for (i = 0; i < nr; i++) {
+ page = &ceph_wbc->fbatch.folios[i]->page;
+ /* only wait on pages of the snapc being flushed */
+ if (page_snap_context(page) != ceph_wbc->snapc)
+ continue;
+ wait_on_page_writeback(page);
+ }
+
+ folio_batch_release(&ceph_wbc->fbatch);
+ cond_resched();
+ }
+ }
+}
+
/*
* initiate async writeback
*/
@@ -1402,17 +1641,12 @@ static int ceph_writepages_start(struct address_space *mapping,
struct writeback_control *wbc)
{
struct inode *inode = mapping->host;
- struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
struct ceph_client *cl = fsc->client;
- struct ceph_vino vino = ceph_vino(inode);
struct ceph_writeback_ctl ceph_wbc;
- struct ceph_osd_request *req = NULL;
int rc = 0;
- bool caching = ceph_is_cache_enabled(inode);
- if (wbc->sync_mode == WB_SYNC_NONE &&
- fsc->write_congested)
+ if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested)
return 0;
doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
@@ -1439,9 +1673,6 @@ static int ceph_writepages_start(struct address_space *mapping,
tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);
while (!has_writeback_done(&ceph_wbc)) {
- unsigned i;
- struct page *page;
-
ceph_wbc.locked_pages = 0;
ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;
@@ -1459,6 +1690,7 @@ static int ceph_writepages_start(struct address_space *mapping,
if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
break;
+process_folio_batch:
rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
if (rc)
goto release_folios;
@@ -1466,187 +1698,30 @@ static int ceph_writepages_start(struct address_space *mapping,
/* did we get anything? */
if (!ceph_wbc.locked_pages)
goto release_folios;
- if (i) {
- unsigned j, n = 0;
- /* shift unused page to beginning of fbatch */
- for (j = 0; j < ceph_wbc.nr_folios; j++) {
- if (!ceph_wbc.fbatch.folios[j])
- continue;
- if (n < j) {
- ceph_wbc.fbatch.folios[n] =
- ceph_wbc.fbatch.folios[j];
- }
- n++;
- }
- ceph_wbc.fbatch.nr = n;
- if (ceph_wbc.nr_folios && i == ceph_wbc.nr_folios &&
+ if (ceph_wbc.processed_in_fbatch) {
+ ceph_shift_unused_folios_left(&ceph_wbc.fbatch);
+
+ if (folio_batch_count(&ceph_wbc.fbatch) == 0 &&
ceph_wbc.locked_pages < ceph_wbc.max_pages) {
doutc(cl, "reached end fbatch, trying for more\n");
- folio_batch_release(&ceph_wbc.fbatch);
goto get_more_pages;
}
}
-new_request:
- ceph_wbc.offset = ceph_fscrypt_page_offset(ceph_wbc.pages[0]);
- ceph_wbc.len = ceph_wbc.wsize;
-
- req = ceph_osdc_new_request(&fsc->client->osdc,
- &ci->i_layout, vino,
- ceph_wbc.offset, &ceph_wbc.len,
- 0, ceph_wbc.num_ops,
- CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
- ceph_wbc.snapc, ceph_wbc.truncate_seq,
- ceph_wbc.truncate_size, false);
- if (IS_ERR(req)) {
- req = ceph_osdc_new_request(&fsc->client->osdc,
- &ci->i_layout, vino,
- ceph_wbc.offset, &ceph_wbc.len,
- 0, min(ceph_wbc.num_ops,
- CEPH_OSD_SLAB_OPS),
- CEPH_OSD_OP_WRITE,
- CEPH_OSD_FLAG_WRITE,
- ceph_wbc.snapc,
- ceph_wbc.truncate_seq,
- ceph_wbc.truncate_size, true);
- BUG_ON(IS_ERR(req));
- }
- BUG_ON(ceph_wbc.len <
- ceph_fscrypt_page_offset(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) +
- thp_size(ceph_wbc.pages[ceph_wbc.locked_pages - 1]) -
- ceph_wbc.offset);
-
- if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
- rc = -EIO;
+ rc = ceph_submit_write(mapping, wbc, &ceph_wbc);
+ if (rc)
goto release_folios;
- }
- req->r_callback = writepages_finish;
- req->r_inode = inode;
-
- /* Format the osd request message and submit the write */
- ceph_wbc.len = 0;
- ceph_wbc.data_pages = ceph_wbc.pages;
- ceph_wbc.op_idx = 0;
- for (i = 0; i < ceph_wbc.locked_pages; i++) {
- struct page *page =
- ceph_fscrypt_pagecache_page(ceph_wbc.pages[i]);
-
- u64 cur_offset = page_offset(page);
- /*
- * Discontinuity in page range? Ceph can handle that by just passing
- * multiple extents in the write op.
- */
- if (ceph_wbc.offset + ceph_wbc.len != cur_offset) {
- /* If it's full, stop here */
- if (ceph_wbc.op_idx + 1 == req->r_num_ops)
- break;
-
- /* Kick off an fscache write with what we have so far. */
- ceph_fscache_write_to_cache(inode, ceph_wbc.offset,
- ceph_wbc.len, caching);
-
- /* Start a new extent */
- osd_req_op_extent_dup_last(req, ceph_wbc.op_idx,
- cur_offset -
- ceph_wbc.offset);
- doutc(cl, "got pages at %llu~%llu\n",
- ceph_wbc.offset,
- ceph_wbc.len);
- osd_req_op_extent_osd_data_pages(req,
- ceph_wbc.op_idx,
- ceph_wbc.data_pages,
- ceph_wbc.len, 0,
- ceph_wbc.from_pool, false);
- osd_req_op_extent_update(req, ceph_wbc.op_idx,
- ceph_wbc.len);
-
- ceph_wbc.len = 0;
- ceph_wbc.offset = cur_offset;
- ceph_wbc.data_pages = ceph_wbc.pages + i;
- ceph_wbc.op_idx++;
- }
-
- set_page_writeback(page);
- if (caching)
- ceph_set_page_fscache(page);
- ceph_wbc.len += thp_size(page);
- }
- ceph_fscache_write_to_cache(inode, ceph_wbc.offset,
- ceph_wbc.len, caching);
-
- if (ceph_wbc.size_stable) {
- ceph_wbc.len = min(ceph_wbc.len,
- ceph_wbc.i_size - ceph_wbc.offset);
- } else if (i == ceph_wbc.locked_pages) {
- /* writepages_finish() clears writeback pages
- * according to the data length, so make sure
- * data length covers all locked pages */
- u64 min_len = ceph_wbc.len + 1 - thp_size(page);
- ceph_wbc.len =
- get_writepages_data_length(inode,
- ceph_wbc.pages[i - 1],
- ceph_wbc.offset);
- ceph_wbc.len = max(ceph_wbc.len, min_len);
- }
- if (IS_ENCRYPTED(inode)) {
- ceph_wbc.len = round_up(ceph_wbc.len,
- CEPH_FSCRYPT_BLOCK_SIZE);
- }
- doutc(cl, "got pages at %llu~%llu\n",
- ceph_wbc.offset, ceph_wbc.len);
+ ceph_wbc.locked_pages = 0;
+ ceph_wbc.strip_unit_end = 0;
- if (IS_ENCRYPTED(inode) &&
- ((ceph_wbc.offset | ceph_wbc.len) & ~CEPH_FSCRYPT_BLOCK_MASK))
- pr_warn_client(cl,
- "bad encrypted write offset=%lld len=%llu\n",
- ceph_wbc.offset, ceph_wbc.len);
-
- osd_req_op_extent_osd_data_pages(req, ceph_wbc.op_idx,
- ceph_wbc.data_pages,
- ceph_wbc.len,
- 0, ceph_wbc.from_pool, false);
- osd_req_op_extent_update(req, ceph_wbc.op_idx, ceph_wbc.len);
-
- BUG_ON(ceph_wbc.op_idx + 1 != req->r_num_ops);
-
- ceph_wbc.from_pool = false;
- if (i < ceph_wbc.locked_pages) {
- BUG_ON(ceph_wbc.num_ops <= req->r_num_ops);
- ceph_wbc.num_ops -= req->r_num_ops;
- ceph_wbc.locked_pages -= i;
-
- /* allocate new pages array for next request */
- ceph_wbc.data_pages = ceph_wbc.pages;
- ceph_wbc.pages = kmalloc_array(ceph_wbc.locked_pages,
- sizeof(*ceph_wbc.pages),
- GFP_NOFS);
- if (!ceph_wbc.pages) {
- ceph_wbc.from_pool = true;
- ceph_wbc.pages =
- mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
- BUG_ON(!ceph_wbc.pages);
- }
- memcpy(ceph_wbc.pages, ceph_wbc.data_pages + i,
- ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages));
- memset(ceph_wbc.data_pages + i, 0,
- ceph_wbc.locked_pages * sizeof(*ceph_wbc.pages));
- } else {
- BUG_ON(ceph_wbc.num_ops != req->r_num_ops);
- ceph_wbc.index = ceph_wbc.pages[i - 1]->index + 1;
- /* request message now owns the pages array */
- ceph_wbc.pages = NULL;
+ if (folio_batch_count(&ceph_wbc.fbatch) > 0) {
+ ceph_wbc.nr_folios =
+ folio_batch_count(&ceph_wbc.fbatch);
+ goto process_folio_batch;
}
- req->r_mtime = inode_get_mtime(inode);
- ceph_osdc_start_request(&fsc->client->osdc, req);
- req = NULL;
-
- wbc->nr_to_write -= i;
- if (ceph_wbc.pages)
- goto new_request;
-
/*
* We stop writing back only if we are not doing
* integrity sync. In case of integrity sync we have to
@@ -1666,32 +1741,12 @@ static int ceph_writepages_start(struct address_space *mapping,
if (ceph_wbc.should_loop && !ceph_wbc.done) {
/* more to do; loop back to beginning of file */
doutc(cl, "looping back to beginning of file\n");
- ceph_wbc.end = ceph_wbc.start_index - 1; /* OK even when start_index == 0 */
+ /* OK even when start_index == 0 */
+ ceph_wbc.end = ceph_wbc.start_index - 1;
/* to write dirty pages associated with next snapc,
* we need to wait until current writes complete */
- if (wbc->sync_mode != WB_SYNC_NONE &&
- ceph_wbc.start_index == 0 && /* all dirty pages were checked */
- !ceph_wbc.head_snapc) {
- struct page *page;
- unsigned i, nr;
- ceph_wbc.index = 0;
- while ((ceph_wbc.index <= ceph_wbc.end) &&
- (nr = filemap_get_folios_tag(mapping,
- &ceph_wbc.index,
- (pgoff_t)-1,
- PAGECACHE_TAG_WRITEBACK,
- &ceph_wbc.fbatch))) {
- for (i = 0; i < nr; i++) {
- page = &ceph_wbc.fbatch.folios[i]->page;
- if (page_snap_context(page) != ceph_wbc.snapc)
- continue;
- wait_on_page_writeback(page);
- }
- folio_batch_release(&ceph_wbc.fbatch);
- cond_resched();
- }
- }
+ ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc);
ceph_wbc.start_index = 0;
ceph_wbc.index = 0;
@@ -1702,15 +1757,13 @@ static int ceph_writepages_start(struct address_space *mapping,
mapping->writeback_index = ceph_wbc.index;
out:
- ceph_osdc_put_request(req);
ceph_put_snap_context(ceph_wbc.last_snapc);
doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode),
rc);
+
return rc;
}
-
-
/*
* See if a given @snapc is either writeable, or already written.
*/