
[RFC,04/35] ceph: Convert ceph_mds_request::r_pagelist to a databuf

Message ID 20250313233341.1675324-5-dhowells@redhat.com (mailing list archive)
State New
Series ceph, rbd, netfs: Make ceph fully use netfslib

Commit Message

David Howells March 13, 2025, 11:32 p.m. UTC
Convert ceph_mds_request::r_pagelist to a databuf, along with the code
that uses it, such as setxattr ops.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Viacheslav Dubeyko <slava@dubeyko.com>
cc: Alex Markuze <amarkuze@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: ceph-devel@vger.kernel.org
cc: linux-fsdevel@vger.kernel.org
---
 fs/ceph/acl.c        | 39 ++++++++++----------
 fs/ceph/file.c       | 12 ++++---
 fs/ceph/inode.c      | 85 +++++++++++++++++++-------------------------
 fs/ceph/mds_client.c | 11 +++---
 fs/ceph/mds_client.h |  2 +-
 fs/ceph/super.h      |  2 +-
 fs/ceph/xattr.c      | 68 +++++++++++++++--------------------
 7 files changed, 96 insertions(+), 123 deletions(-)

Comments

Viacheslav Dubeyko March 14, 2025, 10:27 p.m. UTC | #1
On Thu, 2025-03-13 at 23:32 +0000, David Howells wrote:
> Convert ceph_mds_request::r_pagelist to a databuf, along with the code
> that uses it, such as setxattr ops.
> 
> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  fs/ceph/acl.c        | 39 ++++++++++----------
>  fs/ceph/file.c       | 12 ++++---
>  fs/ceph/inode.c      | 85 +++++++++++++++++++-----------------------
> --
>  fs/ceph/mds_client.c | 11 +++---
>  fs/ceph/mds_client.h |  2 +-
>  fs/ceph/super.h      |  2 +-
>  fs/ceph/xattr.c      | 68 +++++++++++++++--------------------
>  7 files changed, 96 insertions(+), 123 deletions(-)
> 
> diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
> index 1564eacc253d..d6da650db83e 100644
> --- a/fs/ceph/acl.c
> +++ b/fs/ceph/acl.c
> @@ -171,7 +171,7 @@ int ceph_pre_init_acls(struct inode *dir, umode_t
> *mode,
>  {
>  	struct posix_acl *acl, *default_acl;
>  	size_t val_size1 = 0, val_size2 = 0;
> -	struct ceph_pagelist *pagelist = NULL;
> +	struct ceph_databuf *dbuf = NULL;
>  	void *tmp_buf = NULL;
>  	int err;
>  
> @@ -201,58 +201,55 @@ int ceph_pre_init_acls(struct inode *dir,
> umode_t *mode,
>  	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
>  	if (!tmp_buf)
>  		goto out_err;
> -	pagelist = ceph_pagelist_alloc(GFP_KERNEL);
> -	if (!pagelist)
> +	dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_KERNEL);
> +	if (!dbuf)
>  		goto out_err;
>  
> -	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
> -	if (err)
> -		goto out_err;
> -
> -	ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 :
> 1);
> +	ceph_databuf_encode_32(dbuf, acl && default_acl ? 2 : 1);
>  
>  	if (acl) {
>  		size_t len = strlen(XATTR_NAME_POSIX_ACL_ACCESS);
> -		err = ceph_pagelist_reserve(pagelist, len +
> val_size1 + 8);
> +		err = ceph_databuf_reserve(dbuf, len + val_size1 +
> 8,
> +					   GFP_KERNEL);

I know that it's a simple change, but this len + val_size1 + 8 looks
confusing anyway. What does this hardcoded 8 mean? :)


>  		if (err)
>  			goto out_err;
> -		ceph_pagelist_encode_string(pagelist,
> XATTR_NAME_POSIX_ACL_ACCESS,
> -					    len);
> +		ceph_databuf_encode_string(dbuf,
> XATTR_NAME_POSIX_ACL_ACCESS,
> +					   len);
>  		err = posix_acl_to_xattr(&init_user_ns, acl,
>  					 tmp_buf, val_size1);
>  		if (err < 0)
>  			goto out_err;
> -		ceph_pagelist_encode_32(pagelist, val_size1);
> -		ceph_pagelist_append(pagelist, tmp_buf, val_size1);
> +		ceph_databuf_encode_32(dbuf, val_size1);
> +		ceph_databuf_append(dbuf, tmp_buf, val_size1);
>  	}
>  	if (default_acl) {
>  		size_t len = strlen(XATTR_NAME_POSIX_ACL_DEFAULT);
> -		err = ceph_pagelist_reserve(pagelist, len +
> val_size2 + 8);
> +		err = ceph_databuf_reserve(dbuf, len + val_size2 +
> 8,
> +					   GFP_KERNEL);

Same question here. :) What does this hardcoded 8 mean? :)

>  		if (err)
>  			goto out_err;
> -		ceph_pagelist_encode_string(pagelist,
> -					 
> XATTR_NAME_POSIX_ACL_DEFAULT, len);
> +		ceph_databuf_encode_string(dbuf,
> +					  
> XATTR_NAME_POSIX_ACL_DEFAULT, len);
>  		err = posix_acl_to_xattr(&init_user_ns, default_acl,
>  					 tmp_buf, val_size2);
>  		if (err < 0)
>  			goto out_err;
> -		ceph_pagelist_encode_32(pagelist, val_size2);
> -		ceph_pagelist_append(pagelist, tmp_buf, val_size2);
> +		ceph_databuf_encode_32(dbuf, val_size2);
> +		ceph_databuf_append(dbuf, tmp_buf, val_size2);
>  	}
>  
>  	kfree(tmp_buf);
>  
>  	as_ctx->acl = acl;
>  	as_ctx->default_acl = default_acl;
> -	as_ctx->pagelist = pagelist;
> +	as_ctx->dbuf = dbuf;
>  	return 0;
>  
>  out_err:
>  	posix_acl_release(acl);
>  	posix_acl_release(default_acl);
>  	kfree(tmp_buf);
> -	if (pagelist)
> -		ceph_pagelist_release(pagelist);
> +	ceph_databuf_release(dbuf);
>  	return err;
>  }
>  
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 851d70200c6b..9de2960748b9 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -679,9 +679,9 @@ static int ceph_finish_async_create(struct inode
> *dir, struct inode *inode,
>  	iinfo.change_attr = 1;
>  	ceph_encode_timespec64(&iinfo.btime, &now);
>  
> -	if (req->r_pagelist) {
> -		iinfo.xattr_len = req->r_pagelist->length;
> -		iinfo.xattr_data = req->r_pagelist->mapped_tail;
> +	if (req->r_dbuf) {
> +		iinfo.xattr_len = ceph_databuf_len(req->r_dbuf);
> +		iinfo.xattr_data = kmap_ceph_databuf_page(req-
> >r_dbuf, 0);

Possibly, it's in another patch. Have we removed req->r_pagelist from
the structure?

Do we always have memory pages in ceph_databuf? How will
kmap_ceph_databuf_page() behave if it's not a memory page?

>  	} else {
>  		/* fake it */
>  		iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
> @@ -731,6 +731,8 @@ static int ceph_finish_async_create(struct inode
> *dir, struct inode *inode,
>  	ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req-
> >r_session,
>  			      req->r_fmode, NULL);
>  	up_read(&mdsc->snap_rwsem);
> +	if (req->r_dbuf)
> +		kunmap_local(iinfo.xattr_data);

Maybe we need to hide kunmap_local() behind something like
kunmap_ceph_databuf_page()?

>  	if (ret) {
>  		doutc(cl, "failed to fill inode: %d\n", ret);
>  		ceph_dir_clear_complete(dir);
> @@ -849,8 +851,8 @@ int ceph_atomic_open(struct inode *dir, struct
> dentry *dentry,
>  			goto out_ctx;
>  		}
>  		/* Async create can't handle more than a page of
> xattrs */
> -		if (as_ctx.pagelist &&
> -		    !list_is_singular(&as_ctx.pagelist->head))
> +		if (as_ctx.dbuf &&
> +		    as_ctx.dbuf->nr_bvec > 1)

Maybe it makes sense to call something like ceph_databuf_length()
instead of low-level access to dbuf->nr_bvec?

>  			try_async = false;
>  	} else if (!d_in_lookup(dentry)) {
>  		/* If it's not being looked up, it's negative */
> diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
> index b060f765ad20..ec9b80fec7be 100644
> --- a/fs/ceph/inode.c
> +++ b/fs/ceph/inode.c
> @@ -112,9 +112,9 @@ struct inode *ceph_new_inode(struct inode *dir,
> struct dentry *dentry,
>  void ceph_as_ctx_to_req(struct ceph_mds_request *req,
>  			struct ceph_acl_sec_ctx *as_ctx)
>  {
> -	if (as_ctx->pagelist) {
> -		req->r_pagelist = as_ctx->pagelist;
> -		as_ctx->pagelist = NULL;
> +	if (as_ctx->dbuf) {
> +		req->r_dbuf = as_ctx->dbuf;
> +		as_ctx->dbuf = NULL;

Maybe we need something like a swap() method? :)

>  	}
>  	ceph_fscrypt_as_ctx_to_req(req, as_ctx);
>  }
> @@ -2341,11 +2341,10 @@ static int fill_fscrypt_truncate(struct inode
> *inode,
>  	loff_t pos, orig_pos = round_down(attr->ia_size,
>  					  CEPH_FSCRYPT_BLOCK_SIZE);
>  	u64 block = orig_pos >> CEPH_FSCRYPT_BLOCK_SHIFT;
> -	struct ceph_pagelist *pagelist = NULL;
> -	struct kvec iov = {0};
> +	struct ceph_databuf *dbuf = NULL;
>  	struct iov_iter iter;
> -	struct page *page = NULL;
> -	struct ceph_fscrypt_truncate_size_header header;
> +	struct ceph_fscrypt_truncate_size_header *header;
> +	void *p;
>  	int retry_op = 0;
>  	int len = CEPH_FSCRYPT_BLOCK_SIZE;
>  	loff_t i_size = i_size_read(inode);
> @@ -2372,37 +2371,35 @@ static int fill_fscrypt_truncate(struct inode
> *inode,
>  			goto out;
>  	}
>  
> -	page = __page_cache_alloc(GFP_KERNEL);
> -	if (page == NULL) {
> -		ret = -ENOMEM;
> +	ret = -ENOMEM;
> +	dbuf = ceph_databuf_req_alloc(2, 0, GFP_KERNEL);

So, do we allocate 2 items of zero length here?

> +	if (!dbuf)
>  		goto out;
> -	}
>  
> -	pagelist = ceph_pagelist_alloc(GFP_KERNEL);
> -	if (!pagelist) {
> -		ret = -ENOMEM;
> +	if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header),
> GFP_KERNEL) < 0)
> +		goto out;
> +	if (ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL)
> < 0)
>  		goto out;
> -	}
>  
> -	iov.iov_base = kmap_local_page(page);
> -	iov.iov_len = len;
> -	iov_iter_kvec(&iter, READ, &iov, 1, len);
> +	iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, len);

Is &dbuf->bvec[1] correct? Why do we work with item #1? I think it
looks confusing.

>  
>  	pos = orig_pos;
>  	ret = __ceph_sync_read(inode, &pos, &iter, &retry_op,
> &objver);
>  	if (ret < 0)
>  		goto out;
>  
> +	header = kmap_ceph_databuf_page(dbuf, 0);
> +
>  	/* Insert the header first */
> -	header.ver = 1;
> -	header.compat = 1;
> -	header.change_attr =
> cpu_to_le64(inode_peek_iversion_raw(inode));
> +	header->ver = 1;
> +	header->compat = 1;
> +	header->change_attr =
> cpu_to_le64(inode_peek_iversion_raw(inode));
>  
>  	/*
>  	 * Always set the block_size to CEPH_FSCRYPT_BLOCK_SIZE,
>  	 * because in MDS it may need this to do the truncate.
>  	 */
> -	header.block_size = cpu_to_le32(CEPH_FSCRYPT_BLOCK_SIZE);
> +	header->block_size = cpu_to_le32(CEPH_FSCRYPT_BLOCK_SIZE);
>  
>  	/*
>  	 * If we hit a hole here, we should just skip filling
> @@ -2417,51 +2414,41 @@ static int fill_fscrypt_truncate(struct inode
> *inode,
>  	if (!objver) {
>  		doutc(cl, "hit hole, ppos %lld < size %lld\n", pos,
> i_size);
>  
> -		header.data_len = cpu_to_le32(8 + 8 + 4);
> -		header.file_offset = 0;
> +		header->data_len = cpu_to_le32(8 + 8 + 4);

I have the same problem understanding this one. What does this hardcoded
8 + 8 + 4 value mean? :)

> +		header->file_offset = 0;
>  		ret = 0;
>  	} else {
> -		header.data_len = cpu_to_le32(8 + 8 + 4 +
> CEPH_FSCRYPT_BLOCK_SIZE);
> -		header.file_offset = cpu_to_le64(orig_pos);
> +		header->data_len = cpu_to_le32(8 + 8 + 4 +
> CEPH_FSCRYPT_BLOCK_SIZE);

Ditto.

> +		header->file_offset = cpu_to_le64(orig_pos);
>  
>  		doutc(cl, "encrypt block boff/bsize %d/%lu\n", boff,
>  		      CEPH_FSCRYPT_BLOCK_SIZE);
>  
>  		/* truncate and zero out the extra contents for the
> last block */
> -		memset(iov.iov_base + boff, 0, PAGE_SIZE - boff);
> +		p = kmap_ceph_databuf_page(dbuf, 1);

Maybe we need to introduce some constants to address pages #0 and #1?
Because #0 is the header and I assume #1 is some content.

> +		memset(p + boff, 0, PAGE_SIZE - boff);
> +		kunmap_local(p);
>  
>  		/* encrypt the last block */
> -		ret = ceph_fscrypt_encrypt_block_inplace(inode,
> page,
> -						   
> CEPH_FSCRYPT_BLOCK_SIZE,
> -						    0, block,
> -						    GFP_KERNEL);
> +		ret = ceph_fscrypt_encrypt_block_inplace(
> +			inode, ceph_databuf_page(dbuf, 1),
> +			CEPH_FSCRYPT_BLOCK_SIZE, 0, block,
> GFP_KERNEL);
>  		if (ret)
>  			goto out;
>  	}
>  
> -	/* Insert the header */
> -	ret = ceph_pagelist_append(pagelist, &header,
> sizeof(header));
> -	if (ret)
> -		goto out;
> +	ceph_databuf_added_data(dbuf, sizeof(*header));
> +	if (header->block_size)
> +		ceph_databuf_added_data(dbuf,
> CEPH_FSCRYPT_BLOCK_SIZE);
>  
> -	if (header.block_size) {
> -		/* Append the last block contents to pagelist */
> -		ret = ceph_pagelist_append(pagelist, iov.iov_base,
> -					   CEPH_FSCRYPT_BLOCK_SIZE);
> -		if (ret)
> -			goto out;
> -	}
> -	req->r_pagelist = pagelist;
> +	req->r_dbuf = dbuf;
>  out:
>  	doutc(cl, "%p %llx.%llx size dropping cap refs on %s\n",
> inode,
>  	      ceph_vinop(inode), ceph_cap_string(got));
>  	ceph_put_cap_refs(ci, got);
> -	if (iov.iov_base)
> -		kunmap_local(iov.iov_base);
> -	if (page)
> -		__free_pages(page, 0);
> -	if (ret && pagelist)
> -		ceph_pagelist_release(pagelist);
> +	kunmap_local(header);
> +	if (ret)
> +		ceph_databuf_release(dbuf);
>  	return ret;
>  }
>  
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index 230e0c3f341f..09661a34f287 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -1125,8 +1125,7 @@ void ceph_mdsc_release_request(struct kref
> *kref)
>  	put_cred(req->r_cred);
>  	if (req->r_mnt_idmap)
>  		mnt_idmap_put(req->r_mnt_idmap);
> -	if (req->r_pagelist)
> -		ceph_pagelist_release(req->r_pagelist);
> +	ceph_databuf_release(req->r_dbuf);
>  	kfree(req->r_fscrypt_auth);
>  	kfree(req->r_altname);
>  	put_request_session(req);
> @@ -3207,10 +3206,10 @@ static struct ceph_msg
> *create_request_message(struct ceph_mds_session *session,
>  	msg->front.iov_len = p - msg->front.iov_base;
>  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
>  
> -	if (req->r_pagelist) {
> -		struct ceph_pagelist *pagelist = req->r_pagelist;
> -		ceph_msg_data_add_pagelist(msg, pagelist);
> -		msg->hdr.data_len = cpu_to_le32(pagelist->length);
> +	if (req->r_dbuf) {
> +		struct ceph_databuf *dbuf = req->r_dbuf;
> +		ceph_msg_data_add_databuf(msg, dbuf);
> +		msg->hdr.data_len =
> cpu_to_le32(ceph_databuf_len(dbuf));
>  	} else {
>  		msg->hdr.data_len = 0;
>  	}
> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
> index 3e2a6fa7c19a..a7ee8da07ce7 100644
> --- a/fs/ceph/mds_client.h
> +++ b/fs/ceph/mds_client.h
> @@ -333,7 +333,7 @@ struct ceph_mds_request {
>  	u32 r_direct_hash;      /* choose dir frag based on this
> dentry hash */
>  
>  	/* data payload is used for xattr ops */
> -	struct ceph_pagelist *r_pagelist;
> +	struct ceph_databuf *r_dbuf;
>  
>  	/* what caps shall we drop? */
>  	int r_inode_drop, r_inode_unless;
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index bb0db0cc8003..984a6d2a5378 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -1137,7 +1137,7 @@ struct ceph_acl_sec_ctx {
>  #ifdef CONFIG_FS_ENCRYPTION
>  	struct ceph_fscrypt_auth *fscrypt_auth;
>  #endif
> -	struct ceph_pagelist *pagelist;
> +	struct ceph_databuf *dbuf;
>  };
>  
>  #ifdef CONFIG_SECURITY
> diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
> index 537165db4519..b083cd3b3974 100644
> --- a/fs/ceph/xattr.c
> +++ b/fs/ceph/xattr.c
> @@ -1114,17 +1114,17 @@ static int ceph_sync_setxattr(struct inode
> *inode, const char *name,
>  	struct ceph_mds_request *req;
>  	struct ceph_mds_client *mdsc = fsc->mdsc;
>  	struct ceph_osd_client *osdc = &fsc->client->osdc;
> -	struct ceph_pagelist *pagelist = NULL;
> +	struct ceph_databuf *dbuf = NULL;
>  	int op = CEPH_MDS_OP_SETXATTR;
>  	int err;
>  
>  	if (size > 0) {
> -		/* copy value into pagelist */
> -		pagelist = ceph_pagelist_alloc(GFP_NOFS);
> -		if (!pagelist)
> +		/* copy value into dbuf */
> +		dbuf = ceph_databuf_req_alloc(1, size, GFP_NOFS);
> +		if (!dbuf)
>  			return -ENOMEM;
>  
> -		err = ceph_pagelist_append(pagelist, value, size);
> +		err = ceph_databuf_append(dbuf, value, size);
>  		if (err)
>  			goto out;
>  	} else if (!value) {
> @@ -1154,8 +1154,8 @@ static int ceph_sync_setxattr(struct inode
> *inode, const char *name,
>  		req->r_args.setxattr.flags = cpu_to_le32(flags);
>  		req->r_args.setxattr.osdmap_epoch =
>  			cpu_to_le32(osdc->osdmap->epoch);
> -		req->r_pagelist = pagelist;
> -		pagelist = NULL;
> +		req->r_dbuf = dbuf;
> +		dbuf = NULL;
>  	}
>  
>  	req->r_inode = inode;
> @@ -1169,8 +1169,7 @@ static int ceph_sync_setxattr(struct inode
> *inode, const char *name,
>  	doutc(cl, "xattr.ver (after): %lld\n", ci-
> >i_xattrs.version);
>  
>  out:
> -	if (pagelist)
> -		ceph_pagelist_release(pagelist);
> +	ceph_databuf_release(dbuf);
>  	return err;
>  }
>  
> @@ -1377,7 +1376,7 @@ bool ceph_security_xattr_deadlock(struct inode
> *in)
>  int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
>  			   struct ceph_acl_sec_ctx *as_ctx)
>  {
> -	struct ceph_pagelist *pagelist = as_ctx->pagelist;
> +	struct ceph_databuf *dbuf = as_ctx->dbuf;
>  	const char *name;
>  	size_t name_len;
>  	int err;
> @@ -1391,14 +1390,11 @@ int ceph_security_init_secctx(struct dentry
> *dentry, umode_t mode,
>  	}
>  
>  	err = -ENOMEM;
> -	if (!pagelist) {
> -		pagelist = ceph_pagelist_alloc(GFP_KERNEL);
> -		if (!pagelist)
> +	if (!dbuf) {
> +		dbuf = ceph_databuf_req_alloc(0, PAGE_SIZE,
> GFP_KERNEL);
> +		if (!dbuf)
>  			goto out;
> -		err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
> -		if (err)
> -			goto out;
> -		ceph_pagelist_encode_32(pagelist, 1);
> +		ceph_databuf_encode_32(dbuf, 1);
>  	}
>  
>  	/*
> @@ -1407,38 +1403,31 @@ int ceph_security_init_secctx(struct dentry
> *dentry, umode_t mode,
>  	 * dentry_init_security hook.
>  	 */
>  	name_len = strlen(name);
> -	err = ceph_pagelist_reserve(pagelist,
> -				    4 * 2 + name_len + as_ctx-
> >lsmctx.len);
> +	err = ceph_databuf_reserve(dbuf, 4 * 2 + name_len + as_ctx-
> >lsmctx.len,
> +				   GFP_KERNEL);

The 4 * 2 + name_len + as_ctx->lsmctx.len looks unclear to me. It will
be good to have some well-defined constants here.

>  	if (err)
>  		goto out;
>  
> -	if (as_ctx->pagelist) {
> +	if (as_ctx->dbuf) {
>  		/* update count of KV pairs */
> -		BUG_ON(pagelist->length <= sizeof(__le32));
> -		if (list_is_singular(&pagelist->head)) {
> -			le32_add_cpu((__le32*)pagelist->mapped_tail,
> 1);
> -		} else {
> -			struct page *page =
> list_first_entry(&pagelist->head,
> -							     struct
> page, lru);
> -			void *addr = kmap_atomic(page);
> -			le32_add_cpu((__le32*)addr, 1);
> -			kunmap_atomic(addr);
> -		}
> +		BUG_ON(ceph_databuf_len(dbuf) <= sizeof(__le32));
> +		__le32 *addr = kmap_ceph_databuf_page(dbuf, 0);
> +		le32_add_cpu(addr, 1);
> +		kunmap_local(addr);
>  	} else {
> -		as_ctx->pagelist = pagelist;
> +		as_ctx->dbuf = dbuf;
>  	}
>  
> -	ceph_pagelist_encode_32(pagelist, name_len);
> -	ceph_pagelist_append(pagelist, name, name_len);
> +	ceph_databuf_encode_32(dbuf, name_len);
> +	ceph_databuf_append(dbuf, name, name_len);
>  
> -	ceph_pagelist_encode_32(pagelist, as_ctx->lsmctx.len);
> -	ceph_pagelist_append(pagelist, as_ctx->lsmctx.context,
> -			     as_ctx->lsmctx.len);
> +	ceph_databuf_encode_32(dbuf, as_ctx->lsmctx.len);
> +	ceph_databuf_append(dbuf, as_ctx->lsmctx.context, as_ctx-
> >lsmctx.len);
>  
>  	err = 0;
>  out:
> -	if (pagelist && !as_ctx->pagelist)
> -		ceph_pagelist_release(pagelist);
> +	if (dbuf && !as_ctx->dbuf)
> +		ceph_databuf_release(dbuf);
>  	return err;
>  }
>  #endif /* CONFIG_CEPH_FS_SECURITY_LABEL */
> @@ -1456,8 +1445,7 @@ void ceph_release_acl_sec_ctx(struct
> ceph_acl_sec_ctx *as_ctx)
>  #ifdef CONFIG_FS_ENCRYPTION
>  	kfree(as_ctx->fscrypt_auth);
>  #endif
> -	if (as_ctx->pagelist)
> -		ceph_pagelist_release(as_ctx->pagelist);
> +	ceph_databuf_release(as_ctx->dbuf);
>  }
>  
>  /*
> 

Thanks,
Slava.
David Howells March 17, 2025, 11:52 a.m. UTC | #2
slava@dubeyko.com wrote:

> > -		err = ceph_pagelist_reserve(pagelist, len +
> > val_size1 + 8);
> > +		err = ceph_databuf_reserve(dbuf, len + val_size1 +
> > 8,
> > +					   GFP_KERNEL);
> 
> I know that it's a simple change, but this len + val_size1 + 8 looks
> confusing anyway. What does this hardcoded 8 mean? :)

You tell me.  The '8' is pre-existing.
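
(For reference, the 8 appears to be two 32-bit length fields:
ceph_pagelist_encode_string() writes a __le32 length in front of the name, and
ceph_pagelist_encode_32() writes another in front of the value. A hypothetical
helper, just to spell the arithmetic out:)

	/* Hypothetical: what the reservation for one xattr entry covers. */
	static size_t xattr_entry_reserve(size_t name_len, size_t val_size)
	{
		return 4 +		/* __le32 name length (encode_string) */
		       name_len +	/* xattr name bytes */
		       4 +		/* __le32 value length (encode_32) */
		       val_size;	/* xattr value bytes */
	}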

> > -	if (req->r_pagelist) {
> > -		iinfo.xattr_len = req->r_pagelist->length;
> > -		iinfo.xattr_data = req->r_pagelist->mapped_tail;
> > +	if (req->r_dbuf) {
> > +		iinfo.xattr_len = ceph_databuf_len(req->r_dbuf);
> > +		iinfo.xattr_data = kmap_ceph_databuf_page(req-
> > >r_dbuf, 0);
> 
> Possibly, it's in another patch. Have we removed req->r_pagelist from
> the structure?

See patch 20 "libceph: Remove ceph_pagelist".

It cannot be removed here as the kernel must still compile and work at this
point.

> Do we always have memory pages in ceph_databuf? How will
> kmap_ceph_databuf_page() behave if it's not a memory page?

Are there other sorts of pages?

> Maybe we need to hide kunmap_local() behind something like
> kunmap_ceph_databuf_page()?

Actually, probably better to rename kmap_ceph_databuf_page() to
kmap_local_ceph_databuf().
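
(One possible shape for such a pair, building on kmap_local_page(); the
ceph_databuf_page() accessor is from this series, while the kunmap wrapper and
both names are only a sketch:)

	static inline void *kmap_local_ceph_databuf(struct ceph_databuf *dbuf,
						    size_t i)
	{
		return kmap_local_page(ceph_databuf_page(dbuf, i));
	}

	static inline void kunmap_local_ceph_databuf(void *addr)
	{
		kunmap_local(addr);
	}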

> Maybe it makes sense to call something like ceph_databuf_length()
> instead of low-level access to dbuf->nr_bvec?

Sounds reasonable.  Better to hide the internal workings.
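
(Something along these lines, presumably; the name is only a guess:)

	static inline unsigned int ceph_databuf_nr_frags(const struct ceph_databuf *dbuf)
	{
		return dbuf->nr_bvec;
	}

so that the check in ceph_atomic_open() would read:

	/* Async create can't handle more than a page of xattrs */
	if (as_ctx.dbuf && ceph_databuf_nr_frags(as_ctx.dbuf) > 1)
		try_async = false;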

> > +	if (as_ctx->dbuf) {
> > +		req->r_dbuf = as_ctx->dbuf;
> > +		as_ctx->dbuf = NULL;
> 
> Maybe we need something like a swap() method? :)

I could point out that you were complaining about ceph_databuf_get() returning
a pointer rather than a void ;-).
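
(The suggestion would amount to using the generic swap() helper from
<linux/minmax.h>, assuming req->r_dbuf is still NULL when
ceph_as_ctx_to_req() runs:)

	swap(req->r_dbuf, as_ctx->dbuf);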

> > +	dbuf = ceph_databuf_req_alloc(2, 0, GFP_KERNEL);
> 
> So, do we allocate 2 items of zero length here?

You don't.  One is the bvec[] count (2) and the other is the amount of memory to
preallocate (0) and attach to that bvec[].
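
(A condensed sketch of the two patterns in this patch, with the slot count
first and the preallocated space second; ceph_databuf_*() is the API
introduced earlier in this series:)

	/* One slot, with 'size' bytes preallocated and attached to it,
	 * as in ceph_sync_setxattr(): */
	dbuf = ceph_databuf_req_alloc(1, size, GFP_NOFS);

	/* Two empty slots, filled afterwards with separately allocated
	 * fragments, as in fill_fscrypt_truncate(): */
	dbuf = ceph_databuf_req_alloc(2, 0, GFP_KERNEL);
	if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header), GFP_KERNEL) < 0 ||
	    ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL) < 0)
		goto out;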

Now, it may make sense to split the API calls to handle a number of different
scenarios, e.g.: request with just protocol, no pages; request with just
pages; request with both protocol bits and page list.

> > +	if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header),
> > GFP_KERNEL) < 0)
> > +		goto out;
> > +	if (ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL)
> > < 0)
> >  		goto out;
> >  
> > +	iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, len);
> 
> Is &dbuf->bvec[1] correct? Why do we work with item #1? I think it
> looks confusing.

Because you have a protocol element (in dbuf->bvec[0]) and a buffer (in
dbuf->bvec[1]).

An iterator is attached to the buffer and the iterator then conveys it to
__ceph_sync_read() as the destination.
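
(A condensed sketch of that flow; the bvec[] layout is as set up by this
patch, while iov_iter_bvec() and ITER_DEST are the stock iov_iter API:)

	struct iov_iter iter;

	/* Wrap only the data fragment (bvec[1]) in an iterator, so the read
	 * lands in that page and the header fragment (bvec[0]) stays free
	 * for the protocol element. */
	iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, CEPH_FSCRYPT_BLOCK_SIZE);
	ret = __ceph_sync_read(inode, &pos, &iter, &retry_op, &objver);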

If you look a few lines further on in the patch, you can see the first
fragment being accessed:

> +	header = kmap_ceph_databuf_page(dbuf, 0);
> +

Note that, because the read buffer is very likely a whole page, I split them
into separate sections rather than trying to allocate an order-1 page as that
would be more likely to fail.

> > -		header.data_len = cpu_to_le32(8 + 8 + 4);
> > -		header.file_offset = 0;
> > +		header->data_len = cpu_to_le32(8 + 8 + 4);
> 
> I have the same problem understanding this one. What does this
> hardcoded 8 + 8 + 4 value mean? :)

You need to ask a ceph expert.  This is nothing specifically to do with my
changes.  However, I suspect it's the size of the message element.
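
(For reference, the header here is struct ceph_fscrypt_truncate_size_header
from fs/ceph/crypto.h; the 8 + 8 + 4 appears to be the size of the fields that
follow data_len, optionally plus the block payload:)

	struct ceph_fscrypt_truncate_size_header {
		__u8	ver;
		__u8	compat;
		__le32	data_len;	/* 8 + 8 + 4 [+ CEPH_FSCRYPT_BLOCK_SIZE] */
		__le64	change_attr;	/* 8 */
		__le64	file_offset;	/* 8 */
		__le32	block_size;	/* 4 */
	} __packed;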

> > -		memset(iov.iov_base + boff, 0, PAGE_SIZE - boff);
> > +		p = kmap_ceph_databuf_page(dbuf, 1);
> 
> Maybe we need to introduce some constants to address pages #0 and #1?
> Because #0 is the header and I assume #1 is some content.

Whilst that might be useful, I don't know that the 0 and 1... being header and
content respectively always hold.  I haven't checked, but there could even be
a protocol trailer in some cases as well.

> > -	err = ceph_pagelist_reserve(pagelist,
> > -				    4 * 2 + name_len + as_ctx-
> > >lsmctx.len);
> > +	err = ceph_databuf_reserve(dbuf, 4 * 2 + name_len + as_ctx-
> > >lsmctx.len,
> > +				   GFP_KERNEL);
> 
> The 4 * 2 + name_len + as_ctx->lsmctx.len looks unclear to me. It will
> be good to have some well-defined constants here.

Again, nothing specifically to do with my changes.
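
(As in the ACL case above, the 4 * 2 appears to be the two __le32 length
fields written before the name and before the LSM context. A hypothetical
constant for it might look like:)

	#define CEPH_XATTR_KV_OVERHEAD	(2 * sizeof(__le32))

	err = ceph_databuf_reserve(dbuf,
				   CEPH_XATTR_KV_OVERHEAD + name_len +
				   as_ctx->lsmctx.len,
				   GFP_KERNEL);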

David
Viacheslav Dubeyko March 20, 2025, 8:34 p.m. UTC | #3
On Mon, 2025-03-17 at 11:52 +0000, David Howells wrote:
> slava@dubeyko.com wrote:
> 
> > > -		err = ceph_pagelist_reserve(pagelist, len +
> > > val_size1 + 8);
> > > +		err = ceph_databuf_reserve(dbuf, len + val_size1 +
> > > 8,
> > > +					   GFP_KERNEL);
> > 
> > I know that it's a simple change, but this len + val_size1 + 8 looks
> > confusing anyway. What does this hardcoded 8 mean? :)
> 
> You tell me.  The '8' is pre-existing.
> 

Yeah, I know. I am simply thinking aloud that we need to rework the CephFS code
somehow to make it clearer and easier to understand. But it has no relation
to your change.

> > > -	if (req->r_pagelist) {
> > > -		iinfo.xattr_len = req->r_pagelist->length;
> > > -		iinfo.xattr_data = req->r_pagelist->mapped_tail;
> > > +	if (req->r_dbuf) {
> > > +		iinfo.xattr_len = ceph_databuf_len(req->r_dbuf);
> > > +		iinfo.xattr_data = kmap_ceph_databuf_page(req-
> > > > r_dbuf, 0);
> > 
> > Possibly, it's in another patch. Have we removed req->r_pagelist from
> > the structure?
> 
> See patch 20 "libceph: Remove ceph_pagelist".
> 
> It cannot be removed here as the kernel must still compile and work at this
> point.
> 
> > Do we always have memory pages in ceph_databuf? How will
> > kmap_ceph_databuf_page() behave if it's not a memory page?
> 
> Are there other sorts of pages?
> 

My point is simple. I assumed that if ceph_databuf can handle multiple types of
memory representations, then it might hold more than just memory pages. Potentially, CXL
memory would require some special management in the future (maybe not). :) But
if we always use regular memory pages under the ceph_databuf abstraction, then I
don't see any problem here.

> > Maybe we need to hide kunmap_local() behind something like
> > kunmap_ceph_databuf_page()?
> 
> Actually, probably better to rename kmap_ceph_databuf_page() to
> kmap_local_ceph_databuf().
> 
> > Maybe it makes sense to call something like ceph_databuf_length()
> > instead of low-level access to dbuf->nr_bvec?
> 
> Sounds reasonable.  Better to hide the internal workings.
> 
> > > +	if (as_ctx->dbuf) {
> > > +		req->r_dbuf = as_ctx->dbuf;
> > > +		as_ctx->dbuf = NULL;
> > 
> > Maybe we need something like a swap() method? :)
> 
> I could point out that you were complaining about ceph_databuf_get() returning
> a pointer rather than a void ;-).
> 
> > > +	dbuf = ceph_databuf_req_alloc(2, 0, GFP_KERNEL);
> > 
> > So, do we allocate 2 items of zero length here?
> 
> You don't.  One is the bvec[] count (2) and the other is the amount of memory to
> preallocate (0) and attach to that bvec[].
> 

Aaah. I see now. Thanks.

> Now, it may make sense to split the API calls to handle a number of different
> scenarios, e.g.: request with just protocol, no pages; request with just
> pages; request with both protocol bits and page list.
> 
> > > +	if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header),
> > > GFP_KERNEL) < 0)
> > > +		goto out;
> > > +	if (ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL)
> > > < 0)
> > >  		goto out;
> > >  
> > > +	iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, len);
> > 
> > Is &dbuf->bvec[1] correct? Why do we work with item #1? I think it
> > looks confusing.
> 
> Because you have a protocol element (in dbuf->bvec[0]) and a buffer (in
> dbuf->bvec[1]).

It sounds to me like we need two declarations (something like this):

#define PROTOCOL_ELEMENT_INDEX    0
#define BUFFER_INDEX              1

> 
> An iterator is attached to the buffer and the iterator then conveys it to
> __ceph_sync_read() as the destination.
> 
> If you look a few lines further on in the patch, you can see the first
> fragment being accessed:
> 
> > +	header = kmap_ceph_databuf_page(dbuf, 0);
> > +
> 
> Note that, because the read buffer is very likely a whole page, I split them
> into separate sections rather than trying to allocate an order-1 page as that
> would be more likely to fail.
> 
> > > -		header.data_len = cpu_to_le32(8 + 8 + 4);
> > > -		header.file_offset = 0;
> > > +		header->data_len = cpu_to_le32(8 + 8 + 4);
> > 
> > I have the same problem understanding this one. What does this
> > hardcoded 8 + 8 + 4 value mean? :)
> 
> You need to ask a ceph expert.  This is nothing specifically to do with my
> changes.  However, I suspect it's the size of the message element.
> 

Yeah, I see. :)

> > > -		memset(iov.iov_base + boff, 0, PAGE_SIZE - boff);
> > > +		p = kmap_ceph_databuf_page(dbuf, 1);
> > 
> > Maybe we need to introduce some constants to address pages #0 and #1?
> > Because #0 is the header and I assume #1 is some content.
> 
> Whilst that might be useful, I don't know that the 0 and 1... being header and
> content respectively always hold.  I haven't checked, but there could even be
> a protocol trailer in some cases as well.
> 
> > > -	err = ceph_pagelist_reserve(pagelist,
> > > -				    4 * 2 + name_len + as_ctx-
> > > > lsmctx.len);
> > > +	err = ceph_databuf_reserve(dbuf, 4 * 2 + name_len + as_ctx-
> > > > lsmctx.len,
> > > +				   GFP_KERNEL);
> > 
> > The 4 * 2 + name_len + as_ctx->lsmctx.len looks unclear to me. It will
> > be good to have some well-defined constants here.
> 
> Again, nothing specifically to do with my changes.
> 

I completely agree.

Thanks,
Slava.
David Howells March 20, 2025, 10:01 p.m. UTC | #4
Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:

> > > > +	if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header),
> > > > GFP_KERNEL) < 0)
> > > > +		goto out;
> > > > +	if (ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL)
> > > > < 0)
> > > >  		goto out;
> > > >  
> > > > +	iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, len);
> > > 
> > > Is it correct &dbuf->bvec[1]? Why do we work with item #1? I think it
> > > looks confusing.
> > 
> > Because you have a protocol element (in dbuf->bvec[0]) and a buffer (in
> > dbuf->bvec[1]).
> 
> It sounds to me that we need to have two declarations (something like this):
> 
> #define PROTOCOL_ELEMENT_INDEX    0
> #define BUFFER_INDEX              1

But that's specific to this particular usage.  There may or may not be a
frag/page allocated to a protocol element, there may or may not be buffer
parts, and there may be multiple buffer parts.  There could even be multiple pages
allocated to protocol elements.

David

Patch

diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 1564eacc253d..d6da650db83e 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -171,7 +171,7 @@  int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
 {
 	struct posix_acl *acl, *default_acl;
 	size_t val_size1 = 0, val_size2 = 0;
-	struct ceph_pagelist *pagelist = NULL;
+	struct ceph_databuf *dbuf = NULL;
 	void *tmp_buf = NULL;
 	int err;
 
@@ -201,58 +201,55 @@  int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
 	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
 	if (!tmp_buf)
 		goto out_err;
-	pagelist = ceph_pagelist_alloc(GFP_KERNEL);
-	if (!pagelist)
+	dbuf = ceph_databuf_req_alloc(1, PAGE_SIZE, GFP_KERNEL);
+	if (!dbuf)
 		goto out_err;
 
-	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
-	if (err)
-		goto out_err;
-
-	ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
+	ceph_databuf_encode_32(dbuf, acl && default_acl ? 2 : 1);
 
 	if (acl) {
 		size_t len = strlen(XATTR_NAME_POSIX_ACL_ACCESS);
-		err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
+		err = ceph_databuf_reserve(dbuf, len + val_size1 + 8,
+					   GFP_KERNEL);
 		if (err)
 			goto out_err;
-		ceph_pagelist_encode_string(pagelist, XATTR_NAME_POSIX_ACL_ACCESS,
-					    len);
+		ceph_databuf_encode_string(dbuf, XATTR_NAME_POSIX_ACL_ACCESS,
+					   len);
 		err = posix_acl_to_xattr(&init_user_ns, acl,
 					 tmp_buf, val_size1);
 		if (err < 0)
 			goto out_err;
-		ceph_pagelist_encode_32(pagelist, val_size1);
-		ceph_pagelist_append(pagelist, tmp_buf, val_size1);
+		ceph_databuf_encode_32(dbuf, val_size1);
+		ceph_databuf_append(dbuf, tmp_buf, val_size1);
 	}
 	if (default_acl) {
 		size_t len = strlen(XATTR_NAME_POSIX_ACL_DEFAULT);
-		err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
+		err = ceph_databuf_reserve(dbuf, len + val_size2 + 8,
+					   GFP_KERNEL);
 		if (err)
 			goto out_err;
-		ceph_pagelist_encode_string(pagelist,
-					  XATTR_NAME_POSIX_ACL_DEFAULT, len);
+		ceph_databuf_encode_string(dbuf,
+					   XATTR_NAME_POSIX_ACL_DEFAULT, len);
 		err = posix_acl_to_xattr(&init_user_ns, default_acl,
 					 tmp_buf, val_size2);
 		if (err < 0)
 			goto out_err;
-		ceph_pagelist_encode_32(pagelist, val_size2);
-		ceph_pagelist_append(pagelist, tmp_buf, val_size2);
+		ceph_databuf_encode_32(dbuf, val_size2);
+		ceph_databuf_append(dbuf, tmp_buf, val_size2);
 	}
 
 	kfree(tmp_buf);
 
 	as_ctx->acl = acl;
 	as_ctx->default_acl = default_acl;
-	as_ctx->pagelist = pagelist;
+	as_ctx->dbuf = dbuf;
 	return 0;
 
 out_err:
 	posix_acl_release(acl);
 	posix_acl_release(default_acl);
 	kfree(tmp_buf);
-	if (pagelist)
-		ceph_pagelist_release(pagelist);
+	ceph_databuf_release(dbuf);
 	return err;
 }
 
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 851d70200c6b..9de2960748b9 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -679,9 +679,9 @@  static int ceph_finish_async_create(struct inode *dir, struct inode *inode,
 	iinfo.change_attr = 1;
 	ceph_encode_timespec64(&iinfo.btime, &now);
 
-	if (req->r_pagelist) {
-		iinfo.xattr_len = req->r_pagelist->length;
-		iinfo.xattr_data = req->r_pagelist->mapped_tail;
+	if (req->r_dbuf) {
+		iinfo.xattr_len = ceph_databuf_len(req->r_dbuf);
+		iinfo.xattr_data = kmap_ceph_databuf_page(req->r_dbuf, 0);
 	} else {
 		/* fake it */
 		iinfo.xattr_len = ARRAY_SIZE(xattr_buf);
@@ -731,6 +731,8 @@  static int ceph_finish_async_create(struct inode *dir, struct inode *inode,
 	ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session,
 			      req->r_fmode, NULL);
 	up_read(&mdsc->snap_rwsem);
+	if (req->r_dbuf)
+		kunmap_local(iinfo.xattr_data);
 	if (ret) {
 		doutc(cl, "failed to fill inode: %d\n", ret);
 		ceph_dir_clear_complete(dir);
@@ -849,8 +851,8 @@  int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			goto out_ctx;
 		}
 		/* Async create can't handle more than a page of xattrs */
-		if (as_ctx.pagelist &&
-		    !list_is_singular(&as_ctx.pagelist->head))
+		if (as_ctx.dbuf &&
+		    as_ctx.dbuf->nr_bvec > 1)
 			try_async = false;
 	} else if (!d_in_lookup(dentry)) {
 		/* If it's not being looked up, it's negative */
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index b060f765ad20..ec9b80fec7be 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -112,9 +112,9 @@  struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,
 void ceph_as_ctx_to_req(struct ceph_mds_request *req,
 			struct ceph_acl_sec_ctx *as_ctx)
 {
-	if (as_ctx->pagelist) {
-		req->r_pagelist = as_ctx->pagelist;
-		as_ctx->pagelist = NULL;
+	if (as_ctx->dbuf) {
+		req->r_dbuf = as_ctx->dbuf;
+		as_ctx->dbuf = NULL;
 	}
 	ceph_fscrypt_as_ctx_to_req(req, as_ctx);
 }
@@ -2341,11 +2341,10 @@  static int fill_fscrypt_truncate(struct inode *inode,
 	loff_t pos, orig_pos = round_down(attr->ia_size,
 					  CEPH_FSCRYPT_BLOCK_SIZE);
 	u64 block = orig_pos >> CEPH_FSCRYPT_BLOCK_SHIFT;
-	struct ceph_pagelist *pagelist = NULL;
-	struct kvec iov = {0};
+	struct ceph_databuf *dbuf = NULL;
 	struct iov_iter iter;
-	struct page *page = NULL;
-	struct ceph_fscrypt_truncate_size_header header;
+	struct ceph_fscrypt_truncate_size_header *header;
+	void *p;
 	int retry_op = 0;
 	int len = CEPH_FSCRYPT_BLOCK_SIZE;
 	loff_t i_size = i_size_read(inode);
@@ -2372,37 +2371,35 @@  static int fill_fscrypt_truncate(struct inode *inode,
 			goto out;
 	}
 
-	page = __page_cache_alloc(GFP_KERNEL);
-	if (page == NULL) {
-		ret = -ENOMEM;
+	ret = -ENOMEM;
+	dbuf = ceph_databuf_req_alloc(2, 0, GFP_KERNEL);
+	if (!dbuf)
 		goto out;
-	}
 
-	pagelist = ceph_pagelist_alloc(GFP_KERNEL);
-	if (!pagelist) {
-		ret = -ENOMEM;
+	if (ceph_databuf_insert_frag(dbuf, 0, sizeof(*header), GFP_KERNEL) < 0)
+		goto out;
+	if (ceph_databuf_insert_frag(dbuf, 1, PAGE_SIZE, GFP_KERNEL) < 0)
 		goto out;
-	}
 
-	iov.iov_base = kmap_local_page(page);
-	iov.iov_len = len;
-	iov_iter_kvec(&iter, READ, &iov, 1, len);
+	iov_iter_bvec(&iter, ITER_DEST, &dbuf->bvec[1], 1, len);
 
 	pos = orig_pos;
 	ret = __ceph_sync_read(inode, &pos, &iter, &retry_op, &objver);
 	if (ret < 0)
 		goto out;
 
+	header = kmap_ceph_databuf_page(dbuf, 0);
+
 	/* Insert the header first */
-	header.ver = 1;
-	header.compat = 1;
-	header.change_attr = cpu_to_le64(inode_peek_iversion_raw(inode));
+	header->ver = 1;
+	header->compat = 1;
+	header->change_attr = cpu_to_le64(inode_peek_iversion_raw(inode));
 
 	/*
 	 * Always set the block_size to CEPH_FSCRYPT_BLOCK_SIZE,
 	 * because in MDS it may need this to do the truncate.
 	 */
-	header.block_size = cpu_to_le32(CEPH_FSCRYPT_BLOCK_SIZE);
+	header->block_size = cpu_to_le32(CEPH_FSCRYPT_BLOCK_SIZE);
 
 	/*
 	 * If we hit a hole here, we should just skip filling
@@ -2417,51 +2414,41 @@  static int fill_fscrypt_truncate(struct inode *inode,
 	if (!objver) {
 		doutc(cl, "hit hole, ppos %lld < size %lld\n", pos, i_size);
 
-		header.data_len = cpu_to_le32(8 + 8 + 4);
-		header.file_offset = 0;
+		header->data_len = cpu_to_le32(8 + 8 + 4);
+		header->file_offset = 0;
 		ret = 0;
 	} else {
-		header.data_len = cpu_to_le32(8 + 8 + 4 + CEPH_FSCRYPT_BLOCK_SIZE);
-		header.file_offset = cpu_to_le64(orig_pos);
+		header->data_len = cpu_to_le32(8 + 8 + 4 + CEPH_FSCRYPT_BLOCK_SIZE);
+		header->file_offset = cpu_to_le64(orig_pos);
 
 		doutc(cl, "encrypt block boff/bsize %d/%lu\n", boff,
 		      CEPH_FSCRYPT_BLOCK_SIZE);
 
 		/* truncate and zero out the extra contents for the last block */
-		memset(iov.iov_base + boff, 0, PAGE_SIZE - boff);
+		p = kmap_ceph_databuf_page(dbuf, 1);
+		memset(p + boff, 0, PAGE_SIZE - boff);
+		kunmap_local(p);
 
 		/* encrypt the last block */
-		ret = ceph_fscrypt_encrypt_block_inplace(inode, page,
-						    CEPH_FSCRYPT_BLOCK_SIZE,
-						    0, block,
-						    GFP_KERNEL);
+		ret = ceph_fscrypt_encrypt_block_inplace(
+			inode, ceph_databuf_page(dbuf, 1),
+			CEPH_FSCRYPT_BLOCK_SIZE, 0, block, GFP_KERNEL);
 		if (ret)
 			goto out;
 	}
 
-	/* Insert the header */
-	ret = ceph_pagelist_append(pagelist, &header, sizeof(header));
-	if (ret)
-		goto out;
+	ceph_databuf_added_data(dbuf, sizeof(*header));
+	if (header->block_size)
+		ceph_databuf_added_data(dbuf, CEPH_FSCRYPT_BLOCK_SIZE);
 
-	if (header.block_size) {
-		/* Append the last block contents to pagelist */
-		ret = ceph_pagelist_append(pagelist, iov.iov_base,
-					   CEPH_FSCRYPT_BLOCK_SIZE);
-		if (ret)
-			goto out;
-	}
-	req->r_pagelist = pagelist;
+	req->r_dbuf = dbuf;
 out:
 	doutc(cl, "%p %llx.%llx size dropping cap refs on %s\n", inode,
 	      ceph_vinop(inode), ceph_cap_string(got));
 	ceph_put_cap_refs(ci, got);
-	if (iov.iov_base)
-		kunmap_local(iov.iov_base);
-	if (page)
-		__free_pages(page, 0);
-	if (ret && pagelist)
-		ceph_pagelist_release(pagelist);
+	kunmap_local(header);
+	if (ret)
+		ceph_databuf_release(dbuf);
 	return ret;
 }
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 230e0c3f341f..09661a34f287 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1125,8 +1125,7 @@  void ceph_mdsc_release_request(struct kref *kref)
 	put_cred(req->r_cred);
 	if (req->r_mnt_idmap)
 		mnt_idmap_put(req->r_mnt_idmap);
-	if (req->r_pagelist)
-		ceph_pagelist_release(req->r_pagelist);
+	ceph_databuf_release(req->r_dbuf);
 	kfree(req->r_fscrypt_auth);
 	kfree(req->r_altname);
 	put_request_session(req);
@@ -3207,10 +3206,10 @@  static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
-	if (req->r_pagelist) {
-		struct ceph_pagelist *pagelist = req->r_pagelist;
-		ceph_msg_data_add_pagelist(msg, pagelist);
-		msg->hdr.data_len = cpu_to_le32(pagelist->length);
+	if (req->r_dbuf) {
+		struct ceph_databuf *dbuf = req->r_dbuf;
+		ceph_msg_data_add_databuf(msg, dbuf);
+		msg->hdr.data_len = cpu_to_le32(ceph_databuf_len(dbuf));
 	} else {
 		msg->hdr.data_len = 0;
 	}
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3e2a6fa7c19a..a7ee8da07ce7 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -333,7 +333,7 @@  struct ceph_mds_request {
 	u32 r_direct_hash;      /* choose dir frag based on this dentry hash */
 
 	/* data payload is used for xattr ops */
-	struct ceph_pagelist *r_pagelist;
+	struct ceph_databuf *r_dbuf;
 
 	/* what caps shall we drop? */
 	int r_inode_drop, r_inode_unless;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index bb0db0cc8003..984a6d2a5378 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -1137,7 +1137,7 @@  struct ceph_acl_sec_ctx {
 #ifdef CONFIG_FS_ENCRYPTION
 	struct ceph_fscrypt_auth *fscrypt_auth;
 #endif
-	struct ceph_pagelist *pagelist;
+	struct ceph_databuf *dbuf;
 };
 
 #ifdef CONFIG_SECURITY
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 537165db4519..b083cd3b3974 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -1114,17 +1114,17 @@  static int ceph_sync_setxattr(struct inode *inode, const char *name,
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	struct ceph_pagelist *pagelist = NULL;
+	struct ceph_databuf *dbuf = NULL;
 	int op = CEPH_MDS_OP_SETXATTR;
 	int err;
 
 	if (size > 0) {
-		/* copy value into pagelist */
-		pagelist = ceph_pagelist_alloc(GFP_NOFS);
-		if (!pagelist)
+		/* copy value into dbuf */
+		dbuf = ceph_databuf_req_alloc(1, size, GFP_NOFS);
+		if (!dbuf)
 			return -ENOMEM;
 
-		err = ceph_pagelist_append(pagelist, value, size);
+		err = ceph_databuf_append(dbuf, value, size);
 		if (err)
 			goto out;
 	} else if (!value) {
@@ -1154,8 +1154,8 @@  static int ceph_sync_setxattr(struct inode *inode, const char *name,
 		req->r_args.setxattr.flags = cpu_to_le32(flags);
 		req->r_args.setxattr.osdmap_epoch =
 			cpu_to_le32(osdc->osdmap->epoch);
-		req->r_pagelist = pagelist;
-		pagelist = NULL;
+		req->r_dbuf = dbuf;
+		dbuf = NULL;
 	}
 
 	req->r_inode = inode;
@@ -1169,8 +1169,7 @@  static int ceph_sync_setxattr(struct inode *inode, const char *name,
 	doutc(cl, "xattr.ver (after): %lld\n", ci->i_xattrs.version);
 
 out:
-	if (pagelist)
-		ceph_pagelist_release(pagelist);
+	ceph_databuf_release(dbuf);
 	return err;
 }
 
@@ -1377,7 +1376,7 @@  bool ceph_security_xattr_deadlock(struct inode *in)
 int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
 			   struct ceph_acl_sec_ctx *as_ctx)
 {
-	struct ceph_pagelist *pagelist = as_ctx->pagelist;
+	struct ceph_databuf *dbuf = as_ctx->dbuf;
 	const char *name;
 	size_t name_len;
 	int err;
@@ -1391,14 +1390,11 @@  int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
 	}
 
 	err = -ENOMEM;
-	if (!pagelist) {
-		pagelist = ceph_pagelist_alloc(GFP_KERNEL);
-		if (!pagelist)
+	if (!dbuf) {
+		dbuf = ceph_databuf_req_alloc(0, PAGE_SIZE, GFP_KERNEL);
+		if (!dbuf)
 			goto out;
-		err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
-		if (err)
-			goto out;
-		ceph_pagelist_encode_32(pagelist, 1);
+		ceph_databuf_encode_32(dbuf, 1);
 	}
 
 	/*
@@ -1407,38 +1403,31 @@  int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
 	 * dentry_init_security hook.
 	 */
 	name_len = strlen(name);
-	err = ceph_pagelist_reserve(pagelist,
-				    4 * 2 + name_len + as_ctx->lsmctx.len);
+	err = ceph_databuf_reserve(dbuf, 4 * 2 + name_len + as_ctx->lsmctx.len,
+				   GFP_KERNEL);
 	if (err)
 		goto out;
 
-	if (as_ctx->pagelist) {
+	if (as_ctx->dbuf) {
 		/* update count of KV pairs */
-		BUG_ON(pagelist->length <= sizeof(__le32));
-		if (list_is_singular(&pagelist->head)) {
-			le32_add_cpu((__le32*)pagelist->mapped_tail, 1);
-		} else {
-			struct page *page = list_first_entry(&pagelist->head,
-							     struct page, lru);
-			void *addr = kmap_atomic(page);
-			le32_add_cpu((__le32*)addr, 1);
-			kunmap_atomic(addr);
-		}
+		BUG_ON(ceph_databuf_len(dbuf) <= sizeof(__le32));
+		__le32 *addr = kmap_ceph_databuf_page(dbuf, 0);
+		le32_add_cpu(addr, 1);
+		kunmap_local(addr);
 	} else {
-		as_ctx->pagelist = pagelist;
+		as_ctx->dbuf = dbuf;
 	}
 
-	ceph_pagelist_encode_32(pagelist, name_len);
-	ceph_pagelist_append(pagelist, name, name_len);
+	ceph_databuf_encode_32(dbuf, name_len);
+	ceph_databuf_append(dbuf, name, name_len);
 
-	ceph_pagelist_encode_32(pagelist, as_ctx->lsmctx.len);
-	ceph_pagelist_append(pagelist, as_ctx->lsmctx.context,
-			     as_ctx->lsmctx.len);
+	ceph_databuf_encode_32(dbuf, as_ctx->lsmctx.len);
+	ceph_databuf_append(dbuf, as_ctx->lsmctx.context, as_ctx->lsmctx.len);
 
 	err = 0;
 out:
-	if (pagelist && !as_ctx->pagelist)
-		ceph_pagelist_release(pagelist);
+	if (dbuf && !as_ctx->dbuf)
+		ceph_databuf_release(dbuf);
 	return err;
 }
 #endif /* CONFIG_CEPH_FS_SECURITY_LABEL */
@@ -1456,8 +1445,7 @@  void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx)
 #ifdef CONFIG_FS_ENCRYPTION
 	kfree(as_ctx->fscrypt_auth);
 #endif
-	if (as_ctx->pagelist)
-		ceph_pagelist_release(as_ctx->pagelist);
+	ceph_databuf_release(as_ctx->dbuf);
 }
 
 /*