diff mbox series

[v3,13/13] mm: zswap: Compress batching with Intel IAA in zswap_store() of large folios.

Message ID 20241106192105.6731-14-kanchana.p.sridhar@intel.com (mailing list archive)
State Under Review
Delegated to: Herbert Xu
Headers show
Series zswap IAA compress batching | expand

Commit Message

Sridhar, Kanchana P Nov. 6, 2024, 7:21 p.m. UTC
If the system has Intel IAA, and if sysctl vm.compress-batching is set to
"1", zswap_store() will call crypto_acomp_batch_compress() to compress up
to SWAP_CRYPTO_BATCH_SIZE (i.e. 8) pages in large folios in parallel using
the multiple compress engines available in IAA hardware.

On platforms with multiple IAA devices per socket, compress jobs from all
cores in a socket will be distributed among all IAA devices on the socket
by the iaa_crypto driver.

With deflate-iaa configured as the zswap compressor, and
sysctl vm.compress-batching is enabled, the first time zswap_store() has to
swapout a large folio on any given cpu, it will allocate the
pool->acomp_batch_ctx resources on that cpu, and set pool->can_batch_comp
to BATCH_COMP_ENABLED. It will then proceed to call the main
__zswap_store_batch_core() compress batching function. Subsequent calls to
zswap_store() on the same cpu will go ahead and use the acomp_batch_ctx by
checking the pool->can_batch_comp status.

Hence, we allocate the per-cpu pool->acomp_batch_ctx resources only on an
as-needed basis, to reduce memory footprint cost. The cost is not incurred
on cores that never get to swapout a large folio.

This patch introduces the main __zswap_store_batch_core() function for
compress batching. This interface represents the extensible compress
batching architecture that can potentially be called with a batch of
any-order folios from shrink_folio_list(). In other words, although
zswap_store() calls __zswap_store_batch_core() with exactly one large folio
in this patch, we can reuse this interface to reclaim a batch of folios, to
significantly improve the reclaim path efficiency due to IAA's parallel
compression capability.

The newly added functions that implement batched stores follow the
general structure of zswap_store() of a large folio. Some amount of
restructuring and optimization is done to minimize failure points
for a batch, fail early and maximize the zswap store pipeline occupancy
with SWAP_CRYPTO_BATCH_SIZE pages, potentially from multiple
folios. This is intended to maximize reclaim throughput with the IAA
hardware parallel compressions.

Signed-off-by: Kanchana P Sridhar <kanchana.p.sridhar@intel.com>
---
 include/linux/zswap.h |  84 ++++++
 mm/zswap.c            | 625 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 709 insertions(+)

Comments

Johannes Weiner Nov. 7, 2024, 6:16 p.m. UTC | #1
On Wed, Nov 06, 2024 at 11:21:05AM -0800, Kanchana P Sridhar wrote:
> If the system has Intel IAA, and if sysctl vm.compress-batching is set to
> "1", zswap_store() will call crypto_acomp_batch_compress() to compress up
> to SWAP_CRYPTO_BATCH_SIZE (i.e. 8) pages in large folios in parallel using
> the multiple compress engines available in IAA hardware.
> 
> On platforms with multiple IAA devices per socket, compress jobs from all
> cores in a socket will be distributed among all IAA devices on the socket
> by the iaa_crypto driver.
> 
> With deflate-iaa configured as the zswap compressor, and
> sysctl vm.compress-batching is enabled, the first time zswap_store() has to
> swapout a large folio on any given cpu, it will allocate the
> pool->acomp_batch_ctx resources on that cpu, and set pool->can_batch_comp
> to BATCH_COMP_ENABLED. It will then proceed to call the main
> __zswap_store_batch_core() compress batching function. Subsequent calls to
> zswap_store() on the same cpu will go ahead and use the acomp_batch_ctx by
> checking the pool->can_batch_comp status.
> 
> Hence, we allocate the per-cpu pool->acomp_batch_ctx resources only on an
> as-needed basis, to reduce memory footprint cost. The cost is not incurred
> on cores that never get to swapout a large folio.
> 
> This patch introduces the main __zswap_store_batch_core() function for
> compress batching. This interface represents the extensible compress
> batching architecture that can potentially be called with a batch of
> any-order folios from shrink_folio_list(). In other words, although
> zswap_store() calls __zswap_store_batch_core() with exactly one large folio
> in this patch, we can reuse this interface to reclaim a batch of folios, to
> significantly improve the reclaim path efficiency due to IAA's parallel
> compression capability.
> 
> The newly added functions that implement batched stores follow the
> general structure of zswap_store() of a large folio. Some amount of
> restructuring and optimization is done to minimize failure points
> for a batch, fail early and maximize the zswap store pipeline occupancy
> with SWAP_CRYPTO_BATCH_SIZE pages, potentially from multiple
> folios. This is intended to maximize reclaim throughput with the IAA
> hardware parallel compressions.
> 
> Signed-off-by: Kanchana P Sridhar <kanchana.p.sridhar@intel.com>
> ---
>  include/linux/zswap.h |  84 ++++++
>  mm/zswap.c            | 625 ++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 709 insertions(+)
> 
> diff --git a/include/linux/zswap.h b/include/linux/zswap.h
> index 9ad27ab3d222..6d3ef4780c69 100644
> --- a/include/linux/zswap.h
> +++ b/include/linux/zswap.h
> @@ -31,6 +31,88 @@ struct zswap_lruvec_state {
>  	atomic_long_t nr_disk_swapins;
>  };
>  
> +/*
> + * struct zswap_store_sub_batch_page:
> + *
> + * This represents one "zswap batching element", namely, the
> + * attributes associated with a page in a large folio that will
> + * be compressed and stored in zswap. The term "batch" is reserved
> + * for a conceptual "batch" of folios that can be sent to
> + * zswap_store() by reclaim. The term "sub-batch" is used to describe
> + * a collection of "zswap batching elements", i.e., an array of
> + * "struct zswap_store_sub_batch_page *".
> + *
> + * The zswap compress sub-batch size is specified by
> + * SWAP_CRYPTO_BATCH_SIZE, currently set as 8UL if the
> + * platform has Intel IAA. This means zswap can store a large folio
> + * by creating sub-batches of up to 8 pages and compressing this
> + * batch using IAA to parallelize the 8 compress jobs in hardware.
> + * For e.g., a 64KB folio can be compressed as 2 sub-batches of
> + * 8 pages each. This can significantly improve the zswap_store()
> + * performance for large folios.
> + *
> + * Although the page itself is represented directly, the structure
> + * adds a "u8 batch_idx" to represent an index for the folio in a
> + * conceptual "batch of folios" that can be passed to zswap_store().
> + * Conceptually, this allows for up to 256 folios that can be passed
> + * to zswap_store(). If this conceptual number of folios sent to
> + * zswap_store() exceeds 256, the "batch_idx" needs to become u16.
> + */
> +struct zswap_store_sub_batch_page {
> +	u8 batch_idx;
> +	swp_entry_t swpentry;
> +	struct obj_cgroup *objcg;
> +	struct zswap_entry *entry;
> +	int error; /* folio error status. */
> +};
> +
> +/*
> + * struct zswap_store_pipeline_state:
> + *
> + * This stores state during IAA compress batching of (conceptually, a batch of)
> + * folios. The term pipelining in this context, refers to breaking down
> + * the batch of folios being reclaimed into sub-batches of
> + * SWAP_CRYPTO_BATCH_SIZE pages, batch compressing and storing the
> + * sub-batch. This concept could be further evolved to use overlap of CPU
> + * computes with IAA computes. For instance, we could stage the post-compress
> + * computes for sub-batch "N-1" to happen in parallel with IAA batch
> + * compression of sub-batch "N".
> + *
> + * We begin by developing the concept of compress batching. Pipelining with
> + * overlap can be future work.
> + *
> + * @errors: The errors status for the batch of reclaim folios passed in from
> + *          a higher mm layer such as swap_writepage().
> + * @pool: A valid zswap_pool.
> + * @acomp_ctx: The per-cpu pointer to the crypto_acomp_ctx for the @pool.
> + * @sub_batch: This is an array that represents the sub-batch of up to
> + *             SWAP_CRYPTO_BATCH_SIZE pages that are being stored
> + *             in zswap.
> + * @comp_dsts: The destination buffers for crypto_acomp_compress() for each
> + *             page being compressed.
> + * @comp_dlens: The destination buffers' lengths from crypto_acomp_compress()
> + *              obtained after crypto_acomp_poll() returns completion status,
> + *              for each page being compressed.
> + * @comp_errors: Compression errors for each page being compressed.
> + * @nr_comp_pages: Total number of pages in @sub_batch.
> + *
> + * Note:
> + * The max sub-batch size is SWAP_CRYPTO_BATCH_SIZE, currently 8UL.
> + * Hence, if SWAP_CRYPTO_BATCH_SIZE exceeds 256, some of the
> + * u8 members (except @comp_dsts) need to become u16.
> + */
> +struct zswap_store_pipeline_state {
> +	int *errors;
> +	struct zswap_pool *pool;
> +	struct crypto_acomp_ctx *acomp_ctx;
> +	struct zswap_store_sub_batch_page *sub_batch;
> +	struct page **comp_pages;
> +	u8 **comp_dsts;
> +	unsigned int *comp_dlens;
> +	int *comp_errors;
> +	u8 nr_comp_pages;
> +};

Why are these in the public header?

