Message ID | 20250313233341.1675324-4-dhowells@redhat.com (mailing list archive)
---|---
State | New
Series | ceph, rbd, netfs: Make ceph fully use netfslib
On Thu, 2025-03-13 at 23:32 +0000, David Howells wrote:

> Add a new ceph data container type, ceph_databuf, that can carry a list of
> pages in a bvec and use an iov_iter to describe the data to the next
> layer down. The iterator can also be used to refer to other types, such as
> ITER_FOLIOQ.
>
> There are two ways of loading the bvec. One way is to allocate a buffer
> with space in it and then add data, expanding the space as needed; the
> other is to splice in pages, expanding the bvec[] as needed.
>
> This is intended to replace all other types.
>

We definitely need to think about unit-tests or self-tests here.

> Signed-off-by: David Howells <dhowells@redhat.com>
> cc: Viacheslav Dubeyko <slava@dubeyko.com>
> cc: Alex Markuze <amarkuze@redhat.com>
> cc: Ilya Dryomov <idryomov@gmail.com>
> cc: ceph-devel@vger.kernel.org
> cc: linux-fsdevel@vger.kernel.org
> ---
>  include/linux/ceph/databuf.h    | 131 +++++++++++++++++++++
>  include/linux/ceph/messenger.h  |   6 +-
>  include/linux/ceph/osd_client.h |   3 +
>  net/ceph/Makefile               |   3 +-
>  net/ceph/databuf.c              | 200 ++++++++++++++++++++++++++++++++
>  net/ceph/messenger.c            |  20 +++-
>  net/ceph/osd_client.c           |  11 +-
>  7 files changed, 369 insertions(+), 5 deletions(-)
>  create mode 100644 include/linux/ceph/databuf.h
>  create mode 100644 net/ceph/databuf.c
>
> diff --git a/include/linux/ceph/databuf.h b/include/linux/ceph/databuf.h
> new file mode 100644
> index 000000000000..14c7a6449467
> --- /dev/null
> +++ b/include/linux/ceph/databuf.h
> @@ -0,0 +1,131 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef __FS_CEPH_DATABUF_H
> +#define __FS_CEPH_DATABUF_H
> +
> +#include <asm/byteorder.h>
> +#include <linux/refcount.h>
> +#include <linux/blk_types.h>
> +
> +struct ceph_databuf {
> +        struct bio_vec *bvec;           /* List of pages */

So, maybe we need to think about folios now?

> +        struct bio_vec inline_bvec[1];  /* Inline bvecs for small buffers */
> +        struct iov_iter iter;           /* Iterator defining occupied data */
> +        size_t limit;                   /* Maximum length before expansion required */
> +        size_t nr_bvec;                 /* Number of bvec[] that have pages */

Folios? :)

> +        size_t max_bvec;                /* Size of bvec[] */
> +        refcount_t refcnt;
> +        bool put_pages;                 /* T if pages in bvec[] need to be put */

Maybe folios? :)

> +};
> +
> +struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
> +                                        unsigned int data_source, gfp_t gfp);
> +struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf);
> +void ceph_databuf_release(struct ceph_databuf *dbuf);
> +int ceph_databuf_append(struct ceph_databuf *dbuf, const void *d, size_t l);

I think that declaration is important and the argument names need to be clear
enough. A short name is good, but it can be confusing. Why not len instead of
l? And I am still guessing what d means. :)

> +int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t space, gfp_t gfp);
> +int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
> +                             size_t len, gfp_t gfp);
> +
> +static inline
> +struct ceph_databuf *ceph_databuf_req_alloc(size_t min_bvec, size_t space, gfp_t gfp)
> +{
> +        return ceph_databuf_alloc(min_bvec, space, ITER_SOURCE, gfp);
> +}
> +
> +static inline
> +struct ceph_databuf *ceph_databuf_reply_alloc(size_t min_bvec, size_t space, gfp_t gfp)
> +{
> +        struct ceph_databuf *dbuf;
> +
> +        dbuf = ceph_databuf_alloc(min_bvec, space, ITER_DEST, gfp);
> +        if (dbuf)
> +                iov_iter_reexpand(&dbuf->iter, space);
> +        return dbuf;
> +}
> +
> +static inline struct page *ceph_databuf_page(struct ceph_databuf *dbuf,
> +                                             unsigned int ix)
> +{
> +        return dbuf->bvec[ix].bv_page;
> +}
> +
> +#define kmap_ceph_databuf_page(dbuf, ix) \
> +        kmap_local_page(ceph_databuf_page(dbuf, ix));
> +

I am still thinking that we need to base the new code on folio.

> +static inline int ceph_databuf_encode_64(struct ceph_databuf *dbuf, u64 v)
> +{
> +        __le64 ev = cpu_to_le64(v);
> +        return ceph_databuf_append(dbuf, &ev, sizeof(ev));
> +}
> +static inline int ceph_databuf_encode_32(struct ceph_databuf *dbuf, u32 v)
> +{
> +        __le32 ev = cpu_to_le32(v);
> +        return ceph_databuf_append(dbuf, &ev, sizeof(ev));
> +}
> +static inline int ceph_databuf_encode_16(struct ceph_databuf *dbuf, u16 v)
> +{
> +        __le16 ev = cpu_to_le16(v);
> +        return ceph_databuf_append(dbuf, &ev, sizeof(ev));
> +}
> +static inline int ceph_databuf_encode_8(struct ceph_databuf *dbuf, u8 v)
> +{
> +        return ceph_databuf_append(dbuf, &v, 1);
> +}

Maybe encode_8, encode_16, encode_32, encode_64? I mean the reverse sequence
here.

> +static inline int ceph_databuf_encode_string(struct ceph_databuf *dbuf,
> +                                             const char *s, u32 len)
> +{
> +        int ret = ceph_databuf_encode_32(dbuf, len);
> +        if (ret)
> +                return ret;
> +        if (len)
> +                return ceph_databuf_append(dbuf, s, len);
> +        return 0;
> +}
> +
> +static inline size_t ceph_databuf_len(struct ceph_databuf *dbuf)
> +{
> +        return dbuf->iter.count;
> +}
> +
> +static inline void ceph_databuf_added_data(struct ceph_databuf *dbuf,
> +                                           size_t len)
> +{
> +        dbuf->iter.count += len;
> +}
> +
> +static inline void ceph_databuf_reply_ready(struct ceph_databuf *reply,
> +                                            size_t len)
> +{
> +        reply->iter.data_source = ITER_SOURCE;
> +        iov_iter_truncate(&reply->iter, len);
> +}
> +
> +static inline void ceph_databuf_reset_reply(struct ceph_databuf *reply)
> +{
> +        iov_iter_bvec(&reply->iter, ITER_DEST,
> +                      reply->bvec, reply->nr_bvec, reply->limit);
> +}
> +
> +static inline void ceph_databuf_append_page(struct ceph_databuf *dbuf,
> +                                            struct page *page,
> +                                            unsigned int offset,
> +                                            unsigned int len)
> +{
> +        BUG_ON(dbuf->nr_bvec >= dbuf->max_bvec);
> +        bvec_set_page(&dbuf->bvec[dbuf->nr_bvec++], page, len, offset);
> +        dbuf->iter.count += len;
> +        dbuf->iter.nr_segs++;

Why do we assign len to dbuf->iter.count but only increment
dbuf->iter.nr_segs?

> +}
> +
> +static inline void *ceph_databuf_enc_start(struct ceph_databuf *dbuf)
> +{
> +        return page_address(ceph_databuf_page(dbuf, 0)) + dbuf->iter.count;
> +}
> +
> +static inline void ceph_databuf_enc_stop(struct ceph_databuf *dbuf, void *p)
> +{
> +        dbuf->iter.count = p - page_address(ceph_databuf_page(dbuf, 0));
> +        BUG_ON(dbuf->iter.count > dbuf->limit);
> +}

The same comment about page here...

> +
> +#endif /* __FS_CEPH_DATABUF_H */
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index db2aba32b8a0..864aad369c91 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -117,6 +117,7 @@ struct ceph_messenger {
>
>  enum ceph_msg_data_type {
>          CEPH_MSG_DATA_NONE,     /* message contains no data payload */
> +        CEPH_MSG_DATA_DATABUF,  /* data source/destination is a data buffer */
>          CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
>          CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */

So, the final replacement on databuf will be in the future?

>  #ifdef CONFIG_BLOCK
> @@ -210,7 +211,10 @@ struct ceph_bvec_iter {
>
>  struct ceph_msg_data {
>          enum ceph_msg_data_type type;
> +        struct iov_iter iter;
> +        bool release_dbuf;
>          union {
> +                struct ceph_databuf *dbuf;
>  #ifdef CONFIG_BLOCK
>                  struct {
>                          struct ceph_bio_iter bio_pos;
> @@ -225,7 +229,6 @@ struct ceph_msg_data {
>                          bool own_pages;
>                  };
>                  struct ceph_pagelist *pagelist;
> -                struct iov_iter iter;
>          };
>  };
>
> @@ -601,6 +604,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
>  extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
>                                         unsigned long interval);
>
> +void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf);
>  void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
>                               size_t length, size_t offset, bool own_pages);
>  extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 8fc84f389aad..b8fb5a71dd57 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -16,6 +16,7 @@
>  #include <linux/ceph/msgpool.h>
>  #include <linux/ceph/auth.h>
>  #include <linux/ceph/pagelist.h>
> +#include <linux/ceph/databuf.h>
>
>  struct ceph_msg;
>  struct ceph_snap_context;
> @@ -103,6 +104,7 @@ struct ceph_osd {
>
>  enum ceph_osd_data_type {
>          CEPH_OSD_DATA_TYPE_NONE = 0,
> +        CEPH_OSD_DATA_TYPE_DATABUF,
>          CEPH_OSD_DATA_TYPE_PAGES,
>          CEPH_OSD_DATA_TYPE_PAGELIST,

The same question about replacement on databuf here? Is it future work?

>  #ifdef CONFIG_BLOCK
> @@ -115,6 +117,7 @@ enum ceph_osd_data_type {
>  struct ceph_osd_data {
>          enum ceph_osd_data_type type;
>          union {
> +                struct ceph_databuf *dbuf;
>                  struct {
>                          struct page **pages;
>                          u64 length;
> diff --git a/net/ceph/Makefile b/net/ceph/Makefile
> index 8802a0c0155d..4b2e0b654e45 100644
> --- a/net/ceph/Makefile
> +++ b/net/ceph/Makefile
> @@ -15,4 +15,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
>          auth_x.o \
>          ceph_strings.o ceph_hash.o \
>          pagevec.o snapshot.o string_table.o \
> -        messenger_v1.o messenger_v2.o
> +        messenger_v1.o messenger_v2.o \
> +        databuf.o
> diff --git a/net/ceph/databuf.c b/net/ceph/databuf.c
> new file mode 100644
> index 000000000000..9d108fff5a4f
> --- /dev/null
> +++ b/net/ceph/databuf.c
> @@ -0,0 +1,200 @@
> +// SPDX-License-Identifier: GPL-2.0
> +/* Data container
> + *
> + * Copyright (C) 2023 Red Hat, Inc. All Rights Reserved.
> + * Written by David Howells (dhowells@redhat.com)
> + */
> +
> +#include <linux/export.h>
> +#include <linux/gfp.h>
> +#include <linux/slab.h>
> +#include <linux/uio.h>
> +#include <linux/pagemap.h>
> +#include <linux/highmem.h>
> +#include <linux/ceph/databuf.h>
> +
> +struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
> +                                        unsigned int data_source, gfp_t gfp)
> +{
> +        struct ceph_databuf *dbuf;
> +        size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
> +
> +        dbuf = kzalloc(sizeof(*dbuf), gfp);
> +        if (!dbuf)
> +                return NULL;

I am guessing... Should we return an error code here?

> +
> +        refcount_set(&dbuf->refcnt, 1);
> +
> +        if (min_bvec == 0 && space == 0) {
> +                /* Do nothing */
> +        } else if (min_bvec <= inl && space <= inl * PAGE_SIZE) {
> +                dbuf->bvec = dbuf->inline_bvec;
> +                dbuf->max_bvec = inl;
> +                dbuf->limit = space;
> +        } else if (min_bvec) {
> +                min_bvec = umax(min_bvec, 16);

Why 16 here? Maybe we need to introduce some well-explained constant?

> +
> +                dbuf->bvec = kcalloc(min_bvec, sizeof(struct bio_vec), gfp);
> +                if (!dbuf->bvec) {
> +                        kfree(dbuf);
> +                        return NULL;

Ditto. Should we return an error code here?

> +                }
> +
> +                dbuf->max_bvec = min_bvec;

Why do we assign min_bvec to max_bvec? I am simply slightly confused why the
argument of the function is named min_bvec, but finally we are saving the
min_bvec value into max_bvec.

> +        }
> +
> +        iov_iter_bvec(&dbuf->iter, data_source, dbuf->bvec, 0, 0);
> +
> +        if (space) {
> +                if (ceph_databuf_reserve(dbuf, space, gfp) < 0) {
> +                        ceph_databuf_release(dbuf);
> +                        return NULL;

Ditto. Should we return an error code here?

> +                }
> +        }
> +        return dbuf;
> +}
> +EXPORT_SYMBOL(ceph_databuf_alloc);
> +
> +struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf)

I see the point here. But do we really need to return a pointer? Why not
simply:

void ceph_databuf_get(struct ceph_databuf *dbuf)

> +{
> +        if (!dbuf)
> +                return NULL;
> +        refcount_inc(&dbuf->refcnt);
> +        return dbuf;
> +}
> +EXPORT_SYMBOL(ceph_databuf_get);
> +
> +void ceph_databuf_release(struct ceph_databuf *dbuf)
> +{
> +        size_t i;
> +
> +        if (!dbuf || !refcount_dec_and_test(&dbuf->refcnt))
> +                return;
> +
> +        if (dbuf->put_pages)
> +                for (i = 0; i < dbuf->nr_bvec; i++)
> +                        put_page(dbuf->bvec[i].bv_page);
> +        if (dbuf->bvec != dbuf->inline_bvec)
> +                kfree(dbuf->bvec);
> +        kfree(dbuf);
> +}
> +EXPORT_SYMBOL(ceph_databuf_release);
> +
> +/*
> + * Expand the bvec[] in the dbuf.
> + */
> +static int ceph_databuf_expand(struct ceph_databuf *dbuf, size_t req_bvec,
> +                               gfp_t gfp)
> +{
> +        struct bio_vec *bvec = dbuf->bvec, *old = bvec;

I think that the assignment (*old = bvec) looks confusing if we keep it on the
same line as the bvec declaration and initialization. Why not declare and
initialize it on the next line?

> +        size_t size, max_bvec, off = dbuf->iter.bvec - old;

I think there are too many declarations on the same line. Why not:

size_t size, max_bvec;
size_t off = dbuf->iter.bvec - old;

> +        size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
> +
> +        if (req_bvec <= inl) {
> +                dbuf->bvec = dbuf->inline_bvec;
> +                dbuf->max_bvec = inl;
> +                dbuf->iter.bvec = dbuf->inline_bvec + off;
> +                return 0;
> +        }
> +
> +        max_bvec = roundup_pow_of_two(req_bvec);
> +        size = array_size(max_bvec, sizeof(struct bio_vec));
> +
> +        if (old == dbuf->inline_bvec) {
> +                bvec = kmalloc_array(max_bvec, sizeof(struct bio_vec), gfp);
> +                if (!bvec)
> +                        return -ENOMEM;
> +                memcpy(bvec, old, inl);
> +        } else {
> +                bvec = krealloc(old, size, gfp);
> +                if (!bvec)
> +                        return -ENOMEM;
> +        }
> +        dbuf->bvec = bvec;
> +        dbuf->max_bvec = max_bvec;
> +        dbuf->iter.bvec = bvec + off;
> +        return 0;
> +}
> +
> +/* Allocate enough pages for a dbuf to append the given amount
> + * of dbuf without allocating.
> + * Returns: 0 on success, -ENOMEM on error.
> + */
> +int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t add_space,
> +                         gfp_t gfp)
> +{
> +        struct bio_vec *bvec;
> +        size_t i, req_bvec = DIV_ROUND_UP(dbuf->iter.count + add_space, PAGE_SIZE);

Why not:

size_t req_bvec = DIV_ROUND_UP(dbuf->iter.count + add_space, PAGE_SIZE);
size_t i;

> +        int ret;
> +
> +        dbuf->put_pages = true;
> +        if (req_bvec > dbuf->max_bvec) {
> +                ret = ceph_databuf_expand(dbuf, req_bvec, gfp);
> +                if (ret < 0)
> +                        return ret;
> +        }
> +
> +        bvec = dbuf->bvec;
> +        while (dbuf->nr_bvec < req_bvec) {
> +                struct page *pages[16];

Why is 16 hardcoded here instead of using some well-defined constant? And,
again, why not folio?

> +                size_t want = min(req_bvec, ARRAY_SIZE(pages)), got;
> +
> +                memset(pages, 0, sizeof(pages));
> +                got = alloc_pages_bulk(gfp, want, pages);
> +                if (!got)
> +                        return -ENOMEM;
> +                for (i = 0; i < got; i++)

Why do we use size_t for i and got? Why not int, for example?

> +                        bvec_set_page(&bvec[dbuf->nr_bvec + i], pages[i],
> +                                      PAGE_SIZE, 0);
> +                dbuf->iter.nr_segs += got;
> +                dbuf->nr_bvec += got;

If I understood correctly, ceph_databuf_append_page() uses slightly different
logic:

+        dbuf->iter.count += len;
+        dbuf->iter.nr_segs++;

But here we assign the number of allocated pages to nr_segs. It is slightly
confusing. I think I am missing something here.

> +                dbuf->limit = dbuf->nr_bvec * PAGE_SIZE;
> +        }
> +
> +        return 0;
> +}
> +EXPORT_SYMBOL(ceph_databuf_reserve);
> +
> +int ceph_databuf_append(struct ceph_databuf *dbuf, const void *buf, size_t len)
> +{
> +        struct iov_iter temp_iter;
> +
> +        if (!len)
> +                return 0;
> +        if (dbuf->limit - dbuf->iter.count > len &&
> +            ceph_databuf_reserve(dbuf, len, GFP_NOIO) < 0)
> +                return -ENOMEM;
> +
> +        iov_iter_bvec(&temp_iter, ITER_DEST,
> +                      dbuf->bvec, dbuf->nr_bvec, dbuf->limit);
> +        iov_iter_advance(&temp_iter, dbuf->iter.count);
> +
> +        if (copy_to_iter(buf, len, &temp_iter) != len)
> +                return -EFAULT;
> +        dbuf->iter.count += len;
> +        return 0;
> +}
> +EXPORT_SYMBOL(ceph_databuf_append);
> +
> +/*
> + * Allocate a fragment and insert it into the buffer at the specified index.
> + */
> +int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
> +                             size_t len, gfp_t gfp)
> +{
> +        struct page *page;
> +

Why not folio?

> +        page = alloc_page(gfp);
> +        if (!page)
> +                return -ENOMEM;
> +
> +        bvec_set_page(&dbuf->bvec[ix], page, len, 0);
> +
> +        if (dbuf->nr_bvec == ix) {
> +                dbuf->iter.nr_segs = ix + 1;
> +                dbuf->nr_bvec = ix + 1;
> +                dbuf->iter.count += len;
> +        }
> +        return 0;
> +}
> +EXPORT_SYMBOL(ceph_databuf_insert_frag);
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 1df4291cc80b..802f0b222131 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -1872,7 +1872,9 @@ static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
>
>  static void ceph_msg_data_destroy(struct ceph_msg_data *data)
>  {
> -        if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
> +        if (data->type == CEPH_MSG_DATA_DATABUF) {
> +                ceph_databuf_release(data->dbuf);
> +        } else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
>                  int num_pages = calc_pages_for(data->offset, data->length);
>                  ceph_release_page_vector(data->pages, num_pages);
>          } else if (data->type == CEPH_MSG_DATA_PAGELIST) {
> @@ -1880,6 +1882,22 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
>          }
>  }
>
> +void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
> +{
> +        struct ceph_msg_data *data;
> +
> +        BUG_ON(!dbuf);
> +        BUG_ON(!ceph_databuf_len(dbuf));
> +
> +        data = ceph_msg_data_add(msg);
> +        data->type = CEPH_MSG_DATA_DATABUF;
> +        data->dbuf = ceph_databuf_get(dbuf);
> +        data->iter = dbuf->iter;
> +
> +        msg->data_length += ceph_databuf_len(dbuf);
> +}
> +EXPORT_SYMBOL(ceph_msg_data_add_databuf);
> +
>  void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
>                               size_t length, size_t offset, bool own_pages)
>  {
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index e359e70ad47e..c84634264377 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -359,6 +359,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
>          switch (osd_data->type) {
>          case CEPH_OSD_DATA_TYPE_NONE:
>                  return 0;
> +        case CEPH_OSD_DATA_TYPE_DATABUF:
> +                return ceph_databuf_len(osd_data->dbuf);
>          case CEPH_OSD_DATA_TYPE_PAGES:
>                  return osd_data->length;
>          case CEPH_OSD_DATA_TYPE_PAGELIST:
> @@ -379,7 +381,9 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
>
>  static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
>  {
> -        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
> +        if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
> +                ceph_databuf_release(osd_data->dbuf);
> +        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
>                  int num_pages;
>
>                  num_pages = calc_pages_for((u64)osd_data->offset,
> @@ -965,7 +969,10 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
>  {
>          u64 length = ceph_osd_data_length(osd_data);
>
> -        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
> +        if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
> +                BUG_ON(!length);
> +                ceph_msg_data_add_databuf(msg, osd_data->dbuf);
> +        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
>                  BUG_ON(length > (u64) SIZE_MAX);
>                  if (length)
>                          ceph_msg_data_add_pages(msg, osd_data->pages,
>

Thanks,
Slava.
Viacheslav Dubeyko <Slava.Dubeyko@ibm.com> wrote:

> > +struct ceph_databuf {
> > +        struct bio_vec *bvec;           /* List of pages */
>
> So, maybe we need to think about folios now?

Yeah, I know... but struct bio_vec has a page pointer and may point to
non-folio type pages. This stuff is still undergoing evolution as Willy works
on reducing struct page.

What I'm pondering is changing struct folio_queue to take a list of { folio,
offset, len } rather than using a folio_batch with a simple list of folios.
It doesn't necessarily help with DIO, though; there we're given an iterator
we're required to use.

One of the things I'd like to look at for ceph as well is using the page frag
allocator[*] to get small pieces of memory for stashing protocol data in
rather than allocating full-page buffers.

[*] Memory allocated from the page frag allocator can be used with
MSG_SPLICE_PAGES as its lifetime is controlled by the refcount.

Now, we could probably have a page frag allocator that uses folios rather
than non-folio pages for network filesystem use. That could be of use to afs
and cifs also.

As I mentioned in a previous reply, how to better integrate folioq/bvec is
hopefully up for discussion at LSF/MM next week.

> > +static inline void ceph_databuf_append_page(struct ceph_databuf *dbuf,
> > +                                            struct page *page,
> > +                                            unsigned int offset,
> > +                                            unsigned int len)
> > +{
> > +        BUG_ON(dbuf->nr_bvec >= dbuf->max_bvec);
> > +        bvec_set_page(&dbuf->bvec[dbuf->nr_bvec++], page, len, offset);
> > +        dbuf->iter.count += len;
> > +        dbuf->iter.nr_segs++;
>
> Why do we assign len to dbuf->iter.count but only increment
> dbuf->iter.nr_segs?

Um, because it doesn't? It adds len to dbuf->iter.count.

> > enum ceph_msg_data_type {
> >          CEPH_MSG_DATA_NONE,     /* message contains no data payload */
> > +        CEPH_MSG_DATA_DATABUF,  /* data source/destination is a data buffer */
> >          CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
> >          CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
>
> So, the final replacement on databuf will be in the future?

The result of each patch has to compile and work, right? But yes, various
patches in this series reduce the use of those other data types. I have
patches in progress to finally remove PAGES and PAGELIST, but they're not
quite compiling yet.

> > +        dbuf = kzalloc(sizeof(*dbuf), gfp);
> > +        if (!dbuf)
> > +                return NULL;
>
> I am guessing... Should we return an error code here?

The only error this function can return is ENOMEM, so it just returns NULL
like many other alloc functions.

> > +        } else if (min_bvec) {
> > +                min_bvec = umax(min_bvec, 16);
>
> Why 16 here? Maybe we need to introduce some well-explained constant?

Fair point.

> > +                dbuf->max_bvec = min_bvec;
>
> Why do we assign min_bvec to max_bvec? I am simply slightly confused why the
> argument of the function is named min_bvec, but finally we are saving the
> min_bvec value into max_bvec.

The 'min_bvec' argument is the minimum number of bvecs that the caller needs
to be allocated. This may get rounded up to include the whole piece of memory
we're going to be given by the slab. 'dbuf->max_bvec' is the maximum number
of entries that can be used in dbuf->bvec[] and is a property of the databuf
object.

> > +struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf)
>
> I see the point here. But do we really need to return a pointer? Why not
> simply:
>
> void ceph_databuf_get(struct ceph_databuf *dbuf)

It means I can do:

        foo->databuf = ceph_databuf_get(dbuf);

rather than:

        ceph_databuf_get(dbuf);
        foo->databuf = dbuf;

> > +static int ceph_databuf_expand(struct ceph_databuf *dbuf, size_t req_bvec,
> > +                               gfp_t gfp)
> > +{
> > +        struct bio_vec *bvec = dbuf->bvec, *old = bvec;
>
> I think that the assignment (*old = bvec) looks confusing if we keep it on
> the same line as the bvec declaration and initialization. Why not declare
> and initialize it on the next line?
>
> > +        size_t size, max_bvec, off = dbuf->iter.bvec - old;
>
> I think there are too many declarations on the same line. Why not:
>
> size_t size, max_bvec;
> size_t off = dbuf->iter.bvec - old;

A matter of personal preference, I guess.

> > +        bvec = dbuf->bvec;
> > +        while (dbuf->nr_bvec < req_bvec) {
> > +                struct page *pages[16];
>
> Why is 16 hardcoded here instead of using some well-defined constant?

Because this is only about stack usage. alloc_pages_bulk() gets a straight
array of page*; we have a bvec[], so we need an intermediate. Now, I could
actually just overlay the array over the tail of the bvec[] and do a single
bulk allocation since sizeof(struct bio_vec) > sizeof(struct page *).

> And, again, why not folio?

I don't think there's a bulk folio allocator. Quite possibly there *should*
be so that readahead can use it - one that allocates different sizes of
folios to fill the space required.

> > +                size_t want = min(req_bvec, ARRAY_SIZE(pages)), got;
> > +
> > +                memset(pages, 0, sizeof(pages));
> > +                got = alloc_pages_bulk(gfp, want, pages);
> > +                if (!got)
> > +                        return -ENOMEM;
> > +                for (i = 0; i < got; i++)
>
> Why do we use size_t for i and got? Why not int, for example?

alloc_pages_bulk() doesn't return an int. Now, one could legitimately argue
that I should use "unsigned long" rather than "size_t", but I wouldn't use
int here. int is smaller and signed. Granted, it's unlikely we'll be asked
for >2G pages, but if we're going to assign that down to an int, it probably
needs to be checked first.

> > +                        bvec_set_page(&bvec[dbuf->nr_bvec + i], pages[i],
> > +                                      PAGE_SIZE, 0);
> > +                dbuf->iter.nr_segs += got;
> > +                dbuf->nr_bvec += got;
>
> If I understood correctly, ceph_databuf_append_page() uses slightly
> different logic.

Can you elaborate?

> +        dbuf->iter.count += len;
> +        dbuf->iter.nr_segs++;
>
> But here we assign the number of allocated pages to nr_segs. It is slightly
> confusing. I think I am missing something here.

Um - it's an increment? I think part of the problem might be that we're
mixing two things within the same container: partial pages that get kmapped
and accessed directly (e.g. protocol bits) and pages that get accessed
indirectly (e.g. data buffers). Maybe this needs to be made more explicit in
the API.

David
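To pin down the accounting being debated above: iter.count tracks the total
bytes of occupied data, while iter.nr_segs tracks how many bvec[] segments
the iterator spans. Below is a minimal userspace model of the two paths; it
is illustrative only - model_iter, append_page and reserve_pages are made-up
stand-ins for the posted helpers, not the kernel API.

#include <stdio.h>
#include <stddef.h>

struct model_iter {
        size_t count;           /* bytes of occupied data */
        size_t nr_segs;         /* number of bvec[] segments */
};

/* Models ceph_databuf_append_page(): the page arrives already
 * carrying len bytes of data, so both fields move. */
static void append_page(struct model_iter *it, size_t len)
{
        it->count += len;       /* adds len; it is not an assignment */
        it->nr_segs++;          /* exactly one new segment */
}

/* Models the loop in ceph_databuf_reserve(): 'got' freshly allocated
 * pages are still empty, so only the segment count moves; count grows
 * later, when data is actually copied in. */
static void reserve_pages(struct model_iter *it, size_t got)
{
        it->nr_segs += got;
}

int main(void)
{
        struct model_iter it = { 0, 0 };

        append_page(&it, 300);  /* splice in a partial page of protocol data */
        reserve_pages(&it, 16); /* reserve room for a later bulk copy */
        printf("count=%zu nr_segs=%zu\n", it.count, it.nr_segs);
        /* prints: count=300 nr_segs=17 */
        return 0;
}

Read this way, the two sites are consistent: both add one segment per page;
they differ only in whether the page brings data with it (count moves) or is
an empty reservation (count stays put).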
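The overlay David floats for the pages[16] intermediate can be sketched as
follows. This is hypothetical code, not part of the posted series:
databuf_reserve_overlaid is an invented name, and the trick relies on
sizeof(struct bio_vec) being at least sizeof(struct page *), so the unfilled
tail of bvec[] can temporarily hold the pointer array that alloc_pages_bulk()
fills in.

/* Hypothetical single-shot variant of the reserve loop.  The 'need'
 * page pointers (8 bytes each on 64-bit) fit inside the 'need' unfilled
 * bio_vec slots (16 bytes each), so no separate on-stack array is
 * required.  Slots must then be converted from last to first: writing
 * bio_vec slot i clobbers pointer slots 2i and 2i+1, which a descending
 * loop has already consumed. */
static int databuf_reserve_overlaid(struct ceph_databuf *dbuf,
                                    size_t req_bvec, gfp_t gfp)
{
        size_t need = req_bvec - dbuf->nr_bvec;
        struct page **pages = (struct page **)&dbuf->bvec[dbuf->nr_bvec];
        size_t got, i;

        memset(pages, 0, need * sizeof(*pages));
        got = alloc_pages_bulk(gfp, need, pages);
        if (!got)
                return -ENOMEM;

        for (i = got; i-- > 0;) {
                struct page *page = pages[i];   /* read before overwrite */

                bvec_set_page(&dbuf->bvec[dbuf->nr_bvec + i], page,
                              PAGE_SIZE, 0);
        }
        dbuf->iter.nr_segs += got;
        dbuf->nr_bvec += got;
        dbuf->limit = dbuf->nr_bvec * PAGE_SIZE;
        return 0;
}

A partial bulk allocation (got < need) would still require the caller to
loop, as the posted ceph_databuf_reserve() does; the sketch only shows why
the intermediate on-stack array could go away.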