diff mbox series

[RFC,03/35] libceph: Add a new data container type, ceph_databuf

Message ID 20250313233341.1675324-4-dhowells@redhat.com (mailing list archive)
State New
Headers show
Series ceph, rbd, netfs: Make ceph fully use netfslib | expand

Commit Message

David Howells March 13, 2025, 11:32 p.m. UTC
Add a new ceph data container type, ceph_databuf, that can carry a list of
pages in a bvec and use an iov_iter to describe the data to the next
layer down.  The iterator can also be used to refer to other types, such as
ITER_FOLIOQ.

There are two ways of loading the bvec.  One way is to allocate a buffer
with space in it and then add data, expanding the space as needed; the
other is to splice in pages, expanding the bvec[] as needed.

This is intended to replace all other types.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Viacheslav Dubeyko <slava@dubeyko.com>
cc: Alex Markuze <amarkuze@redhat.com>
cc: Ilya Dryomov <idryomov@gmail.com>
cc: ceph-devel@vger.kernel.org
cc: linux-fsdevel@vger.kernel.org
---
 include/linux/ceph/databuf.h    | 131 +++++++++++++++++++++
 include/linux/ceph/messenger.h  |   6 +-
 include/linux/ceph/osd_client.h |   3 +
 net/ceph/Makefile               |   3 +-
 net/ceph/databuf.c              | 200 ++++++++++++++++++++++++++++++++
 net/ceph/messenger.c            |  20 +++-
 net/ceph/osd_client.c           |  11 +-
 7 files changed, 369 insertions(+), 5 deletions(-)
 create mode 100644 include/linux/ceph/databuf.h
 create mode 100644 net/ceph/databuf.c

Comments

Viacheslav Dubeyko March 14, 2025, 8:06 p.m. UTC | #1
On Thu, 2025-03-13 at 23:32 +0000, David Howells wrote:
> Add a new ceph data container type, ceph_databuf, that can carry a list of
> pages in a bvec and use an iov_iter to handle describe the data to the next
> layer down.  The iterator can also be used to refer to other types, such as
> ITER_FOLIOQ.
> 
> There are two ways of loading the bvec.  One way is to allocate a buffer
> with space in it and then add data, expanding the space as needed; the
> other is to splice in pages, expanding the bvec[] as needed.
> 
> This is intended to replace all other types.
> 

We definitely need to think about unit-tests or self-tests here.

> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  include/linux/ceph/databuf.h    | 131 +++++++++++++++++++++
>  include/linux/ceph/messenger.h  |   6 +-
>  include/linux/ceph/osd_client.h |   3 +
>  net/ceph/Makefile               |   3 +-
>  net/ceph/databuf.c              | 200 ++++++++++++++++++++++++++++++++
>  net/ceph/messenger.c            |  20 +++-
>  net/ceph/osd_client.c           |  11 +-
>  7 files changed, 369 insertions(+), 5 deletions(-)
>  create mode 100644 include/linux/ceph/databuf.h
>  create mode 100644 net/ceph/databuf.c
> 
> diff --git a/include/linux/ceph/databuf.h b/include/linux/ceph/databuf.h
> new file mode 100644
> index 000000000000..14c7a6449467
> --- /dev/null
> +++ b/include/linux/ceph/databuf.h
> @@ -0,0 +1,131 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __FS_CEPH_DATABUF_H
> +#define __FS_CEPH_DATABUF_H
> +
> +#include <asm/byteorder.h>
> +#include <linux/refcount.h>
> +#include <linux/blk_types.h>
> +
> +struct ceph_databuf {
> +	struct bio_vec	*bvec;		/* List of pages */

So, maybe we need to think about folios now?

> +	struct bio_vec	inline_bvec[1];	/* Inline bvecs for small buffers */
> +	struct iov_iter	iter;		/* Iterator defining occupied data */
> +	size_t		limit;		/* Maximum length before expansion required */
> +	size_t		nr_bvec;	/* Number of bvec[] that have pages */

Folios? :)

> +	size_t		max_bvec;	/* Size of bvec[] */
> +	refcount_t	refcnt;
> +	bool		put_pages;	/* T if pages in bvec[] need to be put*/

Maybe folios? :)

> +};
> +
> +struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
> +					unsigned int data_source, gfp_t gfp);
> +struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf);
> +void ceph_databuf_release(struct ceph_databuf *dbuf);
> +int ceph_databuf_append(struct ceph_databuf *dbuf, const void *d, size_t l);

I think that declaration is important and the argument names need to be clear
enough. A short name is good, but it can be confusing. Why not len instead of l?
And I am still guessing what d means. :)

> +int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t space, gfp_t gfp);
> +int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
> +			     size_t len, gfp_t gfp);
> +
> +static inline
> +struct ceph_databuf *ceph_databuf_req_alloc(size_t min_bvec, size_t space, gfp_t gfp)
> +{
> +	return ceph_databuf_alloc(min_bvec, space, ITER_SOURCE, gfp);
> +}
> +
> +static inline
> +struct ceph_databuf *ceph_databuf_reply_alloc(size_t min_bvec, size_t space, gfp_t gfp)
> +{
> +	struct ceph_databuf *dbuf;
> +
> +	dbuf = ceph_databuf_alloc(min_bvec, space, ITER_DEST, gfp);
> +	if (dbuf)
> +		iov_iter_reexpand(&dbuf->iter, space);
> +	return dbuf;
> +}
> +
> +static inline struct page *ceph_databuf_page(struct ceph_databuf *dbuf,
> +					     unsigned int ix)
> +{
> +	return dbuf->bvec[ix].bv_page;
> +}
> +
> +#define kmap_ceph_databuf_page(dbuf, ix) \
> +	kmap_local_page(ceph_databuf_page(dbuf, ix));
> +

I am still thinking that we need to base the new code on folio.

