diff mbox series

[13/24] lustre: sec: use enc pool for bounce pages

Message ID 1662429337-18737-14-git-send-email-jsimmons@infradead.org (mailing list archive)
State New, archived
Series lustre: update to OpenSFS tree Sept 5, 2022

Commit Message

James Simmons Sept. 6, 2022, 1:55 a.m. UTC
From: Sebastien Buisson <sbuisson@ddn.com>

Take pages from the enc pool so that they can be used for
encryption, instead of letting fscrypt allocate a bounce page
for every call to the encryption primitives.
Pages are taken from the enc pool a whole array at a time.

This requires modifying the fscrypt API, so that the new functions
fscrypt_encrypt_block() and fscrypt_decrypt_block() are exported.
These functions take a destination page parameter.
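
As a rough illustration of the new calling convention (this snippet is
not part of the patch; err, inode, src, dst, bounce, blocksize and lblk
are placeholder names), a caller that manages its own bounce pages
would do something like:

  /* write path: ciphertext for one fs block lands in the bounce page */
  err = fscrypt_encrypt_block(inode, src, bounce, blocksize, 0,
                              lblk, GFP_NOFS);
  if (err)
          return err;

  /* read path: plaintext is written into the destination page */
  err = fscrypt_decrypt_block(inode, bounce, dst, blocksize, 0,
                              lblk, GFP_NOFS);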

Using the enc pool for bounce pages is a worthwhile performance win.
The table below shows the performance penalty incurred by encryption
without this patch and with this patch:

||===================|=====================|=====================||
||                   | Performance penalty | Performance penalty ||
||                   |    without patch    |     with patch      ||
||===================|=====================|=====================||
|| Bandwidth – write |       30%-35%       |  5%-10% large IOs   ||
||                   |                     |    15% small IOs    ||
||-------------------|---------------------|---------------------||
|| Bandwidth – read  |         20%         |    less than 10%    ||
||-------------------|---------------------|---------------------||
||      Metadata     |         N/A         |         5%          ||
|| creat,stat,remove |                     |                     ||
||===================|=====================|=====================||
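
On the client write path (osc_brw_prep_request()), the whole array of
bounce pages is now taken from the enc pool up front and handed back
when the bounce pages are released. A simplified sketch of that
pattern, with error handling elided (page_count, pga and nunits as
used in osc_request.c):

  pa = kvmalloc_array(page_count, sizeof(*pa), GFP_KERNEL | __GFP_ZERO);
  rc = sptlrpc_enc_pool_get_pages_array(pa, page_count);

  for (i = 0; i < page_count; i++) {
          /* encrypt into the pool page instead of an fscrypt bounce page */
          data_page = osc_encrypt_pagecache_blocks(pga[i]->pg, pa[i],
                                                   nunits, 0, GFP_NOFS);
          pga[i]->pg = data_page;  /* ciphertext page goes on the wire */
  }

  /* later, in osc_release_bounce_pages(), give the pages back */
  sptlrpc_enc_pool_put_pages_array(pa, page_count);
  kvfree(pa);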

WC-bug-id: https://jira.whamcloud.com/browse/LU-15003
Lustre-commit: f3fe144b8572e9e75b ("LU-15003 sec: use enc pool for bounce pages")
Signed-off-by: Sebastien Buisson <sbuisson@ddn.com>
Signed-off-by: James Simmons <jsimmons@infradead.org>
Reviewed-on: https://review.whamcloud.com/47149
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
---
 fs/crypto/crypto.c             |  34 ++--
 fs/lustre/include/lustre_sec.h |   3 +
 fs/lustre/llite/dir.c          |   5 +-
 fs/lustre/osc/osc_request.c    | 134 +++++++++++-
 fs/lustre/ptlrpc/sec_bulk.c    | 452 +++++++++++++++++++++++++++++++++++++++--
 include/linux/fscrypt.h        |  49 ++++-
 6 files changed, 632 insertions(+), 45 deletions(-)

Patch

diff --git a/fs/crypto/crypto.c b/fs/crypto/crypto.c
index 92123257..de3e040 100644
--- a/fs/crypto/crypto.c
+++ b/fs/crypto/crypto.c
@@ -202,9 +202,10 @@  struct page *fscrypt_encrypt_pagecache_blocks(struct page *page,
 EXPORT_SYMBOL(fscrypt_encrypt_pagecache_blocks);
 
 /**
- * fscrypt_encrypt_block_inplace() - Encrypt a filesystem block in-place
+ * fscrypt_encrypt_block() - Cache an encrypted filesystem block in a page
  * @inode:     The inode to which this block belongs
- * @page:      The page containing the block to encrypt
+ * @src:       The page containing the block to encrypt
+ * @dst:       The page which will contain the encrypted data
  * @len:       Size of block to encrypt.  Doesn't need to be a multiple of the
  *		fs block size, but must be a multiple of FS_CRYPTO_BLOCK_SIZE.
  * @offs:      Byte offset within @page at which the block to encrypt begins
@@ -215,17 +216,18 @@  struct page *fscrypt_encrypt_pagecache_blocks(struct page *page,
  * Encrypt a possibly-compressed filesystem block that is located in an
  * arbitrary page, not necessarily in the original pagecache page.  The @inode
  * and @lblk_num must be specified, as they can't be determined from @page.
+ * The encrypted data will be stored in @dst.
  *
  * Return: 0 on success; -errno on failure
  */
-int fscrypt_encrypt_block_inplace(const struct inode *inode, struct page *page,
-				  unsigned int len, unsigned int offs,
-				  u64 lblk_num, gfp_t gfp_flags)
+int fscrypt_encrypt_block(const struct inode *inode, struct page *src,
+			   struct page *dst, unsigned int len, unsigned int offs,
+			   u64 lblk_num, gfp_t gfp_flags)
 {
-	return fscrypt_crypt_block(inode, FS_ENCRYPT, lblk_num, page, page,
+	return fscrypt_crypt_block(inode, FS_ENCRYPT, lblk_num, src, dst,
 				   len, offs, gfp_flags);
 }
-EXPORT_SYMBOL(fscrypt_encrypt_block_inplace);
+EXPORT_SYMBOL(fscrypt_encrypt_block);
 
 /**
  * fscrypt_decrypt_pagecache_blocks() - Decrypt filesystem blocks in a
@@ -272,9 +274,10 @@  int fscrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len,
 EXPORT_SYMBOL(fscrypt_decrypt_pagecache_blocks);
 
 /**
- * fscrypt_decrypt_block_inplace() - Decrypt a filesystem block in-place
+ * fscrypt_decrypt_block() - Cache a decrypted filesystem block in a page
  * @inode:     The inode to which this block belongs
- * @page:      The page containing the block to decrypt
+ * @src:       The page containing the block to decrypt
+ * @dst:       The page which will contain the plain data
  * @len:       Size of block to decrypt.  Doesn't need to be a multiple of the
  *		fs block size, but must be a multiple of FS_CRYPTO_BLOCK_SIZE.
  * @offs:      Byte offset within @page at which the block to decrypt begins
@@ -284,17 +287,18 @@  int fscrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len,
  * Decrypt a possibly-compressed filesystem block that is located in an
  * arbitrary page, not necessarily in the original pagecache page.  The @inode
  * and @lblk_num must be specified, as they can't be determined from @page.
+ * The decrypted data will be stored in @dst.
  *
  * Return: 0 on success; -errno on failure
  */
-int fscrypt_decrypt_block_inplace(const struct inode *inode, struct page *page,
-				  unsigned int len, unsigned int offs,
-				  u64 lblk_num)
+int fscrypt_decrypt_block(const struct inode *inode, struct page *src,
+			   struct page *dst, unsigned int len, unsigned int offs,
+			   u64 lblk_num, gfp_t gfp_flags)
 {
-	return fscrypt_crypt_block(inode, FS_DECRYPT, lblk_num, page, page,
-				   len, offs, GFP_NOFS);
+	return fscrypt_crypt_block(inode, FS_DECRYPT, lblk_num, src, dst,
+				   len, offs, gfp_flags);
 }
-EXPORT_SYMBOL(fscrypt_decrypt_block_inplace);
+EXPORT_SYMBOL(fscrypt_decrypt_block);
 
 /**
  * fscrypt_initialize() - allocate major buffers for fs encryption.
diff --git a/fs/lustre/include/lustre_sec.h b/fs/lustre/include/lustre_sec.h
index e8410e1..7c3c12a 100644
--- a/fs/lustre/include/lustre_sec.h
+++ b/fs/lustre/include/lustre_sec.h
@@ -1048,6 +1048,9 @@  int sptlrpc_target_export_check(struct obd_export *exp,
 				struct ptlrpc_request *req);
 
 /* bulk security api */
+int sptlrpc_enc_pool_add_user(void);
+int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count);
+void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count);
 void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc);
 int get_free_pages_in_pool(void);
 int pool_is_at_full_capacity(void);
diff --git a/fs/lustre/llite/dir.c b/fs/lustre/llite/dir.c
index aea15f5..bffd34c 100644
--- a/fs/lustre/llite/dir.c
+++ b/fs/lustre/llite/dir.c
@@ -2292,7 +2292,10 @@  static long ll_dir_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 	case FS_IOC_ADD_ENCRYPTION_KEY:
 		if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
 			return -EOPNOTSUPP;
-		return fscrypt_ioctl_add_key(file, (void __user *)arg);
+		rc = fscrypt_ioctl_add_key(file, (void __user *)arg);
+		if (!rc)
+			sptlrpc_enc_pool_add_user();
+		return rc;
 	case FS_IOC_REMOVE_ENCRYPTION_KEY:
 		if (!ll_sbi_has_encrypt(ll_i2sbi(inode)))
 			return -EOPNOTSUPP;
diff --git a/fs/lustre/osc/osc_request.c b/fs/lustre/osc/osc_request.c
index 5a4db29..d66185b 100644
--- a/fs/lustre/osc/osc_request.c
+++ b/fs/lustre/osc/osc_request.c
@@ -1378,13 +1378,109 @@  static int osc_checksum_bulk_rw(const char *obd_name,
 	return rc;
 }
 
+/**
+ * osc_encrypt_pagecache_blocks() - overlay to fscrypt_encrypt_pagecache_blocks
+ * @srcpage:	The locked pagecache page containing the block(s) to encrypt
+ * @dstpage:	The page that will hold the encrypted result
+ * @len:	Total size of the block(s) to encrypt.  Must be a nonzero
+ *		multiple of the filesystem's block size.
+ * @offs:	Byte offset within @srcpage of the first block to encrypt.  Must be
+ *		a multiple of the filesystem's block size.
+ * @gfp_flags:	Memory allocation flags
+ *
+ * This overlay function is necessary to be able to provide our own bounce page.
+ */
+static struct page *osc_encrypt_pagecache_blocks(struct page *srcpage,
+						 struct page *dstpage,
+						 unsigned int len,
+						 unsigned int offs,
+						 gfp_t gfp_flags)
+{
+	const struct inode *inode = srcpage->mapping->host;
+	const unsigned int blockbits = inode->i_blkbits;
+	const unsigned int blocksize = 1 << blockbits;
+	u64 lblk_num = ((u64)srcpage->index << (PAGE_SHIFT - blockbits)) +
+		       (offs >> blockbits);
+	unsigned int i;
+	int err;
+
+	if (unlikely(!dstpage))
+		return fscrypt_encrypt_pagecache_blocks(srcpage, len, offs,
+							gfp_flags);
+
+	if (WARN_ON_ONCE(!PageLocked(srcpage)))
+		return ERR_PTR(-EINVAL);
+
+	if (WARN_ON_ONCE(len <= 0 || !IS_ALIGNED(len | offs, blocksize)))
+		return ERR_PTR(-EINVAL);
+
+	/* Set PagePrivate2 for disambiguation in
+	 * osc_finalize_bounce_page().
+	 * It means cipher page was not allocated by llcrypt.
+	 * It means the cipher page was not allocated by fscrypt.
+	SetPagePrivate2(dstpage);
+
+	for (i = offs; i < offs + len; i += blocksize, lblk_num++) {
+		err = fscrypt_encrypt_block(inode, srcpage, dstpage, blocksize,
+					    i, lblk_num, gfp_flags);
+		if (err)
+			return ERR_PTR(err);
+	}
+	SetPagePrivate(dstpage);
+	set_page_private(dstpage, (unsigned long)srcpage);
+	return dstpage;
+}
+
+/**
+ * osc_finalize_bounce_page() - overlay to fscrypt_finalize_bounce_page
+ *
+ * This overlay function is necessary to handle bounce pages
+ * allocated by ourselves.
+ */
+static inline void osc_finalize_bounce_page(struct page **pagep)
+{
+	struct page *page = *pagep;
+
+	/* PagePrivate2 was set in osc_encrypt_pagecache_blocks
+	 * to indicate the cipher page was allocated by ourselves.
+	 * So we must not free it via fscrypt.
+	 */
+	if (unlikely(!page || !PagePrivate2(page)))
+		return fscrypt_finalize_bounce_page(pagep);
+
+	if (fscrypt_is_bounce_page(page)) {
+		*pagep = fscrypt_pagecache_page(page);
+		ClearPagePrivate2(page);
+		set_page_private(page, (unsigned long)NULL);
+		ClearPagePrivate(page);
+	}
+}
+
 static inline void osc_release_bounce_pages(struct brw_page **pga,
 					    u32 page_count)
 {
 #ifdef CONFIG_FS_ENCRYPTION
-	int i;
+	struct page **pa = NULL;
+	int i, j = 0;
+
+	if (PageChecked(pga[0]->pg)) {
+		pa = kvmalloc_array(page_count, sizeof(*pa),
+				    GFP_KERNEL | __GFP_ZERO);
+		if (!pa)
+			return;
+	}
 
 	for (i = 0; i < page_count; i++) {
+		/* Bounce pages used by osc_encrypt_pagecache_blocks()
+		 * called from osc_brw_prep_request()
+		 * are identified thanks to the PageChecked flag.
+		 */
+		if (PageChecked(pga[i]->pg)) {
+			if (pa)
+				pa[j++] = pga[i]->pg;
+			osc_finalize_bounce_page(&pga[i]->pg);
+		}
+
 		/* Bounce pages allocated by a call to
 		 * fscrypt_encrypt_pagecache_blocks() in osc_brw_prep_request()
 		 * are identified thanks to the PageChecked flag.
@@ -1394,6 +1490,11 @@  static inline void osc_release_bounce_pages(struct brw_page **pga,
 		pga[i]->count -= pga[i]->bp_count_diff;
 		pga[i]->off += pga[i]->bp_off_diff;
 	}
+
+	if (pa) {
+		sptlrpc_enc_pool_put_pages_array(pa, j);
+		kvfree(pa);
+	}
 #endif
 }
 
@@ -1445,6 +1546,22 @@  static int osc_brw_prep_request(int cmd, struct client_obd *cli,
 
 	if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode) &&
 	    fscrypt_has_encryption_key(inode)) {
+		struct page **pa = NULL;
+
+		pa = kvmalloc_array(page_count, sizeof(*pa),
+				    GFP_KERNEL | __GFP_ZERO);
+		if (!pa) {
+			ptlrpc_request_free(req);
+			return -ENOMEM;
+		}
+
+		rc = sptlrpc_enc_pool_get_pages_array(pa, page_count);
+		if (rc) {
+			CDEBUG(D_SEC, "failed to allocate from enc pool: %d\n",
+			       rc);
+			ptlrpc_request_free(req);
+			return rc;
+		}
+
 		for (i = 0; i < page_count; i++) {
 			struct brw_page *brwpg = pga[i];
 			struct page *data_page = NULL;
@@ -1474,9 +1591,10 @@  static int osc_brw_prep_request(int cmd, struct client_obd *cli,
 				brwpg->pg->index = clpage->cp_page_index;
 			}
 			data_page =
-				fscrypt_encrypt_pagecache_blocks(brwpg->pg,
-								 nunits, 0,
-								 GFP_NOFS);
+				osc_encrypt_pagecache_blocks(brwpg->pg,
+							     pa ? pa[i] : NULL,
+							     nunits, 0,
+							     GFP_NOFS);
 			if (directio) {
 				brwpg->pg->mapping = map_orig;
 				brwpg->pg->index = index_orig;
@@ -1490,6 +1608,11 @@  static int osc_brw_prep_request(int cmd, struct client_obd *cli,
 					rc = 0;
 					goto retry_encrypt;
 				}
+				if (pa) {
+					sptlrpc_enc_pool_put_pages_array(pa + i,
+									 page_count - i);
+					kvfree(pa);
+				}
 				ptlrpc_request_free(req);
 				return rc;
 			}
@@ -1515,6 +1638,9 @@  static int osc_brw_prep_request(int cmd, struct client_obd *cli,
 			brwpg->bp_off_diff = brwpg->off & ~PAGE_MASK;
 			brwpg->off = brwpg->off & PAGE_MASK;
 		}
+
+		if (pa)
+			kvfree(pa);
 	} else if (opc == OST_WRITE && inode && IS_ENCRYPTED(inode)) {
 		struct osc_async_page *oap = brw_page2oap(pga[0]);
 		struct cl_page *clpage = oap2cl_page(oap);
diff --git a/fs/lustre/ptlrpc/sec_bulk.c b/fs/lustre/ptlrpc/sec_bulk.c
index b6ae77b..5dad83f 100644
--- a/fs/lustre/ptlrpc/sec_bulk.c
+++ b/fs/lustre/ptlrpc/sec_bulk.c
@@ -297,14 +297,190 @@  static unsigned long enc_pools_cleanup(struct page ***pools, int npools)
 	return cleaned;
 }
 
+/*
+ * Merge the @npools pools pointed to by @pools, which contain @npages new
+ * pages, into the current pools.
+ *
+ * We could avoid most of the memory copies with some tricks, but we choose
+ * the simplest way to keep the complexity down. This is not called frequently.
+ */
+static void enc_pools_insert(struct page ***pools, int npools, int npages)
+{
+	int freeslot;
+	int op_idx, np_idx, og_idx, ng_idx;
+	int cur_npools, end_npools;
+
+	LASSERT(npages > 0);
+	LASSERT(page_pools.epp_total_pages+npages <= page_pools.epp_max_pages);
+	LASSERT(npages_to_npools(npages) == npools);
+	LASSERT(page_pools.epp_growing);
+
+	spin_lock(&page_pools.epp_lock);
+
+	/*
+	 * (1) fill all the free slots of current pools.
+	 */
+	/*
+	 * free slots are those left by rent pages, and the extra ones with
+	 * index >= total_pages, locate at the tail of last pool.
+	 */
+	freeslot = page_pools.epp_total_pages % PAGES_PER_POOL;
+	if (freeslot != 0)
+		freeslot = PAGES_PER_POOL - freeslot;
+	freeslot += page_pools.epp_total_pages - page_pools.epp_free_pages;
+
+	op_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
+	og_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+	np_idx = npools - 1;
+	ng_idx = (npages - 1) % PAGES_PER_POOL;
+
+	while (freeslot) {
+		LASSERT(!page_pools.epp_pools[op_idx][og_idx]);
+		LASSERT(!pools[np_idx][ng_idx]);
+
+		page_pools.epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
+		pools[np_idx][ng_idx] = NULL;
+
+		freeslot--;
+
+		if (++og_idx == PAGES_PER_POOL) {
+			op_idx++;
+			og_idx = 0;
+		}
+		if (--ng_idx < 0) {
+			if (np_idx == 0)
+				break;
+			np_idx--;
+			ng_idx = PAGES_PER_POOL - 1;
+		}
+	}
+
+	/*
+	 * (2) add pools if needed.
+	 */
+	cur_npools = (page_pools.epp_total_pages + PAGES_PER_POOL - 1) /
+		      PAGES_PER_POOL;
+	end_npools = (page_pools.epp_total_pages + npages +
+		      PAGES_PER_POOL - 1) / PAGES_PER_POOL;
+	LASSERT(end_npools <= page_pools.epp_max_pools);
+
+	np_idx = 0;
+	while (cur_npools < end_npools) {
+		LASSERT(page_pools.epp_pools[cur_npools] == NULL);
+		LASSERT(np_idx < npools);
+		LASSERT(pools[np_idx] != NULL);
+
+		page_pools.epp_pools[cur_npools++] = pools[np_idx];
+		pools[np_idx++] = NULL;
+	}
+
+	page_pools.epp_total_pages += npages;
+	page_pools.epp_free_pages += npages;
+	page_pools.epp_st_lowfree = page_pools.epp_free_pages;
+
+	if (page_pools.epp_total_pages > page_pools.epp_st_max_pages)
+		page_pools.epp_st_max_pages = page_pools.epp_total_pages;
+
+	CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
+	       page_pools.epp_total_pages);
+
+	spin_unlock(&page_pools.epp_lock);
+}
+
+static int enc_pools_add_pages(int npages)
+{
+	static DEFINE_MUTEX(add_pages_mutex);
+	struct page ***pools;
+	int npools, alloced = 0;
+	int i, j, rc = -ENOMEM;
+
+	if (npages < PTLRPC_MAX_BRW_PAGES)
+		npages = PTLRPC_MAX_BRW_PAGES;
+
+	mutex_lock(&add_pages_mutex);
+
+	if (npages + page_pools.epp_total_pages > page_pools.epp_max_pages)
+		npages = page_pools.epp_max_pages - page_pools.epp_total_pages;
+	LASSERT(npages > 0);
+
+	page_pools.epp_st_grows++;
+
+	npools = npages_to_npools(npages);
+
+	pools = kvmalloc_array(npools, sizeof(*pools),
+			       GFP_KERNEL | __GFP_ZERO);
+	if (!pools)
+		goto out;
+
+	for (i = 0; i < npools; i++) {
+		pools[i] = kzalloc(PAGE_SIZE, GFP_NOFS);
+		if (!pools[i])
+			goto out_pools;
+
+		for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
+			pools[i][j] = alloc_page(GFP_NOFS |
+						 __GFP_HIGHMEM);
+			if (!pools[i][j])
+				goto out_pools;
+
+			alloced++;
+		}
+	}
+	LASSERT(alloced == npages);
+
+	enc_pools_insert(pools, npools, npages);
+	CDEBUG(D_SEC, "added %d pages into pools\n", npages);
+	rc = 0;
+
+out_pools:
+	enc_pools_cleanup(pools, npools);
+	kvfree(pools);
+out:
+	if (rc) {
+		page_pools.epp_st_grow_fails++;
+		CERROR("Failed to allocate %d enc pages\n", npages);
+	}
+
+	mutex_unlock(&add_pages_mutex);
+	return rc;
+}
+
 static inline void enc_pools_wakeup(void)
 {
 	assert_spin_locked(&page_pools.epp_lock);
 
-	if (unlikely(page_pools.epp_waitqlen)) {
-		LASSERT(waitqueue_active(&page_pools.epp_waitq));
+	/* only wake up if someone is actually waiting */
+	if (unlikely(waitqueue_active(&page_pools.epp_waitq)))
 		wake_up(&page_pools.epp_waitq);
-	}
+}
+
+static int enc_pools_should_grow(int page_needed, time64_t now)
+{
+	/*
+	 * don't grow if someone else is growing the pools right now,
+	 * or the pools have reached their full capacity
+	 */
+	if (page_pools.epp_growing ||
+	    page_pools.epp_total_pages == page_pools.epp_max_pages)
+		return 0;
+
+	/* if total pages is not enough, we need to grow */
+	if (page_pools.epp_total_pages < page_needed)
+		return 1;
+
+	/*
+	 * We would like to return 0 here if a shrink happened just a
+	 * moment ago, but that may cause a deadlock if both client and
+	 * ost live on a single node.
+	 */
+
+	/*
+	 * Here we should perhaps also consider other factors, such as
+	 * wait queue length, idle index, etc.
+	 */
+
+	/* grow the pools in any other cases */
+	return 1;
 }
 
 /*
@@ -323,49 +499,287 @@  int pool_is_at_full_capacity(void)
 	return (page_pools.epp_total_pages == page_pools.epp_max_pages);
 }
 
-void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
+static inline struct page **page_from_bulkdesc(void *array, int index)
 {
+	struct ptlrpc_bulk_desc *desc = (struct ptlrpc_bulk_desc *)array;
+
+	return &desc->bd_enc_vec[index].bv_page;
+}
+
+static inline struct page **page_from_pagearray(void *array, int index)
+{
+	struct page **pa = (struct page **)array;
+
+	return &pa[index];
+}
+
+/*
+ * we allocate the requested pages atomically.
+ */
+static inline int __sptlrpc_enc_pool_get_pages(void *array, unsigned int count,
+					       struct page **(*page_from)(void *, int))
+{
+	wait_queue_entry_t waitlink;
+	unsigned long this_idle = -1;
+	u64 tick_ns = 0;
+	time64_t now;
 	int p_idx, g_idx;
-	int i;
+	int i, rc = 0;
 
-	if (!desc->bd_enc_vec)
-		return;
+	if (!array || count <= 0 || count > page_pools.epp_max_pages)
+		return -EINVAL;
+
+	spin_lock(&page_pools.epp_lock);
+
+	page_pools.epp_st_access++;
+again:
+	if (unlikely(page_pools.epp_free_pages < count)) {
+		if (tick_ns == 0)
+			tick_ns = ktime_get_ns();
+
+		now = ktime_get_real_seconds();
+
+		page_pools.epp_st_missings++;
+		page_pools.epp_pages_short += count;
+
+		if (enc_pools_should_grow(count, now)) {
+			page_pools.epp_growing = 1;
+
+			spin_unlock(&page_pools.epp_lock);
+			enc_pools_add_pages(page_pools.epp_pages_short / 2);
+			spin_lock(&page_pools.epp_lock);
+
+			page_pools.epp_growing = 0;
+
+			enc_pools_wakeup();
+		} else {
+			if (page_pools.epp_growing) {
+				if (++page_pools.epp_waitqlen >
+				    page_pools.epp_st_max_wqlen)
+					page_pools.epp_st_max_wqlen =
+						page_pools.epp_waitqlen;
+
+				set_current_state(TASK_UNINTERRUPTIBLE);
+				init_wait(&waitlink);
+				add_wait_queue(&page_pools.epp_waitq,
+					       &waitlink);
+
+				spin_unlock(&page_pools.epp_lock);
+				schedule();
+				remove_wait_queue(&page_pools.epp_waitq,
+						  &waitlink);
+				spin_lock(&page_pools.epp_lock);
+				page_pools.epp_waitqlen--;
+			} else {
+				/*
+				 * ptlrpcd thread should not sleep in that case,
+				 * or deadlock may occur!
+				 * Instead, return -ENOMEM so that upper layers
+				 * will put request back in queue.
+				 */
+				page_pools.epp_st_outofmem++;
+				rc = -ENOMEM;
+				goto out_unlock;
+			}
+		}
+
+		if (page_pools.epp_pages_short < count) {
+			rc = -EPROTO;
+			goto out_unlock;
+		}
+		page_pools.epp_pages_short -= count;
+
+		this_idle = 0;
+		goto again;
+	}
+
+	/* record max wait time */
+	if (unlikely(tick_ns)) {
+		ktime_t tick = ktime_sub_ns(ktime_get(), tick_ns);
+
+		if (ktime_after(tick, page_pools.epp_st_max_wait))
+			page_pools.epp_st_max_wait = tick;
+	}
+
+	/* proceed with rest of allocation */
+	page_pools.epp_free_pages -= count;
+
+	p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
+	g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
+
+	for (i = 0; i < count; i++) {
+		struct page **pagep = page_from(array, i);
+
+		if (!page_pools.epp_pools[p_idx][g_idx]) {
+			rc = -EPROTO;
+			goto out_unlock;
+		}
+		*pagep = page_pools.epp_pools[p_idx][g_idx];
+		page_pools.epp_pools[p_idx][g_idx] = NULL;
+
+		if (++g_idx == PAGES_PER_POOL) {
+			p_idx++;
+			g_idx = 0;
+		}
+	}
+
+	if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
+		page_pools.epp_st_lowfree = page_pools.epp_free_pages;
+
+	/*
+	 * new idle index = (old * weight + new) / (weight + 1)
+	 */
+	if (this_idle == -1) {
+		this_idle = page_pools.epp_free_pages * IDLE_IDX_MAX /
+			    page_pools.epp_total_pages;
+	}
+	page_pools.epp_idle_idx = (page_pools.epp_idle_idx * IDLE_IDX_WEIGHT +
+				   this_idle) / (IDLE_IDX_WEIGHT + 1);
+
+	page_pools.epp_last_access = ktime_get_seconds();
+
+out_unlock:
+	spin_unlock(&page_pools.epp_lock);
+	return rc;
+}
+
+int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
+{
+	int rc;
 
 	LASSERT(desc->bd_iov_count > 0);
+	LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
+
+	/* resent bulk, enc iov might have been allocated previously */
+	if (desc->bd_enc_vec)
+		return 0;
+
+	desc->bd_enc_vec = kvmalloc_array(desc->bd_iov_count,
+					  sizeof(*desc->bd_enc_vec),
+					  GFP_KERNEL | __GFP_ZERO);
+	if (!desc->bd_enc_vec)
+		return -ENOMEM;
+
+	rc = __sptlrpc_enc_pool_get_pages((void *)desc, desc->bd_iov_count,
+					  page_from_bulkdesc);
+	if (rc) {
+		kvfree(desc->bd_enc_vec);
+		desc->bd_enc_vec = NULL;
+	}
+	return rc;
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
+
+int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count)
+{
+	return __sptlrpc_enc_pool_get_pages((void *)pa, count,
+					    page_from_pagearray);
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages_array);
+
+static int __sptlrpc_enc_pool_put_pages(void *array, unsigned int count,
+					struct page **(*page_from)(void *, int))
+{
+	int p_idx, g_idx;
+	int i, rc = 0;
+
+	if (!array || count <= 0)
+		return -EINVAL;
 
 	spin_lock(&page_pools.epp_lock);
 
 	p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
 	g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-	LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
-		page_pools.epp_total_pages);
-	LASSERT(page_pools.epp_pools[p_idx]);
+	if (page_pools.epp_free_pages + count > page_pools.epp_total_pages) {
+		rc = -EPROTO;
+		goto out_unlock;
+	}
+	if (!page_pools.epp_pools[p_idx]) {
+		rc = -EPROTO;
+		goto out_unlock;
+	}
 
-	for (i = 0; i < desc->bd_iov_count; i++) {
-		LASSERT(desc->bd_enc_vec[i].bv_page);
-		LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
-		LASSERT(!page_pools.epp_pools[p_idx][g_idx]);
+	for (i = 0; i < count; i++) {
+		struct page **pagep = page_from(array, i);
 
-		page_pools.epp_pools[p_idx][g_idx] =
-			desc->bd_enc_vec[i].bv_page;
+		if (!*pagep ||
+		    page_pools.epp_pools[p_idx][g_idx]) {
+			rc = -EPROTO;
+			goto out_unlock;
+		}
 
+		page_pools.epp_pools[p_idx][g_idx] = *pagep;
 		if (++g_idx == PAGES_PER_POOL) {
 			p_idx++;
 			g_idx = 0;
 		}
 	}
 
-	page_pools.epp_free_pages += desc->bd_iov_count;
-
+	page_pools.epp_free_pages += count;
 	enc_pools_wakeup();
 
+out_unlock:
 	spin_unlock(&page_pools.epp_lock);
+	return rc;
+}
+
+void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
+{
+	int rc;
+
+	if (!desc->bd_enc_vec)
+		return;
+
+	rc = __sptlrpc_enc_pool_put_pages((void *)desc, desc->bd_iov_count,
+					  page_from_bulkdesc);
+	if (rc)
+		CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
 
-	kfree(desc->bd_enc_vec);
+	kvfree(desc->bd_enc_vec);
 	desc->bd_enc_vec = NULL;
 }
 
+void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count)
+{
+	int rc;
+
+	rc = __sptlrpc_enc_pool_put_pages((void *)pa, count,
+					  page_from_pagearray);
+	if (rc)
+		CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages_array);
+
+/*
+ * We don't do much for add_user/del_user anymore, except adding some
+ * initial pages in add_user() if the current pools are empty. The rest
+ * is handled by the pools' self-adaptation.
+ */
+int sptlrpc_enc_pool_add_user(void)
+{
+	int need_grow = 0;
+
+	spin_lock(&page_pools.epp_lock);
+	if (page_pools.epp_growing == 0 && page_pools.epp_total_pages == 0) {
+		page_pools.epp_growing = 1;
+		need_grow = 1;
+	}
+	spin_unlock(&page_pools.epp_lock);
+
+	if (need_grow) {
+		enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES +
+				    PTLRPC_MAX_BRW_PAGES);
+
+		spin_lock(&page_pools.epp_lock);
+		page_pools.epp_growing = 0;
+		enc_pools_wakeup();
+		spin_unlock(&page_pools.epp_lock);
+	}
+	return 0;
+}
+EXPORT_SYMBOL(sptlrpc_enc_pool_add_user);
+
 static inline void enc_pools_alloc(void)
 {
 	LASSERT(page_pools.epp_max_pools);
diff --git a/include/linux/fscrypt.h b/include/linux/fscrypt.h
index 991ff85..be0490f 100644
--- a/include/linux/fscrypt.h
+++ b/include/linux/fscrypt.h
@@ -128,15 +128,35 @@  struct page *fscrypt_encrypt_pagecache_blocks(struct page *page,
 					      unsigned int len,
 					      unsigned int offs,
 					      gfp_t gfp_flags);
-int fscrypt_encrypt_block_inplace(const struct inode *inode, struct page *page,
-				  unsigned int len, unsigned int offs,
-				  u64 lblk_num, gfp_t gfp_flags);
+int fscrypt_encrypt_block(const struct inode *inode, struct page *src,
+			   struct page *dst, unsigned int len,
+			   unsigned int offs, u64 lblk_num, gfp_t gfp_flags);
+
+static inline int fscrypt_encrypt_block_inplace(const struct inode *inode,
+						struct page *page,
+						unsigned int len,
+						unsigned int offs,
+						u64 lblk_num)
+{
+	return fscrypt_encrypt_block(inode, page, page, len, offs, lblk_num,
+				     GFP_NOFS);
+}
 
 int fscrypt_decrypt_pagecache_blocks(struct page *page, unsigned int len,
 				     unsigned int offs);
-int fscrypt_decrypt_block_inplace(const struct inode *inode, struct page *page,
-				  unsigned int len, unsigned int offs,
-				  u64 lblk_num);
+
+int fscrypt_decrypt_block(const struct inode *inode, struct page *src,
+			   struct page *dst, unsigned int len,
+			   unsigned int offs, u64 lblk_num, gfp_t gfp_flags);
+
+static inline int fscrypt_decrypt_block_inplace(const struct inode *inode,
+						struct page *page,
+						unsigned int len, unsigned int offs,
+						u64 lblk_num)
+{
+	return fscrypt_decrypt_block(inode, page, page, len, offs, lblk_num,
+				     GFP_NOFS);
+}
 
 static inline bool fscrypt_is_bounce_page(struct page *page)
 {
@@ -272,6 +292,15 @@  static inline struct page *fscrypt_encrypt_pagecache_blocks(struct page *page,
 	return ERR_PTR(-EOPNOTSUPP);
 }
 
+static inline int fscrypt_encrypt_block(const struct inode *inode,
+					 struct page *src, struct page *dst,
+					 unsigned int len,
+					 unsigned int offs, u64 lblk_num,
+					 gfp_t gfp_flags)
+{
+	return -EOPNOTSUPP;
+}
+
 static inline int fscrypt_encrypt_block_inplace(const struct inode *inode,
 						struct page *page,
 						unsigned int len,
@@ -296,6 +325,14 @@  static inline int fscrypt_decrypt_block_inplace(const struct inode *inode,
 	return -EOPNOTSUPP;
 }
 
+static inline int fscrypt_decrypt_block(const struct inode *inode,
+					 struct page *src, struct page *dst,
+					 unsigned int len, unsigned int offs,
+					 u64 lblk_num, gfp_t gfp_flags)
+{
+	return -EOPNOTSUPP;
+}
+
 static inline bool fscrypt_is_bounce_page(struct page *page)
 {
 	return false;