>  unsigned long zswap_total_pages(void);
>  bool zswap_store(struct folio *folio);
>  bool zswap_load(struct folio *folio);
> @@ -45,6 +127,8 @@ bool zswap_never_enabled(void);
>  #else
>  
>  struct zswap_lruvec_state {};
> +struct zswap_store_sub_batch_page {};
> +struct zswap_store_pipeline_state {};
> 
>  static inline bool zswap_store(struct folio *folio)
>  {
> diff --git a/mm/zswap.c b/mm/zswap.c
> index 2af736e38213..538aac3fb552 100644
> --- a/mm/zswap.c
> +++ b/mm/zswap.c
> @@ -255,6 +255,12 @@ static int zswap_create_acomp_ctx(unsigned int cpu,
>  				  char *tfm_name,
>  				  unsigned int nr_reqs);
>  
> +static bool __zswap_store_batch_core(
> +	int node_id,
> +	struct folio **folios,
> +	int *errors,
> +	unsigned int nr_folios);
> +

Please reorder the functions to avoid forward decls.

>  /*********************************
>  * pool functions
>  **********************************/
> @@ -1626,6 +1632,12 @@ static ssize_t zswap_store_page(struct page *page,
>  	return -EINVAL;
>  }
>  
> +/*
> + * Modified to use the IAA compress batching framework implemented in
> + * __zswap_store_batch_core() if sysctl vm.compress-batching is 1.
> + * The batching code is intended to significantly improve folio store
> + * performance over the sequential code.

This isn't helpful, please delete.

>  bool zswap_store(struct folio *folio)
>  {
>  	long nr_pages = folio_nr_pages(folio);
> @@ -1638,6 +1650,38 @@ bool zswap_store(struct folio *folio)
>  	bool ret = false;
>  	long index;
>  
> +	/*
> +	 * Improve large folio zswap_store() latency with IAA compress batching,
> +	 * if this is enabled by setting sysctl vm.compress-batching to "1".
> +	 * If enabled, the large folio's pages are compressed in parallel in
> +	 * batches of SWAP_CRYPTO_BATCH_SIZE pages. If disabled, every page in
> +	 * the large folio is compressed sequentially.
> +	 */

Same here. Reduce to "Try to batch compress large folios, fall back to
processing individual subpages if that fails."

> +	if (folio_test_large(folio) && READ_ONCE(compress_batching)) {
> +		pool = zswap_pool_current_get();

There is an existing zswap_pool_current_get() in zswap_store(), please
reorder the sequence so you don't need to add an extra one.

> +		if (!pool) {
> +			pr_err("Cannot setup acomp_batch_ctx for compress batching: no current pool found\n");

This is unnecessary.

> +			goto sequential_store;
> +		}
> +
> +		if (zswap_pool_can_batch(pool)) {

This function is introduced in another patch, where it isn't
used. Please add functions and callers in the same patch.

> +			int error = -1;
> +			bool store_batch = __zswap_store_batch_core(
> +						folio_nid(folio),
> +						&folio, &error, 1);
> +
> +			if (store_batch) {
> +				zswap_pool_put(pool);
> +				if (!error)
> +					ret = true;
> +				return ret;
> +			}
> +		}

Please don't future proof code like this, only implement what is
strictly necessary for the functionality in this patch. You're only
adding a single caller with nr_folios=1, so it shouldn't be a
parameter, and the function shouldn't have a that batch_idx loop.

> +		zswap_pool_put(pool);
> +	}
> +
> +sequential_store:
> +
>  	VM_WARN_ON_ONCE(!folio_test_locked(folio));
>  	VM_WARN_ON_ONCE(!folio_test_swapcache(folio));
>  
> @@ -1724,6 +1768,587 @@ bool zswap_store(struct folio *folio)
>  	return ret;
>  }
>  
> +/*
> + * Note: If SWAP_CRYPTO_BATCH_SIZE exceeds 256, change the
> + * u8 stack variables in the next several functions, to u16.
> + */
> +
> +/*
> + * Propagate the "sbp" error condition to other batch elements belonging to
> + * the same folio as "sbp".
> + */
> +static __always_inline void zswap_store_propagate_errors(
> +	struct zswap_store_pipeline_state *zst,
> +	u8 error_batch_idx)
> +{

Please observe surrounding coding style on how to wrap >80 col
function signatures.

Don't use __always_inline unless there is a clear, spelled out
performance reason. Since it's an error path, that's doubtful.

Please use a consistent namespace for all this:

CONFIG_ZSWAP_BATCH
zswap_batch_store()
zswap_batch_alloc_entries()
zswap_batch_add_folios()
zswap_batch_compress()

etc.

Again, order to avoid forward decls.

Try to keep the overall sequence of events between zswap_store() and
zswap_batch_store() similar as much as possible for readability.
Johannes Weiner Nov. 7, 2024, 6:53 p.m. UTC | #2
On Wed, Nov 06, 2024 at 11:21:05AM -0800, Kanchana P Sridhar wrote:
> +static void zswap_zpool_store_sub_batch(
> +	struct zswap_store_pipeline_state *zst)

There is a zswap_store_sub_batch() below, which does something
completely different. Naming is hard, but please invest a bit more
time into this to make this readable.

> +{
> +	u8 i;
> +
> +	for (i = 0; i < zst->nr_comp_pages; ++i) {
> +		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
> +		struct zpool *zpool;
> +		unsigned long handle;
> +		char *buf;
> +		gfp_t gfp;
> +		int err;
> +
> +		/* Skip pages that had compress errors. */
> +		if (sbp->error)
> +			continue;
> +
> +		zpool = zst->pool->zpool;
> +		gfp = __GFP_NORETRY | __GFP_NOWARN | __GFP_KSWAPD_RECLAIM;
> +		if (zpool_malloc_support_movable(zpool))
> +			gfp |= __GFP_HIGHMEM | __GFP_MOVABLE;
> +		err = zpool_malloc(zpool, zst->comp_dlens[i], gfp, &handle);
> +
> +		if (err) {
> +			if (err == -ENOSPC)
> +				zswap_reject_compress_poor++;
> +			else
> +				zswap_reject_alloc_fail++;
> +
> +			/*
> +			 * An error should be propagated to other pages of the
> +			 * same folio in the sub-batch, and zpool resources for
> +			 * those pages (in sub-batch order prior to this zpool
> +			 * error) should be de-allocated.
> +			 */
> +			zswap_store_propagate_errors(zst, sbp->batch_idx);
> +			continue;
> +		}
> +
> +		buf = zpool_map_handle(zpool, handle, ZPOOL_MM_WO);
> +		memcpy(buf, zst->comp_dsts[i], zst->comp_dlens[i]);
> +		zpool_unmap_handle(zpool, handle);
> +
> +		sbp->entry->handle = handle;
> +		sbp->entry->length = zst->comp_dlens[i];
> +	}
> +}
> +
> +/*
> + * Returns true if the entry was successfully
> + * stored in the xarray, and false otherwise.
> + */
> +static bool zswap_store_entry(swp_entry_t page_swpentry,
> +			      struct zswap_entry *entry)
> +{
> +	struct zswap_entry *old = xa_store(swap_zswap_tree(page_swpentry),
> +					   swp_offset(page_swpentry),
> +					   entry, GFP_KERNEL);
> +	if (xa_is_err(old)) {
> +		int err = xa_err(old);
> +
> +		WARN_ONCE(err != -ENOMEM, "unexpected xarray error: %d\n", err);
> +		zswap_reject_alloc_fail++;
> +		return false;
> +	}
> +
> +	/*
> +	 * We may have had an existing entry that became stale when
> +	 * the folio was redirtied and now the new version is being
> +	 * swapped out. Get rid of the old.
> +	 */
> +	if (old)
> +		zswap_entry_free(old);
> +
> +	return true;
> +}
> +
> +static void zswap_batch_compress_post_proc(
> +	struct zswap_store_pipeline_state *zst)
> +{
> +	int nr_objcg_pages = 0, nr_pages = 0;
> +	struct obj_cgroup *objcg = NULL;
> +	size_t compressed_bytes = 0;
> +	u8 i;
> +
> +	zswap_zpool_store_sub_batch(zst);
> +
> +	for (i = 0; i < zst->nr_comp_pages; ++i) {
> +		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
> +
> +		if (sbp->error)
> +			continue;
> +
> +		if (!zswap_store_entry(sbp->swpentry, sbp->entry)) {
> +			zswap_store_propagate_errors(zst, sbp->batch_idx);
> +			continue;
> +		}
> +
> +		/*
> +		 * The entry is successfully compressed and stored in the tree,
> +		 * there is no further possibility of failure. Grab refs to the
> +		 * pool and objcg. These refs will be dropped by
> +		 * zswap_entry_free() when the entry is removed from the tree.
> +		 */
> +		zswap_pool_get(zst->pool);
> +		if (sbp->objcg)
> +			obj_cgroup_get(sbp->objcg);
> +
> +		/*
> +		 * We finish initializing the entry while it's already in xarray.
> +		 * This is safe because:
> +		 *
> +		 * 1. Concurrent stores and invalidations are excluded by folio
> +		 *    lock.
> +		 *
> +		 * 2. Writeback is excluded by the entry not being on the LRU yet.
> +		 *    The publishing order matters to prevent writeback from seeing
> +		 *    an incoherent entry.
> +		 */
> +		sbp->entry->pool = zst->pool;
> +		sbp->entry->swpentry = sbp->swpentry;
> +		sbp->entry->objcg = sbp->objcg;
> +		sbp->entry->referenced = true;
> +		if (sbp->entry->length) {
> +			INIT_LIST_HEAD(&sbp->entry->lru);
> +			zswap_lru_add(&zswap_list_lru, sbp->entry);
> +		}
> +
> +		if (!objcg && sbp->objcg) {
> +			objcg = sbp->objcg;
> +		} else if (objcg && sbp->objcg && (objcg != sbp->objcg)) {
> +			obj_cgroup_charge_zswap(objcg, compressed_bytes);
> +			count_objcg_events(objcg, ZSWPOUT, nr_objcg_pages);
> +			compressed_bytes = 0;
> +			nr_objcg_pages = 0;
> +			objcg = sbp->objcg;
> +		}
> +
> +		if (sbp->objcg) {
> +			compressed_bytes += sbp->entry->length;
> +			++nr_objcg_pages;
> +		}
> +
> +		++nr_pages;
> +	} /* for sub-batch pages. */
> +
> +	if (objcg) {
> +		obj_cgroup_charge_zswap(objcg, compressed_bytes);
> +		count_objcg_events(objcg, ZSWPOUT, nr_objcg_pages);
> +	}
> +
> +	atomic_long_add(nr_pages, &zswap_stored_pages);
> +	count_vm_events(ZSWPOUT, nr_pages);
> +}
> +
> +static void zswap_store_sub_batch(struct zswap_store_pipeline_state *zst)
> +{
> +	u8 i;
> +
> +	for (i = 0; i < zst->nr_comp_pages; ++i) {
> +		zst->comp_dsts[i] = zst->acomp_ctx->buffers[i];
> +		zst->comp_dlens[i] = PAGE_SIZE;
> +	} /* for sub-batch pages. */
> +
> +	/*
> +	 * Batch compress sub-batch "N". If IAA is the compressor, the
> +	 * hardware will compress multiple pages in parallel.
> +	 */
> +	zswap_compress_batch(zst);
> +
> +	zswap_batch_compress_post_proc(zst);

The control flow here is a mess. Keep loops over the same batch at the
same function level. IOW, pull the nr_comp_pages loop out of
zswap_batch_compress_post_proc() and call the function from the loop.

Also give it a more descriptive name. If that's hard to do, then
you're probably doing too many different things in it. Create
functions for a specific purpose, don't carve up sequences at
arbitrary points.

My impression after trying to read this is that the existing
zswap_store() sequence could be a subset of the batched store, where
you can reuse most code to get the pool, charge the cgroup, allocate
entries, store entries, bump the stats etc. for both cases. Alas, your
naming choices make it a bit difficult to be sure.

Please explore this direction. Don't worry about the CONFIG symbol for
now, we can still look at this later.

Right now, it's basically

	if (special case)
		lots of duplicative code in slightly different order
	regular store sequence

and that isn't going to be maintainable.

Look for a high-level sequence that makes sense for both cases. E.g.:

	if (!zswap_enabled)
		goto check_old;

	get objcg

	check limits

	allocate memcg list lru

	for each batch {
		for each entry {
			allocate entry
			acquire objcg ref
			acquire pool ref
		}
		compress
		for each entry {
			store in tree
			add to lru
			bump stats and counters
		}
	}

	put objcg

	return true;

check_error:
	...

and then set up the two loops such that they also makes sense when the
folio is just a single page.
Sridhar, Kanchana P Nov. 7, 2024, 10:32 p.m. UTC | #3
> -----Original Message-----
> From: Johannes Weiner <hannes@cmpxchg.org>
> Sent: Thursday, November 7, 2024 10:16 AM
> To: Sridhar, Kanchana P <kanchana.p.sridhar@intel.com>
> Cc: linux-kernel@vger.kernel.org; linux-mm@kvack.org;
> yosryahmed@google.com; nphamcs@gmail.com;
> chengming.zhou@linux.dev; usamaarif642@gmail.com;
> ryan.roberts@arm.com; Huang, Ying <ying.huang@intel.com>;
> 21cnbao@gmail.com; akpm@linux-foundation.org; linux-
> crypto@vger.kernel.org; herbert@gondor.apana.org.au;
> davem@davemloft.net; clabbe@baylibre.com; ardb@kernel.org;
> ebiggers@google.com; surenb@google.com; Accardi, Kristen C
> <kristen.c.accardi@intel.com>; zanussi@kernel.org; Feghali, Wajdi K
> <wajdi.k.feghali@intel.com>; Gopal, Vinodh <vinodh.gopal@intel.com>
> Subject: Re: [PATCH v3 13/13] mm: zswap: Compress batching with Intel IAA
> in zswap_store() of large folios.
> 
> On Wed, Nov 06, 2024 at 11:21:05AM -0800, Kanchana P Sridhar wrote:
> > If the system has Intel IAA, and if sysctl vm.compress-batching is set to
> > "1", zswap_store() will call crypto_acomp_batch_compress() to compress up
> > to SWAP_CRYPTO_BATCH_SIZE (i.e. 8) pages in large folios in parallel using
> > the multiple compress engines available in IAA hardware.
> >
> > On platforms with multiple IAA devices per socket, compress jobs from all
> > cores in a socket will be distributed among all IAA devices on the socket
> > by the iaa_crypto driver.
> >
> > With deflate-iaa configured as the zswap compressor, and
> > sysctl vm.compress-batching is enabled, the first time zswap_store() has to
> > swapout a large folio on any given cpu, it will allocate the
> > pool->acomp_batch_ctx resources on that cpu, and set pool-
> >can_batch_comp
> > to BATCH_COMP_ENABLED. It will then proceed to call the main
> > __zswap_store_batch_core() compress batching function. Subsequent calls
> to
> > zswap_store() on the same cpu will go ahead and use the acomp_batch_ctx
> by
> > checking the pool->can_batch_comp status.
> >
> > Hence, we allocate the per-cpu pool->acomp_batch_ctx resources only on
> an
> > as-needed basis, to reduce memory footprint cost. The cost is not incurred
> > on cores that never get to swapout a large folio.
> >
> > This patch introduces the main __zswap_store_batch_core() function for
> > compress batching. This interface represents the extensible compress
> > batching architecture that can potentially be called with a batch of
> > any-order folios from shrink_folio_list(). In other words, although
> > zswap_store() calls __zswap_store_batch_core() with exactly one large folio
> > in this patch, we can reuse this interface to reclaim a batch of folios, to
> > significantly improve the reclaim path efficiency due to IAA's parallel
> > compression capability.
> >
> > The newly added functions that implement batched stores follow the
> > general structure of zswap_store() of a large folio. Some amount of
> > restructuring and optimization is done to minimize failure points
> > for a batch, fail early and maximize the zswap store pipeline occupancy
> > with SWAP_CRYPTO_BATCH_SIZE pages, potentially from multiple
> > folios. This is intended to maximize reclaim throughput with the IAA
> > hardware parallel compressions.
> >
> > Signed-off-by: Kanchana P Sridhar <kanchana.p.sridhar@intel.com>
> > ---
> >  include/linux/zswap.h |  84 ++++++
> >  mm/zswap.c            | 625
> ++++++++++++++++++++++++++++++++++++++++++
> >  2 files changed, 709 insertions(+)
> >
> > diff --git a/include/linux/zswap.h b/include/linux/zswap.h
> > index 9ad27ab3d222..6d3ef4780c69 100644
> > --- a/include/linux/zswap.h
> > +++ b/include/linux/zswap.h
> > @@ -31,6 +31,88 @@ struct zswap_lruvec_state {
> >  	atomic_long_t nr_disk_swapins;
> >  };
> >
> > +/*
> > + * struct zswap_store_sub_batch_page:
> > + *
> > + * This represents one "zswap batching element", namely, the
> > + * attributes associated with a page in a large folio that will
> > + * be compressed and stored in zswap. The term "batch" is reserved
> > + * for a conceptual "batch" of folios that can be sent to
> > + * zswap_store() by reclaim. The term "sub-batch" is used to describe
> > + * a collection of "zswap batching elements", i.e., an array of
> > + * "struct zswap_store_sub_batch_page *".
> > + *
> > + * The zswap compress sub-batch size is specified by
> > + * SWAP_CRYPTO_BATCH_SIZE, currently set as 8UL if the
> > + * platform has Intel IAA. This means zswap can store a large folio
> > + * by creating sub-batches of up to 8 pages and compressing this
> > + * batch using IAA to parallelize the 8 compress jobs in hardware.
> > + * For e.g., a 64KB folio can be compressed as 2 sub-batches of
> > + * 8 pages each. This can significantly improve the zswap_store()
> > + * performance for large folios.
> > + *
> > + * Although the page itself is represented directly, the structure
> > + * adds a "u8 batch_idx" to represent an index for the folio in a
> > + * conceptual "batch of folios" that can be passed to zswap_store().
> > + * Conceptually, this allows for up to 256 folios that can be passed
> > + * to zswap_store(). If this conceptual number of folios sent to
> > + * zswap_store() exceeds 256, the "batch_idx" needs to become u16.
> > + */
> > +struct zswap_store_sub_batch_page {
> > +	u8 batch_idx;
> > +	swp_entry_t swpentry;
> > +	struct obj_cgroup *objcg;
> > +	struct zswap_entry *entry;
> > +	int error; /* folio error status. */
> > +};
> > +
> > +/*
> > + * struct zswap_store_pipeline_state:
> > + *
> > + * This stores state during IAA compress batching of (conceptually, a batch
> of)
> > + * folios. The term pipelining in this context, refers to breaking down
> > + * the batch of folios being reclaimed into sub-batches of
> > + * SWAP_CRYPTO_BATCH_SIZE pages, batch compressing and storing the
> > + * sub-batch. This concept could be further evolved to use overlap of CPU
> > + * computes with IAA computes. For instance, we could stage the post-
> compress
> > + * computes for sub-batch "N-1" to happen in parallel with IAA batch
> > + * compression of sub-batch "N".
> > + *
> > + * We begin by developing the concept of compress batching. Pipelining
> with
> > + * overlap can be future work.
> > + *
> > + * @errors: The errors status for the batch of reclaim folios passed in from
> > + *          a higher mm layer such as swap_writepage().
> > + * @pool: A valid zswap_pool.
> > + * @acomp_ctx: The per-cpu pointer to the crypto_acomp_ctx for the
> @pool.
> > + * @sub_batch: This is an array that represents the sub-batch of up to
> > + *             SWAP_CRYPTO_BATCH_SIZE pages that are being stored
> > + *             in zswap.
> > + * @comp_dsts: The destination buffers for crypto_acomp_compress() for
> each
> > + *             page being compressed.
> > + * @comp_dlens: The destination buffers' lengths from
> crypto_acomp_compress()
> > + *              obtained after crypto_acomp_poll() returns completion status,
> > + *              for each page being compressed.
> > + * @comp_errors: Compression errors for each page being compressed.
> > + * @nr_comp_pages: Total number of pages in @sub_batch.
> > + *
> > + * Note:
> > + * The max sub-batch size is SWAP_CRYPTO_BATCH_SIZE, currently 8UL.
> > + * Hence, if SWAP_CRYPTO_BATCH_SIZE exceeds 256, some of the
> > + * u8 members (except @comp_dsts) need to become u16.
> > + */
> > +struct zswap_store_pipeline_state {
> > +	int *errors;
> > +	struct zswap_pool *pool;
> > +	struct crypto_acomp_ctx *acomp_ctx;
> > +	struct zswap_store_sub_batch_page *sub_batch;
> > +	struct page **comp_pages;
> > +	u8 **comp_dsts;
> > +	unsigned int *comp_dlens;
> > +	int *comp_errors;
> > +	u8 nr_comp_pages;
> > +};
> 
> Why are these in the public header?

Thanks Johannes, for the detailed code review comments! Yes, these don't
need to belong in the public header. I will move them to zswap.c.

> 
> >  unsigned long zswap_total_pages(void);
> >  bool zswap_store(struct folio *folio);
> >  bool zswap_load(struct folio *folio);
> > @@ -45,6 +127,8 @@ bool zswap_never_enabled(void);
> >  #else
> >
> >  struct zswap_lruvec_state {};
> > +struct zswap_store_sub_batch_page {};
> > +struct zswap_store_pipeline_state {};
> >
> >  static inline bool zswap_store(struct folio *folio)
> >  {
> > diff --git a/mm/zswap.c b/mm/zswap.c
> > index 2af736e38213..538aac3fb552 100644
> > --- a/mm/zswap.c
> > +++ b/mm/zswap.c
> > @@ -255,6 +255,12 @@ static int zswap_create_acomp_ctx(unsigned int
> cpu,
> >  				  char *tfm_name,
> >  				  unsigned int nr_reqs);
> >
> > +static bool __zswap_store_batch_core(
> > +	int node_id,
> > +	struct folio **folios,
> > +	int *errors,
> > +	unsigned int nr_folios);
> > +
> 
> Please reorder the functions to avoid forward decls.

Sure.

> 
> >  /*********************************
> >  * pool functions
> >  **********************************/
> > @@ -1626,6 +1632,12 @@ static ssize_t zswap_store_page(struct page
> *page,
> >  	return -EINVAL;
> >  }
> >
> > +/*
> > + * Modified to use the IAA compress batching framework implemented in
> > + * __zswap_store_batch_core() if sysctl vm.compress-batching is 1.
> > + * The batching code is intended to significantly improve folio store
> > + * performance over the sequential code.
> 
> This isn't helpful, please delete.

Ok.

> 
> >  bool zswap_store(struct folio *folio)
> >  {
> >  	long nr_pages = folio_nr_pages(folio);
> > @@ -1638,6 +1650,38 @@ bool zswap_store(struct folio *folio)
> >  	bool ret = false;
> >  	long index;
> >
> > +	/*
> > +	 * Improve large folio zswap_store() latency with IAA compress
> batching,
> > +	 * if this is enabled by setting sysctl vm.compress-batching to "1".
> > +	 * If enabled, the large folio's pages are compressed in parallel in
> > +	 * batches of SWAP_CRYPTO_BATCH_SIZE pages. If disabled, every
> page in
> > +	 * the large folio is compressed sequentially.
> > +	 */
> 
> Same here. Reduce to "Try to batch compress large folios, fall back to
> processing individual subpages if that fails."

Ok.

> 
> > +	if (folio_test_large(folio) && READ_ONCE(compress_batching)) {
> > +		pool = zswap_pool_current_get();
> 
> There is an existing zswap_pool_current_get() in zswap_store(), please
> reorder the sequence so you don't need to add an extra one.

Ok, will do this. I was trying to make the code less messy, but will find
a cleaner way.

> 
> > +		if (!pool) {
> > +			pr_err("Cannot setup acomp_batch_ctx for compress
> batching: no current pool found\n");
> 
> This is unnecessary.

Ok.

> 
> > +			goto sequential_store;
> > +		}
> > +
> > +		if (zswap_pool_can_batch(pool)) {
> 
> This function is introduced in another patch, where it isn't
> used. Please add functions and callers in the same patch.

Ok. Unintended side effects of trying to break down the changes
into smaller commits. Will address this in v4.

> 
> > +			int error = -1;
> > +			bool store_batch = __zswap_store_batch_core(
> > +						folio_nid(folio),
> > +						&folio, &error, 1);
> > +
> > +			if (store_batch) {
> > +				zswap_pool_put(pool);
> > +				if (!error)
> > +					ret = true;
> > +				return ret;
> > +			}
> > +		}
> 
> Please don't future proof code like this, only implement what is
> strictly necessary for the functionality in this patch. You're only
> adding a single caller with nr_folios=1, so it shouldn't be a
> parameter, and the function shouldn't have a that batch_idx loop.

Ok.

> 
> > +		zswap_pool_put(pool);
> > +	}
> > +
> > +sequential_store:
> > +
> >  	VM_WARN_ON_ONCE(!folio_test_locked(folio));
> >  	VM_WARN_ON_ONCE(!folio_test_swapcache(folio));
> >
> > @@ -1724,6 +1768,587 @@ bool zswap_store(struct folio *folio)
> >  	return ret;
> >  }
> >
> > +/*
> > + * Note: If SWAP_CRYPTO_BATCH_SIZE exceeds 256, change the
> > + * u8 stack variables in the next several functions, to u16.
> > + */
> > +
> > +/*
> > + * Propagate the "sbp" error condition to other batch elements belonging
> to
> > + * the same folio as "sbp".
> > + */
> > +static __always_inline void zswap_store_propagate_errors(
> > +	struct zswap_store_pipeline_state *zst,
> > +	u8 error_batch_idx)
> > +{
> 
> Please observe surrounding coding style on how to wrap >80 col
> function signatures.

I see. Ok.

> 
> Don't use __always_inline unless there is a clear, spelled out
> performance reason. Since it's an error path, that's doubtful.

The motivation was incompressible pages, but sure, will address in v4.

> 
> Please use a consistent namespace for all this:
> 
> CONFIG_ZSWAP_BATCH
> zswap_batch_store()
> zswap_batch_alloc_entries()
> zswap_batch_add_folios()
> zswap_batch_compress()
> 
> etc.
> 
> Again, order to avoid forward decls.
> 
> Try to keep the overall sequence of events between zswap_store() and
> zswap_batch_store() similar as much as possible for readability.

Definitely.

Thanks,
Kanchana
Sridhar, Kanchana P Nov. 7, 2024, 10:50 p.m. UTC | #4
> -----Original Message-----
> From: Johannes Weiner <hannes@cmpxchg.org>
> Sent: Thursday, November 7, 2024 10:54 AM
> To: Sridhar, Kanchana P <kanchana.p.sridhar@intel.com>
> Cc: linux-kernel@vger.kernel.org; linux-mm@kvack.org;
> yosryahmed@google.com; nphamcs@gmail.com;
> chengming.zhou@linux.dev; usamaarif642@gmail.com;
> ryan.roberts@arm.com; Huang, Ying <ying.huang@intel.com>;
> 21cnbao@gmail.com; akpm@linux-foundation.org; linux-
> crypto@vger.kernel.org; herbert@gondor.apana.org.au;
> davem@davemloft.net; clabbe@baylibre.com; ardb@kernel.org;
> ebiggers@google.com; surenb@google.com; Accardi, Kristen C
> <kristen.c.accardi@intel.com>; zanussi@kernel.org; Feghali, Wajdi K
> <wajdi.k.feghali@intel.com>; Gopal, Vinodh <vinodh.gopal@intel.com>
> Subject: Re: [PATCH v3 13/13] mm: zswap: Compress batching with Intel IAA
> in zswap_store() of large folios.
> 
> On Wed, Nov 06, 2024 at 11:21:05AM -0800, Kanchana P Sridhar wrote:
> > +static void zswap_zpool_store_sub_batch(
> > +	struct zswap_store_pipeline_state *zst)
> 
> There is a zswap_store_sub_batch() below, which does something
> completely different. Naming is hard, but please invest a bit more
> time into this to make this readable.

Thanks Johannes, for the comments. Yes, I agree the naming could
be better.

> 
> > +{
> > +	u8 i;
> > +
> > +	for (i = 0; i < zst->nr_comp_pages; ++i) {
> > +		struct zswap_store_sub_batch_page *sbp = &zst-
> >sub_batch[i];
> > +		struct zpool *zpool;
> > +		unsigned long handle;
> > +		char *buf;
> > +		gfp_t gfp;
> > +		int err;
> > +
> > +		/* Skip pages that had compress errors. */
> > +		if (sbp->error)
> > +			continue;
> > +
> > +		zpool = zst->pool->zpool;
> > +		gfp = __GFP_NORETRY | __GFP_NOWARN |
> __GFP_KSWAPD_RECLAIM;
> > +		if (zpool_malloc_support_movable(zpool))
> > +			gfp |= __GFP_HIGHMEM | __GFP_MOVABLE;
> > +		err = zpool_malloc(zpool, zst->comp_dlens[i], gfp, &handle);
> > +
> > +		if (err) {
> > +			if (err == -ENOSPC)
> > +				zswap_reject_compress_poor++;
> > +			else
> > +				zswap_reject_alloc_fail++;
> > +
> > +			/*
> > +			 * An error should be propagated to other pages of
> the
> > +			 * same folio in the sub-batch, and zpool resources for
> > +			 * those pages (in sub-batch order prior to this zpool
> > +			 * error) should be de-allocated.
> > +			 */
> > +			zswap_store_propagate_errors(zst, sbp->batch_idx);
> > +			continue;
> > +		}
> > +
> > +		buf = zpool_map_handle(zpool, handle, ZPOOL_MM_WO);
> > +		memcpy(buf, zst->comp_dsts[i], zst->comp_dlens[i]);
> > +		zpool_unmap_handle(zpool, handle);
> > +
> > +		sbp->entry->handle = handle;
> > +		sbp->entry->length = zst->comp_dlens[i];
> > +	}
> > +}
> > +
> > +/*
> > + * Returns true if the entry was successfully
> > + * stored in the xarray, and false otherwise.
> > + */
> > +static bool zswap_store_entry(swp_entry_t page_swpentry,
> > +			      struct zswap_entry *entry)
> > +{
> > +	struct zswap_entry *old =
> xa_store(swap_zswap_tree(page_swpentry),
> > +					   swp_offset(page_swpentry),
> > +					   entry, GFP_KERNEL);
> > +	if (xa_is_err(old)) {
> > +		int err = xa_err(old);
> > +
> > +		WARN_ONCE(err != -ENOMEM, "unexpected xarray error:
> %d\n", err);
> > +		zswap_reject_alloc_fail++;
> > +		return false;
> > +	}
> > +
> > +	/*
> > +	 * We may have had an existing entry that became stale when
> > +	 * the folio was redirtied and now the new version is being
> > +	 * swapped out. Get rid of the old.
> > +	 */
> > +	if (old)
> > +		zswap_entry_free(old);
> > +
> > +	return true;
> > +}
> > +
> > +static void zswap_batch_compress_post_proc(
> > +	struct zswap_store_pipeline_state *zst)
> > +{
> > +	int nr_objcg_pages = 0, nr_pages = 0;
> > +	struct obj_cgroup *objcg = NULL;
> > +	size_t compressed_bytes = 0;
> > +	u8 i;
> > +
> > +	zswap_zpool_store_sub_batch(zst);
> > +
> > +	for (i = 0; i < zst->nr_comp_pages; ++i) {
> > +		struct zswap_store_sub_batch_page *sbp = &zst-
> >sub_batch[i];
> > +
> > +		if (sbp->error)
> > +			continue;
> > +
> > +		if (!zswap_store_entry(sbp->swpentry, sbp->entry)) {
> > +			zswap_store_propagate_errors(zst, sbp->batch_idx);
> > +			continue;
> > +		}
> > +
> > +		/*
> > +		 * The entry is successfully compressed and stored in the tree,
> > +		 * there is no further possibility of failure. Grab refs to the
> > +		 * pool and objcg. These refs will be dropped by
> > +		 * zswap_entry_free() when the entry is removed from the
> tree.
> > +		 */
> > +		zswap_pool_get(zst->pool);
> > +		if (sbp->objcg)
> > +			obj_cgroup_get(sbp->objcg);
> > +
> > +		/*
> > +		 * We finish initializing the entry while it's already in xarray.
> > +		 * This is safe because:
> > +		 *
> > +		 * 1. Concurrent stores and invalidations are excluded by folio
> > +		 *    lock.
> > +		 *
> > +		 * 2. Writeback is excluded by the entry not being on the LRU
> yet.
> > +		 *    The publishing order matters to prevent writeback from
> seeing
> > +		 *    an incoherent entry.
> > +		 */
> > +		sbp->entry->pool = zst->pool;
> > +		sbp->entry->swpentry = sbp->swpentry;
> > +		sbp->entry->objcg = sbp->objcg;
> > +		sbp->entry->referenced = true;
> > +		if (sbp->entry->length) {
> > +			INIT_LIST_HEAD(&sbp->entry->lru);
> > +			zswap_lru_add(&zswap_list_lru, sbp->entry);
> > +		}
> > +
> > +		if (!objcg && sbp->objcg) {
> > +			objcg = sbp->objcg;
> > +		} else if (objcg && sbp->objcg && (objcg != sbp->objcg)) {
> > +			obj_cgroup_charge_zswap(objcg,
> compressed_bytes);
> > +			count_objcg_events(objcg, ZSWPOUT,
> nr_objcg_pages);
> > +			compressed_bytes = 0;
> > +			nr_objcg_pages = 0;
> > +			objcg = sbp->objcg;
> > +		}
> > +
> > +		if (sbp->objcg) {
> > +			compressed_bytes += sbp->entry->length;
> > +			++nr_objcg_pages;
> > +		}
> > +
> > +		++nr_pages;
> > +	} /* for sub-batch pages. */
> > +
> > +	if (objcg) {
> > +		obj_cgroup_charge_zswap(objcg, compressed_bytes);
> > +		count_objcg_events(objcg, ZSWPOUT, nr_objcg_pages);
> > +	}
> > +
> > +	atomic_long_add(nr_pages, &zswap_stored_pages);
> > +	count_vm_events(ZSWPOUT, nr_pages);
> > +}
> > +
> > +static void zswap_store_sub_batch(struct zswap_store_pipeline_state
> *zst)
> > +{
> > +	u8 i;
> > +
> > +	for (i = 0; i < zst->nr_comp_pages; ++i) {
> > +		zst->comp_dsts[i] = zst->acomp_ctx->buffers[i];
> > +		zst->comp_dlens[i] = PAGE_SIZE;
> > +	} /* for sub-batch pages. */
> > +
> > +	/*
> > +	 * Batch compress sub-batch "N". If IAA is the compressor, the
> > +	 * hardware will compress multiple pages in parallel.
> > +	 */
> > +	zswap_compress_batch(zst);
> > +
> > +	zswap_batch_compress_post_proc(zst);
> 
> The control flow here is a mess. Keep loops over the same batch at the
> same function level. IOW, pull the nr_comp_pages loop out of
> zswap_batch_compress_post_proc() and call the function from the loop.

I see. Got it.

> 
> Also give it a more descriptive name. If that's hard to do, then
> you're probably doing too many different things in it. Create
> functions for a specific purpose, don't carve up sequences at
> arbitrary points.
> 
> My impression after trying to read this is that the existing
> zswap_store() sequence could be a subset of the batched store, where
> you can reuse most code to get the pool, charge the cgroup, allocate
> entries, store entries, bump the stats etc. for both cases. Alas, your
> naming choices make it a bit difficult to be sure.

Apologies for the naming choices. I will fix this. As I was trying to explain
in the commit log, my goal was to minimize failure points, since each failure
point requires unwinding state, which adds latency. Towards this goal, I tried
to alloc all entries upfront, and fail early to prevent unwinding state.
Especially since the upfront work being done for the batch, is needed in
any case (e.g. zswap_alloc_entries()).

This is where the trade-offs between treating the existing zswap_store()
sequence as a subset of the batched store are not very clear. I tried to
optimize the batched store for batching, while following the logical
structure of zswap_store().

> 
> Please explore this direction. Don't worry about the CONFIG symbol for
> now, we can still look at this later.

Definitely, I will think some more about this.

> 
> Right now, it's basically
> 
> 	if (special case)
> 		lots of duplicative code in slightly different order
> 	regular store sequence
> 
> and that isn't going to be maintainable.
> 
> Look for a high-level sequence that makes sense for both cases. E.g.:
> 
> 	if (!zswap_enabled)
> 		goto check_old;
> 
> 	get objcg
> 
> 	check limits
> 
> 	allocate memcg list lru
> 
> 	for each batch {
> 		for each entry {
> 			allocate entry
> 			acquire objcg ref
> 			acquire pool ref
> 		}
> 		compress
> 		for each entry {
> 			store in tree
> 			add to lru
> 			bump stats and counters
> 		}
> 	}
> 
> 	put objcg
> 
> 	return true;
> 
> check_error:
> 	...
> 
> and then set up the two loops such that they also makes sense when the
> folio is just a single page.

Thanks for this suggestion! I will explore this kind of structure. I hope
I have provided some explanations as to why I pursued the existing
batching structure. One other thing I wanted to add was the
"future proofing" you alluded to earlier (which I will fix). Many of
my design choices were motivated by minimizing latency gaps
(e.g. from state unwinding in case of errors) in the batch compress
pipeline when a reclaim batch of any-order folios is potentially
sent to zswap.

Thanks again,
Kanchana
diff mbox series

Patch

diff --git a/include/linux/zswap.h b/include/linux/zswap.h
index 9ad27ab3d222..6d3ef4780c69 100644
--- a/include/linux/zswap.h
+++ b/include/linux/zswap.h
@@ -31,6 +31,88 @@  struct zswap_lruvec_state {
 	atomic_long_t nr_disk_swapins;
 };
 
+/*
+ * struct zswap_store_sub_batch_page:
+ *
+ * This represents one "zswap batching element", namely, the
+ * attributes associated with a page in a large folio that will
+ * be compressed and stored in zswap. The term "batch" is reserved
+ * for a conceptual "batch" of folios that can be sent to
+ * zswap_store() by reclaim. The term "sub-batch" is used to describe
+ * a collection of "zswap batching elements", i.e., an array of
+ * "struct zswap_store_sub_batch_page *".
+ *
+ * The zswap compress sub-batch size is specified by
+ * SWAP_CRYPTO_BATCH_SIZE, currently set as 8UL if the
+ * platform has Intel IAA. This means zswap can store a large folio
+ * by creating sub-batches of up to 8 pages and compressing this
+ * batch using IAA to parallelize the 8 compress jobs in hardware.
+ * For e.g., a 64KB folio can be compressed as 2 sub-batches of
+ * 8 pages each. This can significantly improve the zswap_store()
+ * performance for large folios.
+ *
+ * Although the page itself is represented directly, the structure
+ * adds a "u8 batch_idx" to represent an index for the folio in a
+ * conceptual "batch of folios" that can be passed to zswap_store().
+ * Conceptually, this allows for up to 256 folios that can be passed
+ * to zswap_store(). If this conceptual number of folios sent to
+ * zswap_store() exceeds 256, the "batch_idx" needs to become u16.
+ */
+struct zswap_store_sub_batch_page {
+	u8 batch_idx;
+	swp_entry_t swpentry;
+	struct obj_cgroup *objcg;
+	struct zswap_entry *entry;
+	int error; /* folio error status. */
+};
+
+/*
+ * struct zswap_store_pipeline_state:
+ *
+ * This stores state during IAA compress batching of (conceptually, a batch of)
+ * folios. The term pipelining in this context, refers to breaking down
+ * the batch of folios being reclaimed into sub-batches of
+ * SWAP_CRYPTO_BATCH_SIZE pages, batch compressing and storing the
+ * sub-batch. This concept could be further evolved to use overlap of CPU
+ * computes with IAA computes. For instance, we could stage the post-compress
+ * computes for sub-batch "N-1" to happen in parallel with IAA batch
+ * compression of sub-batch "N".
+ *
+ * We begin by developing the concept of compress batching. Pipelining with
+ * overlap can be future work.
+ *
+ * @errors: The errors status for the batch of reclaim folios passed in from
+ *          a higher mm layer such as swap_writepage().
+ * @pool: A valid zswap_pool.
+ * @acomp_ctx: The per-cpu pointer to the crypto_acomp_ctx for the @pool.
+ * @sub_batch: This is an array that represents the sub-batch of up to
+ *             SWAP_CRYPTO_BATCH_SIZE pages that are being stored
+ *             in zswap.
+ * @comp_dsts: The destination buffers for crypto_acomp_compress() for each
+ *             page being compressed.
+ * @comp_dlens: The destination buffers' lengths from crypto_acomp_compress()
+ *              obtained after crypto_acomp_poll() returns completion status,
+ *              for each page being compressed.
+ * @comp_errors: Compression errors for each page being compressed.
+ * @nr_comp_pages: Total number of pages in @sub_batch.
+ *
+ * Note:
+ * The max sub-batch size is SWAP_CRYPTO_BATCH_SIZE, currently 8UL.
+ * Hence, if SWAP_CRYPTO_BATCH_SIZE exceeds 256, some of the
+ * u8 members (except @comp_dsts) need to become u16.
+ */
+struct zswap_store_pipeline_state {
+	int *errors;
+	struct zswap_pool *pool;
+	struct crypto_acomp_ctx *acomp_ctx;
+	struct zswap_store_sub_batch_page *sub_batch;
+	struct page **comp_pages;
+	u8 **comp_dsts;
+	unsigned int *comp_dlens;
+	int *comp_errors;
+	u8 nr_comp_pages;
+};
+
 unsigned long zswap_total_pages(void);
 bool zswap_store(struct folio *folio);
 bool zswap_load(struct folio *folio);
@@ -45,6 +127,8 @@  bool zswap_never_enabled(void);
 #else
 
 struct zswap_lruvec_state {};
+struct zswap_store_sub_batch_page {};
+struct zswap_store_pipeline_state {};
 
 static inline bool zswap_store(struct folio *folio)
 {
diff --git a/mm/zswap.c b/mm/zswap.c
index 2af736e38213..538aac3fb552 100644
--- a/mm/zswap.c
+++ b/mm/zswap.c
@@ -255,6 +255,12 @@  static int zswap_create_acomp_ctx(unsigned int cpu,
 				  char *tfm_name,
 				  unsigned int nr_reqs);
 
+static bool __zswap_store_batch_core(
+	int node_id,
+	struct folio **folios,
+	int *errors,
+	unsigned int nr_folios);
+
 /*********************************
 * pool functions
 **********************************/
@@ -1626,6 +1632,12 @@  static ssize_t zswap_store_page(struct page *page,
 	return -EINVAL;
 }
 
+/*
+ * Modified to use the IAA compress batching framework implemented in
+ * __zswap_store_batch_core() if sysctl vm.compress-batching is 1.
+ * The batching code is intended to significantly improve folio store
+ * performance over the sequential code.
+ */
 bool zswap_store(struct folio *folio)
 {
 	long nr_pages = folio_nr_pages(folio);
@@ -1638,6 +1650,38 @@  bool zswap_store(struct folio *folio)
 	bool ret = false;
 	long index;
 
+	/*
+	 * Improve large folio zswap_store() latency with IAA compress batching,
+	 * if this is enabled by setting sysctl vm.compress-batching to "1".
+	 * If enabled, the large folio's pages are compressed in parallel in
+	 * batches of SWAP_CRYPTO_BATCH_SIZE pages. If disabled, every page in
+	 * the large folio is compressed sequentially.
+	 */
+	if (folio_test_large(folio) && READ_ONCE(compress_batching)) {
+		pool = zswap_pool_current_get();
+		if (!pool) {
+			pr_err("Cannot setup acomp_batch_ctx for compress batching: no current pool found\n");
+			goto sequential_store;
+		}
+
+		if (zswap_pool_can_batch(pool)) {
+			int error = -1;
+			bool store_batch = __zswap_store_batch_core(
+						folio_nid(folio),
+						&folio, &error, 1);
+
+			if (store_batch) {
+				zswap_pool_put(pool);
+				if (!error)
+					ret = true;
+				return ret;
+			}
+		}
+		zswap_pool_put(pool);
+	}
+
+sequential_store:
+
 	VM_WARN_ON_ONCE(!folio_test_locked(folio));
 	VM_WARN_ON_ONCE(!folio_test_swapcache(folio));
 
@@ -1724,6 +1768,587 @@  bool zswap_store(struct folio *folio)
 	return ret;
 }
 
+/*
+ * Note: If SWAP_CRYPTO_BATCH_SIZE exceeds 256, change the
+ * u8 stack variables in the next several functions, to u16.
+ */
+
+/*
+ * Propagate the "sbp" error condition to other batch elements belonging to
+ * the same folio as "sbp".
+ */
+static __always_inline void zswap_store_propagate_errors(
+	struct zswap_store_pipeline_state *zst,
+	u8 error_batch_idx)
+{
+	u8 i;
+
+	if (zst->errors[error_batch_idx])
+		return;
+
+	for (i = 0; i < zst->nr_comp_pages; ++i) {
+		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
+
+		if (sbp->batch_idx == error_batch_idx) {
+			if (!sbp->error) {
+				if (sbp->entry) {
+					if (!IS_ERR_VALUE(sbp->entry->handle))
+						zpool_free(zst->pool->zpool, sbp->entry->handle);
+
+					zswap_entry_cache_free(sbp->entry);
+					sbp->entry = NULL;
+				}
+				sbp->error = -EINVAL;
+			}
+		}
+	}
+
+	/*
+	 * Set zswap status for the folio to "error"
+	 * for use in swap_writepage.
+	 */
+	zst->errors[error_batch_idx] = -EINVAL;
+}
+
+static __always_inline void zswap_process_comp_errors(
+	struct zswap_store_pipeline_state *zst)
+{
+	u8 i;
+
+	for (i = 0; i < zst->nr_comp_pages; ++i) {
+		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
+
+		if (zst->comp_errors[i]) {
+			if (zst->comp_errors[i] == -ENOSPC)
+				zswap_reject_compress_poor++;
+			else
+				zswap_reject_compress_fail++;
+
+			if (!sbp->error)
+				zswap_store_propagate_errors(zst,
+							     sbp->batch_idx);
+		}
+	}
+}
+
+static void zswap_compress_batch(struct zswap_store_pipeline_state *zst)
+{
+	/*
+	 * Compress up to SWAP_CRYPTO_BATCH_SIZE pages.
+	 * It is important to note that the zswap pool's per-cpu "acomp_batch_ctx"
+	 * resources are allocated only if the crypto_acomp has registered both,
+	 * crypto_acomp_batch_compress() and crypto_acomp_batch_decompress() API.
+	 * The iaa_crypto driver registers implementations for both these API.
+	 * Hence, if IAA is the zswap compressor and sysctl vm.compress-batching
+	 * is set to "1", the call to crypto_acomp_batch_compress() will
+	 * compresses the pages in parallel, leading to significant performance
+	 * improvements as compared to software compressors.
+	 */
+	crypto_acomp_batch_compress(
+		zst->acomp_ctx->reqs,
+		&zst->acomp_ctx->wait,
+		zst->comp_pages,
+		zst->comp_dsts,
+		zst->comp_dlens,
+		zst->comp_errors,
+		zst->nr_comp_pages);
+
+	/*
+	 * Scan the sub-batch for any compression errors,
+	 * and invalidate pages with errors, along with other
+	 * pages belonging to the same folio as the error pages.
+	 */
+	zswap_process_comp_errors(zst);
+}
+
+static void zswap_zpool_store_sub_batch(
+	struct zswap_store_pipeline_state *zst)
+{
+	u8 i;
+
+	for (i = 0; i < zst->nr_comp_pages; ++i) {
+		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
+		struct zpool *zpool;
+		unsigned long handle;
+		char *buf;
+		gfp_t gfp;
+		int err;
+
+		/* Skip pages that had compress errors. */
+		if (sbp->error)
+			continue;
+
+		zpool = zst->pool->zpool;
+		gfp = __GFP_NORETRY | __GFP_NOWARN | __GFP_KSWAPD_RECLAIM;
+		if (zpool_malloc_support_movable(zpool))
+			gfp |= __GFP_HIGHMEM | __GFP_MOVABLE;
+		err = zpool_malloc(zpool, zst->comp_dlens[i], gfp, &handle);
+
+		if (err) {
+			if (err == -ENOSPC)
+				zswap_reject_compress_poor++;
+			else
+				zswap_reject_alloc_fail++;
+
+			/*
+			 * An error should be propagated to other pages of the
+			 * same folio in the sub-batch, and zpool resources for
+			 * those pages (in sub-batch order prior to this zpool
+			 * error) should be de-allocated.
+			 */
+			zswap_store_propagate_errors(zst, sbp->batch_idx);
+			continue;
+		}
+
+		buf = zpool_map_handle(zpool, handle, ZPOOL_MM_WO);
+		memcpy(buf, zst->comp_dsts[i], zst->comp_dlens[i]);
+		zpool_unmap_handle(zpool, handle);
+
+		sbp->entry->handle = handle;
+		sbp->entry->length = zst->comp_dlens[i];
+	}
+}
+
+/*
+ * Returns true if the entry was successfully
+ * stored in the xarray, and false otherwise.
+ */
+static bool zswap_store_entry(swp_entry_t page_swpentry,
+			      struct zswap_entry *entry)
+{
+	struct zswap_entry *old = xa_store(swap_zswap_tree(page_swpentry),
+					   swp_offset(page_swpentry),
+					   entry, GFP_KERNEL);
+	if (xa_is_err(old)) {
+		int err = xa_err(old);
+
+		WARN_ONCE(err != -ENOMEM, "unexpected xarray error: %d\n", err);
+		zswap_reject_alloc_fail++;
+		return false;
+	}
+
+	/*
+	 * We may have had an existing entry that became stale when
+	 * the folio was redirtied and now the new version is being
+	 * swapped out. Get rid of the old.
+	 */
+	if (old)
+		zswap_entry_free(old);
+
+	return true;
+}
+
+static void zswap_batch_compress_post_proc(
+	struct zswap_store_pipeline_state *zst)
+{
+	int nr_objcg_pages = 0, nr_pages = 0;
+	struct obj_cgroup *objcg = NULL;
+	size_t compressed_bytes = 0;
+	u8 i;
+
+	zswap_zpool_store_sub_batch(zst);
+
+	for (i = 0; i < zst->nr_comp_pages; ++i) {
+		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
+
+		if (sbp->error)
+			continue;
+
+		if (!zswap_store_entry(sbp->swpentry, sbp->entry)) {
+			zswap_store_propagate_errors(zst, sbp->batch_idx);
+			continue;
+		}
+
+		/*
+		 * The entry is successfully compressed and stored in the tree,
+		 * there is no further possibility of failure. Grab refs to the
+		 * pool and objcg. These refs will be dropped by
+		 * zswap_entry_free() when the entry is removed from the tree.
+		 */
+		zswap_pool_get(zst->pool);
+		if (sbp->objcg)
+			obj_cgroup_get(sbp->objcg);
+
+		/*
+		 * We finish initializing the entry while it's already in xarray.
+		 * This is safe because:
+		 *
+		 * 1. Concurrent stores and invalidations are excluded by folio
+		 *    lock.
+		 *
+		 * 2. Writeback is excluded by the entry not being on the LRU yet.
+		 *    The publishing order matters to prevent writeback from seeing
+		 *    an incoherent entry.
+		 */
+		sbp->entry->pool = zst->pool;
+		sbp->entry->swpentry = sbp->swpentry;
+		sbp->entry->objcg = sbp->objcg;
+		sbp->entry->referenced = true;
+		if (sbp->entry->length) {
+			INIT_LIST_HEAD(&sbp->entry->lru);
+			zswap_lru_add(&zswap_list_lru, sbp->entry);
+		}
+
+		if (!objcg && sbp->objcg) {
+			objcg = sbp->objcg;
+		} else if (objcg && sbp->objcg && (objcg != sbp->objcg)) {
+			obj_cgroup_charge_zswap(objcg, compressed_bytes);
+			count_objcg_events(objcg, ZSWPOUT, nr_objcg_pages);
+			compressed_bytes = 0;
+			nr_objcg_pages = 0;
+			objcg = sbp->objcg;
+		}
+
+		if (sbp->objcg) {
+			compressed_bytes += sbp->entry->length;
+			++nr_objcg_pages;
+		}
+
+		++nr_pages;
+	} /* for sub-batch pages. */
+
+	if (objcg) {
+		obj_cgroup_charge_zswap(objcg, compressed_bytes);
+		count_objcg_events(objcg, ZSWPOUT, nr_objcg_pages);
+	}
+
+	atomic_long_add(nr_pages, &zswap_stored_pages);
+	count_vm_events(ZSWPOUT, nr_pages);
+}
+
+static void zswap_store_sub_batch(struct zswap_store_pipeline_state *zst)
+{
+	u8 i;
+
+	for (i = 0; i < zst->nr_comp_pages; ++i) {
+		zst->comp_dsts[i] = zst->acomp_ctx->buffers[i];
+		zst->comp_dlens[i] = PAGE_SIZE;
+	} /* for sub-batch pages. */
+
+	/*
+	 * Batch compress sub-batch "N". If IAA is the compressor, the
+	 * hardware will compress multiple pages in parallel.
+	 */
+	zswap_compress_batch(zst);
+
+	zswap_batch_compress_post_proc(zst);
+}
+
+static void zswap_add_folio_pages_to_sb(
+	struct zswap_store_pipeline_state *zst,
+	struct folio* folio,
+	u8 batch_idx,
+	struct obj_cgroup *objcg,
+	struct zswap_entry *entries[],
+	long start_idx,
+	u8 add_nr_pages)
+{
+	long index;
+
+	for (index = start_idx; index < (start_idx + add_nr_pages); ++index) {
+		u8 i = zst->nr_comp_pages;
+		struct zswap_store_sub_batch_page *sbp = &zst->sub_batch[i];
+		struct page *page = folio_page(folio, index);
+		zst->comp_pages[i] = page;
+		sbp->swpentry = page_swap_entry(page);
+		sbp->batch_idx = batch_idx;
+		sbp->objcg = objcg;
+		sbp->entry = entries[index - start_idx];
+		sbp->error = 0;
+		++zst->nr_comp_pages;
+	}
+}
+
+static __always_inline void zswap_store_reset_sub_batch(
+	struct zswap_store_pipeline_state *zst)
+{
+	zst->nr_comp_pages = 0;
+}
+
+/* Allocate entries for the next sub-batch. */
+static int zswap_alloc_entries(u8 nr_entries,
+			       struct zswap_entry *entries[],
+			       int node_id)
+{
+	u8 i;
+
+	for (i = 0; i < nr_entries; ++i) {
+		entries[i] = zswap_entry_cache_alloc(GFP_KERNEL, node_id);
+		if (!entries[i]) {
+			u8 j;
+
+			zswap_reject_kmemcache_fail++;
+			for (j = 0; j < i; ++j)
+				zswap_entry_cache_free(entries[j]);
+			return -EINVAL;
+		}
+
+		entries[i]->handle = (unsigned long)ERR_PTR(-EINVAL);
+	}
+
+	return 0;
+}
+
+/*
+ * If the zswap store fails or zswap is disabled, we must invalidate
+ * the possibly stale entries which were previously stored at the
+ * offsets corresponding to each page of the folio. Otherwise,
+ * writeback could overwrite the new data in the swapfile.
+ */
+static void zswap_delete_stored_entries(struct folio *folio)
+{
+	swp_entry_t swp = folio->swap;
+	unsigned type = swp_type(swp);
+	pgoff_t offset = swp_offset(swp);
+	struct zswap_entry *entry;
+	struct xarray *tree;
+	long index;
+
+	for (index = 0; index < folio_nr_pages(folio); ++index) {
+		tree = swap_zswap_tree(swp_entry(type, offset + index));
+		entry = xa_erase(tree, offset + index);
+		if (entry)
+			zswap_entry_free(entry);
+	}
+}
+
+static void zswap_store_process_folio_errors(
+	struct folio **folios,
+	int *errors,
+	unsigned int nr_folios)
+{
+	u8 batch_idx;
+
+	for (batch_idx = 0; batch_idx < nr_folios; ++batch_idx)
+		if (errors[batch_idx])
+			zswap_delete_stored_entries(folios[batch_idx]);
+}
+
+/*
+ * Store a (batch of) any-order large folio(s) in zswap. Each folio will be
+ * broken into sub-batches of SWAP_CRYPTO_BATCH_SIZE pages, the
+ * sub-batch will be compressed by IAA in parallel, and stored in zpool/xarray.
+ *
+ * This the main procedure for batching of folios, and batching within
+ * large folios.
+ *
+ * This procedure should only be called if zswap supports batching of stores.
+ * Otherwise, the sequential implementation for storing folios as in the
+ * current zswap_store() should be used.
+ *
+ * The signature of this procedure is meant to allow the calling function,
+ * (for instance, swap_writepage()) to pass an array @folios
+ * (the "reclaim batch") of @nr_folios folios to be stored in zswap.
+ * All folios in the batch must have the same swap type and folio_nid @node_id
+ * (simplifying assumptions only to manage code complexity).
+ *
+ * @errors and @folios have @nr_folios number of entries, with one-one
+ * correspondence (@errors[i] represents the error status of @folios[i],
+ * for i in @nr_folios).
+ * The calling function (for instance, swap_writepage()) should initialize
+ * @errors[i] to a non-0 value.
+ * If zswap successfully stores @folios[i], it will set @errors[i] to 0.
+ * If there is an error in zswap, it will set @errors[i] to -EINVAL.
+ */
+static bool __zswap_store_batch_core(
+	int node_id,
+	struct folio **folios,
+	int *errors,
+	unsigned int nr_folios)
+{
+	struct zswap_store_sub_batch_page sub_batch[SWAP_CRYPTO_BATCH_SIZE];
+	struct page *comp_pages[SWAP_CRYPTO_BATCH_SIZE];
+	u8 *comp_dsts[SWAP_CRYPTO_BATCH_SIZE] = { NULL };
+	unsigned int comp_dlens[SWAP_CRYPTO_BATCH_SIZE];
+	int comp_errors[SWAP_CRYPTO_BATCH_SIZE];
+	struct crypto_acomp_ctx *acomp_ctx, *acomp_batch_ctx;
+	struct zswap_pool *pool;
+	/*
+	 * For now, lets say a max of 256 large folios can be reclaimed
+	 * at a time, as a batch. If this exceeds 256, change this to u16.
+	 */
+	u8 batch_idx;
+
+	/* Initialize the compress batching pipeline state. */
+	struct zswap_store_pipeline_state zst = {
+		.errors = errors,
+		.pool = NULL,
+		.acomp_ctx = NULL,
+		.sub_batch = sub_batch,
+		.comp_pages = comp_pages,
+		.comp_dsts = comp_dsts,
+		.comp_dlens = comp_dlens,
+		.comp_errors = comp_errors,
+		.nr_comp_pages = 0,
+	};
+
+	pool = zswap_pool_current_get();
+	if (!pool) {
+		if (zswap_check_limits())
+			queue_work(shrink_wq, &zswap_shrink_work);
+		goto check_old;
+	}
+
+	/*
+	 * Caller should make sure that __zswap_store_batch_core() is
+	 * invoked only if sysctl vm.compress-batching is set to "1".
+	 *
+	 * Verify if we are still on the same cpu for which batching
+	 * resources in acomp_batch_ctx were allocated in zswap_store().
+	 * If not, return to zswap_store() for sequential store of the folio.
+	 */
+	acomp_ctx = raw_cpu_ptr(pool->acomp_ctx);
+	mutex_lock(&acomp_ctx->mutex);
+
+	acomp_batch_ctx = raw_cpu_ptr(pool->acomp_batch_ctx);
+	if (!acomp_batch_ctx || !acomp_batch_ctx->nr_reqs) {
+		mutex_unlock(&acomp_ctx->mutex);
+		zswap_pool_put(pool);
+		return false;
+	}
+
+	mutex_lock(&acomp_batch_ctx->mutex);
+	mutex_unlock(&acomp_ctx->mutex);
+
+	zst.pool = pool;
+	zst.acomp_ctx = acomp_batch_ctx;
+
+	/*
+	 * Iterate over the folios passed in. Construct sub-batches of up to
+	 * SWAP_CRYPTO_BATCH_SIZE pages, if necessary, by iterating through
+	 * multiple folios from the input "folios". Process each sub-batch
+	 * with IAA batch compression. Detect errors from batch compression
+	 * and set the impacted folio's error status (this happens in
+	 * zswap_store_process_errors()).
+	 */
+	for (batch_idx = 0; batch_idx < nr_folios; ++batch_idx) {
+		struct folio *folio = folios[batch_idx];
+		BUG_ON(!folio);
+		long folio_start_idx, nr_pages = folio_nr_pages(folio);
+		struct zswap_entry *entries[SWAP_CRYPTO_BATCH_SIZE];
+		struct obj_cgroup *objcg = NULL;
+		struct mem_cgroup *memcg = NULL;
+
+		VM_WARN_ON_ONCE(!folio_test_locked(folio));
+		VM_WARN_ON_ONCE(!folio_test_swapcache(folio));
+
+		/*
+		 * If zswap is disabled, we must invalidate the possibly stale entry
+		 * which was previously stored at this offset. Otherwise, writeback
+		 * could overwrite the new data in the swapfile.
+		 */
+		if (!zswap_enabled)
+			continue;
+
+		/* Check cgroup limits */
+		objcg = get_obj_cgroup_from_folio(folio);
+		if (objcg && !obj_cgroup_may_zswap(objcg)) {
+			memcg = get_mem_cgroup_from_objcg(objcg);
+			if (shrink_memcg(memcg)) {
+				mem_cgroup_put(memcg);
+				goto put_objcg;
+			}
+			mem_cgroup_put(memcg);
+		}
+
+		if (zswap_check_limits())
+			goto put_objcg;
+
+		if (objcg) {
+			memcg = get_mem_cgroup_from_objcg(objcg);
+			if (memcg_list_lru_alloc(memcg, &zswap_list_lru, GFP_KERNEL)) {
+				mem_cgroup_put(memcg);
+				goto put_objcg;
+			}
+			mem_cgroup_put(memcg);
+		}
+
+		/*
+		 * By default, set zswap status to "success" for use in
+		 * swap_writepage() when this returns. In case of errors,
+		 * a negative error number will over-write this when
+		 * zswap_store_process_errors() is called.
+		 */
+		errors[batch_idx] = 0;
+
+		folio_start_idx = 0;
+
+		while (nr_pages > 0) {
+			u8 add_nr_pages;
+
+			/*
+			 * If we have accumulated SWAP_CRYPTO_BATCH_SIZE
+			 * pages, process the sub-batch: it could contain pages
+			 * from multiple folios.
+			 */
+			if (zst.nr_comp_pages == SWAP_CRYPTO_BATCH_SIZE) {
+				zswap_store_sub_batch(&zst);
+				zswap_store_reset_sub_batch(&zst);
+				/*
+				 * Stop processing this folio if it had
+				 * compress errors.
+				 */
+				if (errors[batch_idx])
+					goto put_objcg;
+			}
+
+			add_nr_pages = min3((
+					(long)SWAP_CRYPTO_BATCH_SIZE -
+					(long)zst.nr_comp_pages),
+					nr_pages,
+					(long)SWAP_CRYPTO_BATCH_SIZE);
+
+			/*
+			 * Allocate zswap_entries for this sub-batch. If we
+			 * get errors while doing so, we can flag an error
+			 * for the folio, call the shrinker and move on.
+			 */
+			if (zswap_alloc_entries(add_nr_pages,
+						entries, node_id)) {
+				zswap_store_reset_sub_batch(&zst);
+				errors[batch_idx] = -EINVAL;
+				goto put_objcg;
+			}
+
+			zswap_add_folio_pages_to_sb(
+				&zst,
+				folio,
+				batch_idx,
+				objcg,
+				entries,
+				folio_start_idx,
+				add_nr_pages);
+
+			nr_pages -= add_nr_pages;
+			folio_start_idx += add_nr_pages;
+		} /* this folio has pages to be compressed. */
+
+		obj_cgroup_put(objcg);
+		continue;
+
+put_objcg:
+		obj_cgroup_put(objcg);
+		if (zswap_pool_reached_full)
+			queue_work(shrink_wq, &zswap_shrink_work);
+	} /* for batch folios */
+
+	if (!zswap_enabled)
+		goto check_old;
+
+	/*
+	 * Process last sub-batch: it could contain pages from
+	 * multiple folios.
+	 */
+	if (zst.nr_comp_pages)
+		zswap_store_sub_batch(&zst);
+
+	mutex_unlock(&acomp_batch_ctx->mutex);
+	zswap_pool_put(pool);
+check_old:
+	zswap_store_process_folio_errors(folios, errors, nr_folios);
+	return true;
+}
+
 bool zswap_load(struct folio *folio)
 {
 	swp_entry_t swp = folio->swap;