> +static inline int ceph_databuf_encode_64(struct ceph_databuf *dbuf, u64 v)
> +{
> +	__le64 ev = cpu_to_le64(v);
> +	return ceph_databuf_append(dbuf, &ev, sizeof(ev));
> +}
> +static inline int ceph_databuf_encode_32(struct ceph_databuf *dbuf, u32 v)
> +{
> +	__le32 ev = cpu_to_le32(v);
> +	return ceph_databuf_append(dbuf, &ev, sizeof(ev));
> +}
> +static inline int ceph_databuf_encode_16(struct ceph_databuf *dbuf, u16 v)
> +{
> +	__le16 ev = cpu_to_le16(v);
> +	return ceph_databuf_append(dbuf, &ev, sizeof(ev));
> +}
> +static inline int ceph_databuf_encode_8(struct ceph_databuf *dbuf, u8 v)
> +{
> +	return ceph_databuf_append(dbuf, &v, 1);
> +}

Maybe, encode_8, encode_16, encode_32, encode_64? I mean reverse sequence here.

> +static inline int ceph_databuf_encode_string(struct ceph_databuf *dbuf,
> +					     const char *s, u32 len)
> +{
> +	int ret = ceph_databuf_encode_32(dbuf, len);
> +	if (ret)
> +		return ret;
> +	if (len)
> +		return ceph_databuf_append(dbuf, s, len);
> +	return 0;
> +}
> +
> +static inline size_t ceph_databuf_len(struct ceph_databuf *dbuf)
> +{
> +	return dbuf->iter.count;
> +}
> +
> +static inline void ceph_databuf_added_data(struct ceph_databuf *dbuf,
> +					   size_t len)
> +{
> +	dbuf->iter.count += len;
> +}
> +
> +static inline void ceph_databuf_reply_ready(struct ceph_databuf *reply,
> +					    size_t len)
> +{
> +	reply->iter.data_source = ITER_SOURCE;
> +	iov_iter_truncate(&reply->iter, len);
> +}
> +
> +static inline void ceph_databuf_reset_reply(struct ceph_databuf *reply)
> +{
> +	iov_iter_bvec(&reply->iter, ITER_DEST,
> +		      reply->bvec, reply->nr_bvec, reply->limit);
> +}
> +
> +static inline void ceph_databuf_append_page(struct ceph_databuf *dbuf,
> +					    struct page *page,
> +					    unsigned int offset,
> +					    unsigned int len)
> +{
> +	BUG_ON(dbuf->nr_bvec >= dbuf->max_bvec);
> +	bvec_set_page(&dbuf->bvec[dbuf->nr_bvec++], page, len, offset);
> +	dbuf->iter.count += len;
> +	dbuf->iter.nr_segs++;

Why do we assign len to dbuf->iter.count but only increment dbuf->iter.nr_segs?

> +}
> +
> +static inline void *ceph_databuf_enc_start(struct ceph_databuf *dbuf)
> +{
> +	return page_address(ceph_databuf_page(dbuf, 0)) + dbuf->iter.count;
> +}
> +
> +static inline void ceph_databuf_enc_stop(struct ceph_databuf *dbuf, void *p)
> +{
> +	dbuf->iter.count = p - page_address(ceph_databuf_page(dbuf, 0));
> +	BUG_ON(dbuf->iter.count > dbuf->limit);
> +}

The same about page...

> +
> +#endif /* __FS_CEPH_DATABUF_H */
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index db2aba32b8a0..864aad369c91 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -117,6 +117,7 @@ struct ceph_messenger {
>  
>  enum ceph_msg_data_type {
>  	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
> +	CEPH_MSG_DATA_DATABUF,	/* data source/destination is a data buffer */
>  	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
>  	CEPH_MSG_DATA_PAGELIST,	/* data source/destination is a pagelist */

So, the final replacement with databuf will happen in the future?

>  #ifdef CONFIG_BLOCK
> @@ -210,7 +211,10 @@ struct ceph_bvec_iter {
>  
>  struct ceph_msg_data {
>  	enum ceph_msg_data_type		type;
> +	struct iov_iter			iter;
> +	bool				release_dbuf;
>  	union {
> +		struct ceph_databuf	*dbuf;
>  #ifdef CONFIG_BLOCK
>  		struct {
>  			struct ceph_bio_iter	bio_pos;
> @@ -225,7 +229,6 @@ struct ceph_msg_data {
>  			bool		own_pages;
>  		};
>  		struct ceph_pagelist	*pagelist;
> -		struct iov_iter		iter;
>  	};
>  };
>  
> @@ -601,6 +604,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
>  extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
>  				       unsigned long interval);
>  
> +void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf);
>  void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
>  			     size_t length, size_t offset, bool own_pages);
>  extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 8fc84f389aad..b8fb5a71dd57 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -16,6 +16,7 @@
>  #include <linux/ceph/msgpool.h>
>  #include <linux/ceph/auth.h>
>  #include <linux/ceph/pagelist.h>
> +#include <linux/ceph/databuf.h>
>  
>  struct ceph_msg;
>  struct ceph_snap_context;
> @@ -103,6 +104,7 @@ struct ceph_osd {
>  
>  enum ceph_osd_data_type {
>  	CEPH_OSD_DATA_TYPE_NONE = 0,
> +	CEPH_OSD_DATA_TYPE_DATABUF,
>  	CEPH_OSD_DATA_TYPE_PAGES,
>  	CEPH_OSD_DATA_TYPE_PAGELIST,

The same question about the replacement with databuf here: is it future work?

>  #ifdef CONFIG_BLOCK
> @@ -115,6 +117,7 @@ enum ceph_osd_data_type {
>  struct ceph_osd_data {
>  	enum ceph_osd_data_type	type;
>  	union {
> +		struct ceph_databuf	*dbuf;
>  		struct {
>  			struct page	**pages;
>  			u64		length;
> diff --git a/net/ceph/Makefile b/net/ceph/Makefile
> index 8802a0c0155d..4b2e0b654e45 100644
> --- a/net/ceph/Makefile
> +++ b/net/ceph/Makefile
> @@ -15,4 +15,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
>  	auth_x.o \
>  	ceph_strings.o ceph_hash.o \
>  	pagevec.o snapshot.o string_table.o \
> -	messenger_v1.o messenger_v2.o
> +	messenger_v1.o messenger_v2.o \
> +	databuf.o
> diff --git a/net/ceph/databuf.c b/net/ceph/databuf.c
> new file mode 100644
> index 000000000000..9d108fff5a4f
> --- /dev/null
> +++ b/net/ceph/databuf.c
> @@ -0,0 +1,200 @@
> +// SPDX-License-Identifier: GPL-2.0
> +/* Data container
> + *
> + * Copyright (C) 2023 Red Hat, Inc. All Rights Reserved.
> + * Written by David Howells (dhowells@redhat.com)
> + */
> +
> +#include <linux/export.h>
> +#include <linux/gfp.h>
> +#include <linux/slab.h>
> +#include <linux/uio.h>
> +#include <linux/pagemap.h>
> +#include <linux/highmem.h>
> +#include <linux/ceph/databuf.h>
> +
> +struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
> +					unsigned int data_source, gfp_t gfp)
> +{
> +	struct ceph_databuf *dbuf;
> +	size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
> +
> +	dbuf = kzalloc(sizeof(*dbuf), gfp);
> +	if (!dbuf)
> +		return NULL;

I am guessing... Should we return error code here?

> +
> +	refcount_set(&dbuf->refcnt, 1);
> +
> +	if (min_bvec == 0 && space == 0) {
> +		/* Do nothing */
> +	} else if (min_bvec <= inl && space <= inl * PAGE_SIZE) {
> +		dbuf->bvec = dbuf->inline_bvec;
> +		dbuf->max_bvec = inl;
> +		dbuf->limit = space;
> +	} else if (min_bvec) {
> +		min_bvec = umax(min_bvec, 16);

Why 16 here? Maybe, do we need to introduce some well explained constant?

> +
> +		dbuf->bvec = kcalloc(min_bvec, sizeof(struct bio_vec), gfp);
> +		if (!dbuf->bvec) {
> +			kfree(dbuf);
> +			return NULL;

Ditto. Should we return error code here?

> +		}
> +
> +		dbuf->max_bvec = min_bvec;

Why do we assign min_bvec to max_bvec? I am simply slightly confused why
argument of function is named as min_bvec, but finally we are saving min_bvec
value into max_bvec.

> +	}
> +
> +	iov_iter_bvec(&dbuf->iter, data_source, dbuf->bvec, 0, 0);
> +
> +	if (space) {
> +		if (ceph_databuf_reserve(dbuf, space, gfp) < 0) {
> +			ceph_databuf_release(dbuf);
> +			return NULL;

Ditto. Should we return error code here?

> +		}
> +	}
> +	return dbuf;
> +}
> +EXPORT_SYMBOL(ceph_databuf_alloc);
> +
> +struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf)

I see the point here. But do we really need to return pointer? Why not simply:

void ceph_databuf_get(struct ceph_databuf *dbuf)

> +{
> +	if (!dbuf)
> +		return NULL;
> +	refcount_inc(&dbuf->refcnt);
> +	return dbuf;
> +}
> +EXPORT_SYMBOL(ceph_databuf_get);
> +
> +void ceph_databuf_release(struct ceph_databuf *dbuf)
> +{
> +	size_t i;
> +
> +	if (!dbuf || !refcount_dec_and_test(&dbuf->refcnt))
> +		return;
> +
> +	if (dbuf->put_pages)
> +		for (i = 0; i < dbuf->nr_bvec; i++)
> +			put_page(dbuf->bvec[i].bv_page);
> +	if (dbuf->bvec != dbuf->inline_bvec)
> +		kfree(dbuf->bvec);
> +	kfree(dbuf);
> +}
> +EXPORT_SYMBOL(ceph_databuf_release);
> +
> +/*
> + * Expand the bvec[] in the dbuf.
> + */
> +static int ceph_databuf_expand(struct ceph_databuf *dbuf, size_t req_bvec,
> +			       gfp_t gfp)
> +{
> +	struct bio_vec *bvec = dbuf->bvec, *old = bvec;

I think that the assignment (*old = bvec) looks confusing if we keep it on the
same line as the bvec declaration and initialization. Why not declare and
initialize it on the next line?

> +	size_t size, max_bvec, off = dbuf->iter.bvec - old;

I think it's too much declarations on the same line. Why not:

size_t size, max_bvec;
size_t off = dbuf->iter.bvec - old;

> +	size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
> +
> +	if (req_bvec <= inl) {
> +		dbuf->bvec = dbuf->inline_bvec;
> +		dbuf->max_bvec = inl;
> +		dbuf->iter.bvec = dbuf->inline_bvec + off;
> +		return 0;
> +	}
> +
> +	max_bvec = roundup_pow_of_two(req_bvec);
> +	size = array_size(max_bvec, sizeof(struct bio_vec));
> +
> +	if (old == dbuf->inline_bvec) {
> +		bvec = kmalloc_array(max_bvec, sizeof(struct bio_vec), gfp);
> +		if (!bvec)
> +			return -ENOMEM;
> +		memcpy(bvec, old, inl);
> +	} else {
> +		bvec = krealloc(old, size, gfp);
> +		if (!bvec)
> +			return -ENOMEM;
> +	}
> +	dbuf->bvec = bvec;
> +	dbuf->max_bvec = max_bvec;
> +	dbuf->iter.bvec = bvec + off;
> +	return 0;
> +}
> +
> +/* Allocate enough pages for a dbuf to append the given amount
> + * of dbuf without allocating.
> + * Returns: 0 on success, -ENOMEM on error.
> + */
> +int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t add_space,
> +			 gfp_t gfp)
> +{
> +	struct bio_vec *bvec;
> +	size_t i, req_bvec = DIV_ROUND_UP(dbuf->iter.count + add_space, PAGE_SIZE);

Why not:

size_t req_bvec = DIV_ROUND_UP(dbuf->iter.count + add_space, PAGE_SIZE);
size_t i;


> +	int ret;
> +
> +	dbuf->put_pages = true;
> +	if (req_bvec > dbuf->max_bvec) {
> +		ret = ceph_databuf_expand(dbuf, req_bvec, gfp);
> +		if (ret < 0)
> +			return ret;
> +	}
> +
> +	bvec = dbuf->bvec;
> +	while (dbuf->nr_bvec < req_bvec) {
> +		struct page *pages[16];

Why do we hardcode 16 here instead of using some well-defined constant?

And, again, why not folio?

> +		size_t want = min(req_bvec, ARRAY_SIZE(pages)), got;
> +
> +		memset(pages, 0, sizeof(pages));
> +		got = alloc_pages_bulk(gfp, want, pages);
> +		if (!got)
> +			return -ENOMEM;
> +		for (i = 0; i < got; i++)

Why do we use size_t for i and got? Why not int, for example?

> +			bvec_set_page(&bvec[dbuf->nr_bvec + i], pages[i],
> +				      PAGE_SIZE, 0);
> +		dbuf->iter.nr_segs += got;
> +		dbuf->nr_bvec += got;

If I understood correctly, the ceph_databuf_append_page() uses slightly
different logic.

+	dbuf->iter.count += len;
+	dbuf->iter.nr_segs++;

But here we assign number of allocated pages to nr_segs. It is slightly
confusing. I think I am missing something here.

> +		dbuf->limit = dbuf->nr_bvec * PAGE_SIZE;
> +	}
> +
> +	return 0;
> +}
> +EXPORT_SYMBOL(ceph_databuf_reserve);
> +
> +int ceph_databuf_append(struct ceph_databuf *dbuf, const void *buf, size_t len)
> +{
> +	struct iov_iter temp_iter;
> +
> +	if (!len)
> +		return 0;
> +	if (dbuf->limit - dbuf->iter.count > len &&
> +	    ceph_databuf_reserve(dbuf, len, GFP_NOIO) < 0)
> +		return -ENOMEM;
> +
> +	iov_iter_bvec(&temp_iter, ITER_DEST,
> +		      dbuf->bvec, dbuf->nr_bvec, dbuf->limit);
> +	iov_iter_advance(&temp_iter, dbuf->iter.count);
> +
> +	if (copy_to_iter(buf, len, &temp_iter) != len)
> +		return -EFAULT;
> +	dbuf->iter.count += len;
> +	return 0;
> +}
> +EXPORT_SYMBOL(ceph_databuf_append);
> +
> +/*
> + * Allocate a fragment and insert it into the buffer at the specified index.
> + */
> +int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
> +			     size_t len, gfp_t gfp)
> +{
> +	struct page *page;
> +

Why not folio?

> +	page = alloc_page(gfp);
> +	if (!page)
> +		return -ENOMEM;
> +
> +	bvec_set_page(&dbuf->bvec[ix], page, len, 0);
> +
> +	if (dbuf->nr_bvec == ix) {
> +		dbuf->iter.nr_segs = ix + 1;
> +		dbuf->nr_bvec = ix + 1;
> +		dbuf->iter.count += len;
> +	}
> +	return 0;
> +}
> +EXPORT_SYMBOL(ceph_databuf_insert_frag);
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 1df4291cc80b..802f0b222131 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -1872,7 +1872,9 @@ static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
>  
>  static void ceph_msg_data_destroy(struct ceph_msg_data *data)
>  {
> -	if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
> +	if (data->type == CEPH_MSG_DATA_DATABUF) {
> +		ceph_databuf_release(data->dbuf);
> +	} else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
>  		int num_pages = calc_pages_for(data->offset, data->length);
>  		ceph_release_page_vector(data->pages, num_pages);
>  	} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
> @@ -1880,6 +1882,22 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
>  	}
>  }
>  
> +void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
> +{
> +	struct ceph_msg_data *data;
> +
> +	BUG_ON(!dbuf);
> +	BUG_ON(!ceph_databuf_len(dbuf));
> +
> +	data = ceph_msg_data_add(msg);
> +	data->type = CEPH_MSG_DATA_DATABUF;
> +	data->dbuf = ceph_databuf_get(dbuf);
> +	data->iter = dbuf->iter;
> +
> +	msg->data_length += ceph_databuf_len(dbuf);
> +}
> +EXPORT_SYMBOL(ceph_msg_data_add_databuf);
> +
>  void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
>  			     size_t length, size_t offset, bool own_pages)
>  {
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index e359e70ad47e..c84634264377 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -359,6 +359,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
>  	switch (osd_data->type) {
>  	case CEPH_OSD_DATA_TYPE_NONE:
>  		return 0;
> +	case CEPH_OSD_DATA_TYPE_DATABUF:
> +		return ceph_databuf_len(osd_data->dbuf);
>  	case CEPH_OSD_DATA_TYPE_PAGES:
>  		return osd_data->length;
>  	case CEPH_OSD_DATA_TYPE_PAGELIST:
> @@ -379,7 +381,9 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
>  
>  static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
>  {
> -	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
> +	if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
> +		ceph_databuf_release(osd_data->dbuf);
> +	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
>  		int num_pages;
>  
>  		num_pages = calc_pages_for((u64)osd_data->offset,
> @@ -965,7 +969,10 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
>  {
>  	u64 length = ceph_osd_data_length(osd_data);
>  
> -	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
> +	if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
> +		BUG_ON(!length);
> +		ceph_msg_data_add_databuf(msg, osd_data->dbuf);
> +	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
>  		BUG_ON(length > (u64) SIZE_MAX);
>  		if (length)
>  			ceph_msg_data_add_pages(msg, osd_data->pages,
> 
> 

Thanks,
Slava.
diff mbox series

Patch

diff --git a/include/linux/ceph/databuf.h b/include/linux/ceph/databuf.h
new file mode 100644
index 000000000000..14c7a6449467
--- /dev/null
+++ b/include/linux/ceph/databuf.h
@@ -0,0 +1,131 @@ 
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __FS_CEPH_DATABUF_H
+#define __FS_CEPH_DATABUF_H
+
+#include <asm/byteorder.h>
+#include <linux/refcount.h>
+#include <linux/blk_types.h>
+
+/*
+ * Data container: an array of bio_vecs plus an iov_iter describing the
+ * data held in them.  bvec points either at inline_bvec or at a separately
+ * allocated array.
+ */
+struct ceph_databuf {
+	struct bio_vec	*bvec;		/* List of pages */
+	struct bio_vec	inline_bvec[1];	/* Inline bvecs for small buffers */
+	struct iov_iter	iter;		/* Iterator defining occupied data */
+	size_t		limit;		/* Maximum length before expansion required */
+	size_t		nr_bvec;	/* Number of bvec[] that have pages */
+	size_t		max_bvec;	/* Size of bvec[] */
+	refcount_t	refcnt;		/* Reference count on this container */
+	bool		put_pages;	/* T if pages in bvec[] need to be put*/
+};
+
+/* Allocation and lifetime. */
+struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
+					unsigned int data_source, gfp_t gfp);
+struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf);
+void ceph_databuf_release(struct ceph_databuf *dbuf);
+
+/* Copy @len bytes from @buf onto the end of the data held in @dbuf. */
+int ceph_databuf_append(struct ceph_databuf *dbuf, const void *buf, size_t len);
+/* Make sure pages exist for @add_space more bytes beyond the current data. */
+int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t add_space,
+			 gfp_t gfp);
+/* Allocate a page and install it, with length @len, in bvec[@ix]. */
+int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
+			     size_t len, gfp_t gfp);
+
+/* Allocate a databuf whose iterator is initialised as ITER_SOURCE. */
+static inline
+struct ceph_databuf *ceph_databuf_req_alloc(size_t min_bvec, size_t space, gfp_t gfp)
+{
+	struct ceph_databuf *req;
+
+	req = ceph_databuf_alloc(min_bvec, space, ITER_SOURCE, gfp);
+	return req;
+}
+
+/*
+ * Allocate a databuf whose iterator is initialised as ITER_DEST and then
+ * re-expand the iterator to @space bytes.  Returns NULL if the allocation
+ * fails.
+ */
+static inline
+struct ceph_databuf *ceph_databuf_reply_alloc(size_t min_bvec, size_t space, gfp_t gfp)
+{
+	struct ceph_databuf *reply;
+
+	reply = ceph_databuf_alloc(min_bvec, space, ITER_DEST, gfp);
+	if (!reply)
+		return NULL;
+
+	iov_iter_reexpand(&reply->iter, space);
+	return reply;
+}
+
+/* Return the page held in bvec[@ix]; @ix is not range-checked here. */
+static inline struct page *ceph_databuf_page(struct ceph_databuf *dbuf,
+					     unsigned int ix)
+{
+	const struct bio_vec *slot = &dbuf->bvec[ix];
+
+	return slot->bv_page;
+}
+
+/*
+ * Map page @ix of @dbuf with kmap_local_page().  This expands to a plain
+ * expression (no trailing semicolon) so that it can be used as an
+ * initialiser, an argument or inside a larger expression.
+ */
+#define kmap_ceph_databuf_page(dbuf, ix) \
+	kmap_local_page(ceph_databuf_page(dbuf, ix))
+
+/* Append @v to @dbuf as a little-endian 64-bit value. */
+static inline int ceph_databuf_encode_64(struct ceph_databuf *dbuf, u64 v)
+{
+	const __le64 wire = cpu_to_le64(v);
+
+	return ceph_databuf_append(dbuf, &wire, sizeof(wire));
+}
+/* Append @v to @dbuf as a little-endian 32-bit value. */
+static inline int ceph_databuf_encode_32(struct ceph_databuf *dbuf, u32 v)
+{
+	const __le32 wire = cpu_to_le32(v);
+
+	return ceph_databuf_append(dbuf, &wire, sizeof(wire));
+}
+/* Append @v to @dbuf as a little-endian 16-bit value. */
+static inline int ceph_databuf_encode_16(struct ceph_databuf *dbuf, u16 v)
+{
+	const __le16 wire = cpu_to_le16(v);
+
+	return ceph_databuf_append(dbuf, &wire, sizeof(wire));
+}
+/* Append the single byte @v to @dbuf. */
+static inline int ceph_databuf_encode_8(struct ceph_databuf *dbuf, u8 v)
+{
+	const u8 byte = v;
+
+	return ceph_databuf_append(dbuf, &byte, sizeof(byte));
+}
+/*
+ * Append a 32-bit little-endian length followed by @len bytes taken from @s.
+ * Nothing further is appended when @len is zero.  Returns 0 or the error
+ * from the failing append.
+ */
+static inline int ceph_databuf_encode_string(struct ceph_databuf *dbuf,
+					     const char *s, u32 len)
+{
+	int err;
+
+	err = ceph_databuf_encode_32(dbuf, len);
+	if (err)
+		return err;
+	if (!len)
+		return 0;
+	return ceph_databuf_append(dbuf, s, len);
+}
+
+/* Return the byte count currently recorded in the databuf's iterator. */
+static inline size_t ceph_databuf_len(struct ceph_databuf *dbuf)
+{
+	size_t nbytes = dbuf->iter.count;
+
+	return nbytes;
+}
+
+/* Grow the iterator's byte count by @len; no data is copied here. */
+static inline void ceph_databuf_added_data(struct ceph_databuf *dbuf,
+					   size_t len)
+{
+	struct iov_iter *iter = &dbuf->iter;
+
+	iter->count += len;
+}
+
+/* Switch a reply's iterator to ITER_SOURCE and truncate it to @len bytes. */
+static inline void ceph_databuf_reply_ready(struct ceph_databuf *reply,
+					    size_t len)
+{
+	struct iov_iter *iter = &reply->iter;
+
+	iter->data_source = ITER_SOURCE;
+	iov_iter_truncate(iter, len);
+}
+
+/*
+ * Reinitialise a reply's iterator as ITER_DEST over every populated bvec,
+ * with the byte count set to the buffer's limit.
+ */
+static inline void ceph_databuf_reset_reply(struct ceph_databuf *reply)
+{
+	struct iov_iter *iter = &reply->iter;
+	size_t nr_segs = reply->nr_bvec;
+	size_t count = reply->limit;
+
+	iov_iter_bvec(iter, ITER_DEST, reply->bvec, nr_segs, count);
+}
+
+/*
+ * Place @page (@len bytes starting at @offset) in the next free bvec[] slot
+ * and extend the iterator by one segment and @len bytes.  The array must
+ * have a free slot; this BUGs otherwise.
+ */
+static inline void ceph_databuf_append_page(struct ceph_databuf *dbuf,
+					    struct page *page,
+					    unsigned int offset,
+					    unsigned int len)
+{
+	struct bio_vec *slot;
+
+	BUG_ON(dbuf->nr_bvec >= dbuf->max_bvec);
+
+	slot = &dbuf->bvec[dbuf->nr_bvec];
+	bvec_set_page(slot, page, len, offset);
+	dbuf->nr_bvec++;
+
+	dbuf->iter.nr_segs++;
+	dbuf->iter.count += len;
+}
+
+/*
+ * Return the address of page 0 plus the iterator's current byte count.
+ * NOTE(review): presumably only meaningful while all the data lives in
+ * page 0 and that page has a kernel address - confirm against callers.
+ */
+static inline void *ceph_databuf_enc_start(struct ceph_databuf *dbuf)
+{
+	void *base = page_address(ceph_databuf_page(dbuf, 0));
+
+	return base + dbuf->iter.count;
+}
+
+/*
+ * Set the iterator's byte count to the distance from the start of page 0
+ * to @p, and BUG if that exceeds the buffer's limit.
+ */
+static inline void ceph_databuf_enc_stop(struct ceph_databuf *dbuf, void *p)
+{
+	void *base = page_address(ceph_databuf_page(dbuf, 0));
+
+	dbuf->iter.count = p - base;
+	BUG_ON(dbuf->iter.count > dbuf->limit);
+}
+
+#endif /* __FS_CEPH_DATABUF_H */
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index db2aba32b8a0..864aad369c91 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -117,6 +117,7 @@  struct ceph_messenger {
 
 enum ceph_msg_data_type {
 	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
+	CEPH_MSG_DATA_DATABUF,	/* data source/destination is a data buffer */
 	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
 	CEPH_MSG_DATA_PAGELIST,	/* data source/destination is a pagelist */
 #ifdef CONFIG_BLOCK
@@ -210,7 +211,10 @@  struct ceph_bvec_iter {
 
 struct ceph_msg_data {
 	enum ceph_msg_data_type		type;
+	struct iov_iter			iter;
+	bool				release_dbuf;
 	union {
+		struct ceph_databuf	*dbuf;
 #ifdef CONFIG_BLOCK
 		struct {
 			struct ceph_bio_iter	bio_pos;
@@ -225,7 +229,6 @@  struct ceph_msg_data {
 			bool		own_pages;
 		};
 		struct ceph_pagelist	*pagelist;
-		struct iov_iter		iter;
 	};
 };
 
@@ -601,6 +604,7 @@  extern void ceph_con_keepalive(struct ceph_connection *con);
 extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
 				       unsigned long interval);
 
+void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf);
 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 			     size_t length, size_t offset, bool own_pages);
 extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 8fc84f389aad..b8fb5a71dd57 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -16,6 +16,7 @@ 
 #include <linux/ceph/msgpool.h>
 #include <linux/ceph/auth.h>
 #include <linux/ceph/pagelist.h>
+#include <linux/ceph/databuf.h>
 
 struct ceph_msg;
 struct ceph_snap_context;
@@ -103,6 +104,7 @@  struct ceph_osd {
 
 enum ceph_osd_data_type {
 	CEPH_OSD_DATA_TYPE_NONE = 0,
+	CEPH_OSD_DATA_TYPE_DATABUF,
 	CEPH_OSD_DATA_TYPE_PAGES,
 	CEPH_OSD_DATA_TYPE_PAGELIST,
 #ifdef CONFIG_BLOCK
@@ -115,6 +117,7 @@  enum ceph_osd_data_type {
 struct ceph_osd_data {
 	enum ceph_osd_data_type	type;
 	union {
+		struct ceph_databuf	*dbuf;
 		struct {
 			struct page	**pages;
 			u64		length;
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 8802a0c0155d..4b2e0b654e45 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -15,4 +15,5 @@  libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	auth_x.o \
 	ceph_strings.o ceph_hash.o \
 	pagevec.o snapshot.o string_table.o \
-	messenger_v1.o messenger_v2.o
+	messenger_v1.o messenger_v2.o \
+	databuf.o
diff --git a/net/ceph/databuf.c b/net/ceph/databuf.c
new file mode 100644
index 000000000000..9d108fff5a4f
--- /dev/null
+++ b/net/ceph/databuf.c
@@ -0,0 +1,200 @@ 
+// SPDX-License-Identifier: GPL-2.0
+/* Data container
+ *
+ * Copyright (C) 2023 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ */
+
+#include <linux/export.h>
+#include <linux/gfp.h>
+#include <linux/slab.h>
+#include <linux/uio.h>
+#include <linux/pagemap.h>
+#include <linux/highmem.h>
+#include <linux/ceph/databuf.h>
+
+/*
+ * Allocate a data buffer with its reference count set to one.
+ *
+ * @min_bvec:    minimum number of bvec[] slots wanted
+ * @space:       number of bytes to reserve pages for up front (may be 0)
+ * @data_source: direction handed to iov_iter_bvec() for the iterator
+ * @gfp:         allocation flags
+ *
+ * The inline bvec is used when both requests fit in it; otherwise, if
+ * @min_bvec is non-zero, an array of at least 16 entries is allocated.
+ *
+ * Returns the new buffer, or NULL if any allocation fails.
+ */
+struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
+					unsigned int data_source, gfp_t gfp)
+{
+	struct ceph_databuf *dbuf;
+	const size_t nr_inline = ARRAY_SIZE(dbuf->inline_bvec);
+
+	dbuf = kzalloc(sizeof(*dbuf), gfp);
+	if (!dbuf)
+		return NULL;
+
+	refcount_set(&dbuf->refcnt, 1);
+
+	if (min_bvec || space) {
+		if (min_bvec <= nr_inline && space <= nr_inline * PAGE_SIZE) {
+			dbuf->bvec = dbuf->inline_bvec;
+			dbuf->max_bvec = nr_inline;
+			dbuf->limit = space;
+		} else if (min_bvec) {
+			size_t nr_slots = umax(min_bvec, 16);
+
+			dbuf->bvec = kcalloc(nr_slots, sizeof(struct bio_vec),
+					     gfp);
+			if (!dbuf->bvec) {
+				kfree(dbuf);
+				return NULL;
+			}
+
+			dbuf->max_bvec = nr_slots;
+		}
+	}
+
+	iov_iter_bvec(&dbuf->iter, data_source, dbuf->bvec, 0, 0);
+
+	if (space && ceph_databuf_reserve(dbuf, space, gfp) < 0) {
+		ceph_databuf_release(dbuf);
+		return NULL;
+	}
+	return dbuf;
+}
+EXPORT_SYMBOL(ceph_databuf_alloc);
+
+/*
+ * Take an extra reference on @dbuf and return it.  A NULL @dbuf is passed
+ * straight through.
+ */
+struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf)
+{
+	if (dbuf)
+		refcount_inc(&dbuf->refcnt);
+	return dbuf;
+}
+EXPORT_SYMBOL(ceph_databuf_get);
+
+/*
+ * Drop a reference on @dbuf (NULL is ignored).  When the last reference
+ * goes, put the pages if put_pages is set, free bvec[] unless it is the
+ * inline array, and free the container.
+ */
+void ceph_databuf_release(struct ceph_databuf *dbuf)
+{
+	size_t slot;
+
+	if (!dbuf)
+		return;
+	if (!refcount_dec_and_test(&dbuf->refcnt))
+		return;
+
+	if (dbuf->put_pages) {
+		for (slot = 0; slot < dbuf->nr_bvec; slot++)
+			put_page(dbuf->bvec[slot].bv_page);
+	}
+
+	if (dbuf->bvec != dbuf->inline_bvec)
+		kfree(dbuf->bvec);
+	kfree(dbuf);
+}
+EXPORT_SYMBOL(ceph_databuf_release);
+
+/*
+ * Expand the bvec[] in the dbuf so that it can hold at least @req_bvec
+ * entries.  The iterator's bvec pointer is re-based onto the new array at
+ * the same element offset it had in the old one.
+ *
+ * Returns 0 on success or -ENOMEM if the new array cannot be allocated, in
+ * which case the dbuf is left as it was.
+ */
+static int ceph_databuf_expand(struct ceph_databuf *dbuf, size_t req_bvec,
+			       gfp_t gfp)
+{
+	struct bio_vec *old = dbuf->bvec;
+	struct bio_vec *bvec;
+	size_t size, max_bvec;
+	size_t off = dbuf->iter.bvec - old;
+	size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
+
+	/* Small enough for the inline array: just point at it. */
+	if (req_bvec <= inl) {
+		dbuf->bvec = dbuf->inline_bvec;
+		dbuf->max_bvec = inl;
+		dbuf->iter.bvec = dbuf->inline_bvec + off;
+		return 0;
+	}
+
+	max_bvec = roundup_pow_of_two(req_bvec);
+	size = array_size(max_bvec, sizeof(struct bio_vec));
+
+	if (old == dbuf->inline_bvec) {
+		/* The inline array can't be reallocated; copy it out. */
+		bvec = kmalloc_array(max_bvec, sizeof(struct bio_vec), gfp);
+		if (!bvec)
+			return -ENOMEM;
+		/* memcpy() takes a byte count, so copy whole entries. */
+		memcpy(bvec, old, inl * sizeof(*old));
+	} else {
+		bvec = krealloc(old, size, gfp);
+		if (!bvec)
+			return -ENOMEM;
+	}
+	dbuf->bvec = bvec;
+	dbuf->max_bvec = max_bvec;
+	dbuf->iter.bvec = bvec + off;
+	return 0;
+}
+
+/*
+ * Allocate enough pages for a dbuf to append the given amount of data
+ * without allocating.
+ *
+ * Marks the dbuf's pages as needing to be put on release, expands bvec[]
+ * if it is too small and then bulk-allocates pages until the data already
+ * present plus @add_space bytes are covered.  Pages obtained before a
+ * failure stay attached to the dbuf.
+ *
+ * Returns: 0 on success, -ENOMEM on error.
+ */
+int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t add_space,
+			 gfp_t gfp)
+{
+	struct bio_vec *bvec;
+	size_t req_bvec = DIV_ROUND_UP(dbuf->iter.count + add_space, PAGE_SIZE);
+	size_t i;
+	int ret;
+
+	dbuf->put_pages = true;
+	if (req_bvec > dbuf->max_bvec) {
+		ret = ceph_databuf_expand(dbuf, req_bvec, gfp);
+		if (ret < 0)
+			return ret;
+	}
+
+	bvec = dbuf->bvec;
+	while (dbuf->nr_bvec < req_bvec) {
+		struct page *pages[16];
+		/*
+		 * Only ask for the pages that are still missing.  Asking
+		 * for req_bvec each time could hand back more pages than
+		 * there are free slots left in bvec[].
+		 */
+		size_t want = min(req_bvec - dbuf->nr_bvec, ARRAY_SIZE(pages));
+		size_t got;
+
+		memset(pages, 0, sizeof(pages));
+		got = alloc_pages_bulk(gfp, want, pages);
+		if (!got)
+			return -ENOMEM;
+		for (i = 0; i < got; i++)
+			bvec_set_page(&bvec[dbuf->nr_bvec + i], pages[i],
+				      PAGE_SIZE, 0);
+		dbuf->iter.nr_segs += got;
+		dbuf->nr_bvec += got;
+		dbuf->limit = dbuf->nr_bvec * PAGE_SIZE;
+	}
+
+	return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_reserve);
+
+/*
+ * Copy @len bytes from @buf onto the end of the data held in @dbuf,
+ * reserving more pages first (with GFP_NOIO) if the data would not fit
+ * within the current limit.
+ *
+ * Returns 0 on success (including @len == 0), -ENOMEM if more space could
+ * not be reserved, or -EFAULT if the copy came up short.
+ */
+int ceph_databuf_append(struct ceph_databuf *dbuf, const void *buf, size_t len)
+{
+	struct iov_iter temp_iter;
+
+	if (!len)
+		return 0;
+	/* Grow the buffer only when the new data would exceed the limit. */
+	if (dbuf->iter.count + len > dbuf->limit &&
+	    ceph_databuf_reserve(dbuf, len, GFP_NOIO) < 0)
+		return -ENOMEM;
+
+	/*
+	 * Build a temporary destination iterator over the whole buffer and
+	 * skip the data already present so the copy lands after it.
+	 */
+	iov_iter_bvec(&temp_iter, ITER_DEST,
+		      dbuf->bvec, dbuf->nr_bvec, dbuf->limit);
+	iov_iter_advance(&temp_iter, dbuf->iter.count);
+
+	if (copy_to_iter(buf, len, &temp_iter) != len)
+		return -EFAULT;
+	dbuf->iter.count += len;
+	return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_append);
+
+/*
+ * Allocate a fragment and insert it into the buffer at the specified index.
+ *
+ * A single page is allocated and installed in bvec[@ix] with length @len at
+ * offset 0.  The segment counts and the iterator's byte count are only
+ * updated when @ix is the first unused slot (@ix == nr_bvec).
+ *
+ * NOTE(review): @ix is not checked against max_bvec, and any page already
+ * present in the slot is overwritten without being put - confirm callers
+ * guarantee a valid, empty slot.
+ * NOTE(review): put_pages is not set here, so whether the new page is put on
+ * release appears to depend on it having been set elsewhere - confirm.
+ *
+ * Returns 0 on success or -ENOMEM if no page could be allocated.
+ */
+int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
+			     size_t len, gfp_t gfp)
+{
+	struct page *page;
+
+	page = alloc_page(gfp);
+	if (!page)
+		return -ENOMEM;
+
+	bvec_set_page(&dbuf->bvec[ix], page, len, 0);
+
+	/* Appending at the first unused slot extends the buffer. */
+	if (dbuf->nr_bvec == ix) {
+		dbuf->iter.nr_segs = ix + 1;
+		dbuf->nr_bvec = ix + 1;
+		dbuf->iter.count += len;
+	}
+	return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_insert_frag);
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 1df4291cc80b..802f0b222131 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1872,7 +1872,9 @@  static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
 
 static void ceph_msg_data_destroy(struct ceph_msg_data *data)
 {
-	if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
+	if (data->type == CEPH_MSG_DATA_DATABUF) {
+		ceph_databuf_release(data->dbuf);
+	} else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
 		int num_pages = calc_pages_for(data->offset, data->length);
 		ceph_release_page_vector(data->pages, num_pages);
 	} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
@@ -1880,6 +1882,22 @@  static void ceph_msg_data_destroy(struct ceph_msg_data *data)
 	}
 }
 
+/*
+ * Attach @dbuf to @msg as a CEPH_MSG_DATA_DATABUF data item.  The item
+ * takes its own reference on @dbuf and a copy of its iterator, and the
+ * message's data length grows by the databuf's length.  @dbuf must be
+ * non-NULL and non-empty; this BUGs otherwise.
+ */
+void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
+{
+	struct ceph_msg_data *item;
+
+	BUG_ON(!dbuf);
+	BUG_ON(!ceph_databuf_len(dbuf));
+
+	item = ceph_msg_data_add(msg);
+	item->type = CEPH_MSG_DATA_DATABUF;
+	item->dbuf = ceph_databuf_get(dbuf);
+	item->iter = dbuf->iter;
+
+	msg->data_length += ceph_databuf_len(dbuf);
+}
+EXPORT_SYMBOL(ceph_msg_data_add_databuf);
+
 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 			     size_t length, size_t offset, bool own_pages)
 {
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index e359e70ad47e..c84634264377 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -359,6 +359,8 @@  static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 	switch (osd_data->type) {
 	case CEPH_OSD_DATA_TYPE_NONE:
 		return 0;
+	case CEPH_OSD_DATA_TYPE_DATABUF:
+		return ceph_databuf_len(osd_data->dbuf);
 	case CEPH_OSD_DATA_TYPE_PAGES:
 		return osd_data->length;
 	case CEPH_OSD_DATA_TYPE_PAGELIST:
@@ -379,7 +381,9 @@  static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 
 static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
 {
-	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
+	if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
+		ceph_databuf_release(osd_data->dbuf);
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
 		int num_pages;
 
 		num_pages = calc_pages_for((u64)osd_data->offset,
@@ -965,7 +969,10 @@  static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 {
 	u64 length = ceph_osd_data_length(osd_data);
 
-	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+	if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
+		BUG_ON(!length);
+		ceph_msg_data_add_databuf(msg, osd_data->dbuf);
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
 		BUG_ON(length > (u64) SIZE_MAX);
 		if (length)
 			ceph_msg_data_add_pages(msg, osd_data->pages,