new file mode 100644
@@ -0,0 +1,131 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __FS_CEPH_DATABUF_H
+#define __FS_CEPH_DATABUF_H
+
+#include <asm/byteorder.h>
+#include <linux/refcount.h>
+#include <linux/blk_types.h>
+
+struct ceph_databuf {
+ struct bio_vec *bvec; /* List of pages */
+ struct bio_vec inline_bvec[1]; /* Inline bvecs for small buffers */
+ struct iov_iter iter; /* Iterator defining occupied data */
+ size_t limit; /* Maximum length before expansion required */
+ size_t nr_bvec; /* Number of bvec[] that have pages */
+ size_t max_bvec; /* Size of bvec[] */
+ refcount_t refcnt;
+ bool put_pages; /* T if pages in bvec[] need to be put*/
+};
+
+struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
+ unsigned int data_source, gfp_t gfp);
+struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf);
+void ceph_databuf_release(struct ceph_databuf *dbuf);
+int ceph_databuf_append(struct ceph_databuf *dbuf, const void *d, size_t l);
+int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t space, gfp_t gfp);
+int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
+ size_t len, gfp_t gfp);
+
+static inline
+struct ceph_databuf *ceph_databuf_req_alloc(size_t min_bvec, size_t space, gfp_t gfp)
+{
+ return ceph_databuf_alloc(min_bvec, space, ITER_SOURCE, gfp);
+}
+
+static inline
+struct ceph_databuf *ceph_databuf_reply_alloc(size_t min_bvec, size_t space, gfp_t gfp)
+{
+ struct ceph_databuf *dbuf;
+
+ dbuf = ceph_databuf_alloc(min_bvec, space, ITER_DEST, gfp);
+ if (dbuf)
+ iov_iter_reexpand(&dbuf->iter, space);
+ return dbuf;
+}
+
+static inline struct page *ceph_databuf_page(struct ceph_databuf *dbuf,
+ unsigned int ix)
+{
+ return dbuf->bvec[ix].bv_page;
+}
+
+#define kmap_ceph_databuf_page(dbuf, ix) \
+ kmap_local_page(ceph_databuf_page(dbuf, ix));
+
+static inline int ceph_databuf_encode_64(struct ceph_databuf *dbuf, u64 v)
+{
+ __le64 ev = cpu_to_le64(v);
+ return ceph_databuf_append(dbuf, &ev, sizeof(ev));
+}
+static inline int ceph_databuf_encode_32(struct ceph_databuf *dbuf, u32 v)
+{
+ __le32 ev = cpu_to_le32(v);
+ return ceph_databuf_append(dbuf, &ev, sizeof(ev));
+}
+static inline int ceph_databuf_encode_16(struct ceph_databuf *dbuf, u16 v)
+{
+ __le16 ev = cpu_to_le16(v);
+ return ceph_databuf_append(dbuf, &ev, sizeof(ev));
+}
+static inline int ceph_databuf_encode_8(struct ceph_databuf *dbuf, u8 v)
+{
+ return ceph_databuf_append(dbuf, &v, 1);
+}
+static inline int ceph_databuf_encode_string(struct ceph_databuf *dbuf,
+ const char *s, u32 len)
+{
+ int ret = ceph_databuf_encode_32(dbuf, len);
+ if (ret)
+ return ret;
+ if (len)
+ return ceph_databuf_append(dbuf, s, len);
+ return 0;
+}
+
+static inline size_t ceph_databuf_len(struct ceph_databuf *dbuf)
+{
+ return dbuf->iter.count;
+}
+
+static inline void ceph_databuf_added_data(struct ceph_databuf *dbuf,
+ size_t len)
+{
+ dbuf->iter.count += len;
+}
+
+static inline void ceph_databuf_reply_ready(struct ceph_databuf *reply,
+ size_t len)
+{
+ reply->iter.data_source = ITER_SOURCE;
+ iov_iter_truncate(&reply->iter, len);
+}
+
+static inline void ceph_databuf_reset_reply(struct ceph_databuf *reply)
+{
+ iov_iter_bvec(&reply->iter, ITER_DEST,
+ reply->bvec, reply->nr_bvec, reply->limit);
+}
+
+static inline void ceph_databuf_append_page(struct ceph_databuf *dbuf,
+ struct page *page,
+ unsigned int offset,
+ unsigned int len)
+{
+ BUG_ON(dbuf->nr_bvec >= dbuf->max_bvec);
+ bvec_set_page(&dbuf->bvec[dbuf->nr_bvec++], page, len, offset);
+ dbuf->iter.count += len;
+ dbuf->iter.nr_segs++;
+}
+
+static inline void *ceph_databuf_enc_start(struct ceph_databuf *dbuf)
+{
+ return page_address(ceph_databuf_page(dbuf, 0)) + dbuf->iter.count;
+}
+
+static inline void ceph_databuf_enc_stop(struct ceph_databuf *dbuf, void *p)
+{
+ dbuf->iter.count = p - page_address(ceph_databuf_page(dbuf, 0));
+ BUG_ON(dbuf->iter.count > dbuf->limit);
+}
+
+#endif /* __FS_CEPH_DATABUF_H */
@@ -117,6 +117,7 @@ struct ceph_messenger {
enum ceph_msg_data_type {
CEPH_MSG_DATA_NONE, /* message contains no data payload */
+ CEPH_MSG_DATA_DATABUF, /* data source/destination is a data buffer */
CEPH_MSG_DATA_PAGES, /* data source/destination is a page array */
CEPH_MSG_DATA_PAGELIST, /* data source/destination is a pagelist */
#ifdef CONFIG_BLOCK
@@ -210,7 +211,10 @@ struct ceph_bvec_iter {
struct ceph_msg_data {
enum ceph_msg_data_type type;
+ struct iov_iter iter;
+ bool release_dbuf;
union {
+ struct ceph_databuf *dbuf;
#ifdef CONFIG_BLOCK
struct {
struct ceph_bio_iter bio_pos;
@@ -225,7 +229,6 @@ struct ceph_msg_data {
bool own_pages;
};
struct ceph_pagelist *pagelist;
- struct iov_iter iter;
};
};
@@ -601,6 +604,7 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);
+void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf);
void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t offset, bool own_pages);
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
@@ -16,6 +16,7 @@
#include <linux/ceph/msgpool.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
+#include <linux/ceph/databuf.h>
struct ceph_msg;
struct ceph_snap_context;
@@ -103,6 +104,7 @@ struct ceph_osd {
enum ceph_osd_data_type {
CEPH_OSD_DATA_TYPE_NONE = 0,
+ CEPH_OSD_DATA_TYPE_DATABUF,
CEPH_OSD_DATA_TYPE_PAGES,
CEPH_OSD_DATA_TYPE_PAGELIST,
#ifdef CONFIG_BLOCK
@@ -115,6 +117,7 @@ enum ceph_osd_data_type {
struct ceph_osd_data {
enum ceph_osd_data_type type;
union {
+ struct ceph_databuf *dbuf;
struct {
struct page **pages;
u64 length;
@@ -15,4 +15,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
auth_x.o \
ceph_strings.o ceph_hash.o \
pagevec.o snapshot.o string_table.o \
- messenger_v1.o messenger_v2.o
+ messenger_v1.o messenger_v2.o \
+ databuf.o
new file mode 100644
@@ -0,0 +1,200 @@
+// SPDX-License-Identifier: GPL-2.0
+/* Data container
+ *
+ * Copyright (C) 2023 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ */
+
+#include <linux/export.h>
+#include <linux/gfp.h>
+#include <linux/slab.h>
+#include <linux/uio.h>
+#include <linux/pagemap.h>
+#include <linux/highmem.h>
+#include <linux/ceph/databuf.h>
+
+struct ceph_databuf *ceph_databuf_alloc(size_t min_bvec, size_t space,
+ unsigned int data_source, gfp_t gfp)
+{
+ struct ceph_databuf *dbuf;
+ size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
+
+ dbuf = kzalloc(sizeof(*dbuf), gfp);
+ if (!dbuf)
+ return NULL;
+
+ refcount_set(&dbuf->refcnt, 1);
+
+ if (min_bvec == 0 && space == 0) {
+ /* Do nothing */
+ } else if (min_bvec <= inl && space <= inl * PAGE_SIZE) {
+ dbuf->bvec = dbuf->inline_bvec;
+ dbuf->max_bvec = inl;
+ dbuf->limit = space;
+ } else if (min_bvec) {
+ min_bvec = umax(min_bvec, 16);
+
+ dbuf->bvec = kcalloc(min_bvec, sizeof(struct bio_vec), gfp);
+ if (!dbuf->bvec) {
+ kfree(dbuf);
+ return NULL;
+ }
+
+ dbuf->max_bvec = min_bvec;
+ }
+
+ iov_iter_bvec(&dbuf->iter, data_source, dbuf->bvec, 0, 0);
+
+ if (space) {
+ if (ceph_databuf_reserve(dbuf, space, gfp) < 0) {
+ ceph_databuf_release(dbuf);
+ return NULL;
+ }
+ }
+ return dbuf;
+}
+EXPORT_SYMBOL(ceph_databuf_alloc);
+
+struct ceph_databuf *ceph_databuf_get(struct ceph_databuf *dbuf)
+{
+ if (!dbuf)
+ return NULL;
+ refcount_inc(&dbuf->refcnt);
+ return dbuf;
+}
+EXPORT_SYMBOL(ceph_databuf_get);
+
+void ceph_databuf_release(struct ceph_databuf *dbuf)
+{
+ size_t i;
+
+ if (!dbuf || !refcount_dec_and_test(&dbuf->refcnt))
+ return;
+
+ if (dbuf->put_pages)
+ for (i = 0; i < dbuf->nr_bvec; i++)
+ put_page(dbuf->bvec[i].bv_page);
+ if (dbuf->bvec != dbuf->inline_bvec)
+ kfree(dbuf->bvec);
+ kfree(dbuf);
+}
+EXPORT_SYMBOL(ceph_databuf_release);
+
+/*
+ * Expand the bvec[] in the dbuf.
+ */
+static int ceph_databuf_expand(struct ceph_databuf *dbuf, size_t req_bvec,
+ gfp_t gfp)
+{
+ struct bio_vec *bvec = dbuf->bvec, *old = bvec;
+ size_t size, max_bvec, off = dbuf->iter.bvec - old;
+ size_t inl = ARRAY_SIZE(dbuf->inline_bvec);
+
+ if (req_bvec <= inl) {
+ dbuf->bvec = dbuf->inline_bvec;
+ dbuf->max_bvec = inl;
+ dbuf->iter.bvec = dbuf->inline_bvec + off;
+ return 0;
+ }
+
+ max_bvec = roundup_pow_of_two(req_bvec);
+ size = array_size(max_bvec, sizeof(struct bio_vec));
+
+ if (old == dbuf->inline_bvec) {
+ bvec = kmalloc_array(max_bvec, sizeof(struct bio_vec), gfp);
+ if (!bvec)
+ return -ENOMEM;
+ memcpy(bvec, old, inl);
+ } else {
+ bvec = krealloc(old, size, gfp);
+ if (!bvec)
+ return -ENOMEM;
+ }
+ dbuf->bvec = bvec;
+ dbuf->max_bvec = max_bvec;
+ dbuf->iter.bvec = bvec + off;
+ return 0;
+}
+
+/* Allocate enough pages for a dbuf to append the given amount
+ * of dbuf without allocating.
+ * Returns: 0 on success, -ENOMEM on error.
+ */
+int ceph_databuf_reserve(struct ceph_databuf *dbuf, size_t add_space,
+ gfp_t gfp)
+{
+ struct bio_vec *bvec;
+ size_t i, req_bvec = DIV_ROUND_UP(dbuf->iter.count + add_space, PAGE_SIZE);
+ int ret;
+
+ dbuf->put_pages = true;
+ if (req_bvec > dbuf->max_bvec) {
+ ret = ceph_databuf_expand(dbuf, req_bvec, gfp);
+ if (ret < 0)
+ return ret;
+ }
+
+ bvec = dbuf->bvec;
+ while (dbuf->nr_bvec < req_bvec) {
+ struct page *pages[16];
+ size_t want = min(req_bvec, ARRAY_SIZE(pages)), got;
+
+ memset(pages, 0, sizeof(pages));
+ got = alloc_pages_bulk(gfp, want, pages);
+ if (!got)
+ return -ENOMEM;
+ for (i = 0; i < got; i++)
+ bvec_set_page(&bvec[dbuf->nr_bvec + i], pages[i],
+ PAGE_SIZE, 0);
+ dbuf->iter.nr_segs += got;
+ dbuf->nr_bvec += got;
+ dbuf->limit = dbuf->nr_bvec * PAGE_SIZE;
+ }
+
+ return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_reserve);
+
+int ceph_databuf_append(struct ceph_databuf *dbuf, const void *buf, size_t len)
+{
+ struct iov_iter temp_iter;
+
+ if (!len)
+ return 0;
+ if (dbuf->limit - dbuf->iter.count > len &&
+ ceph_databuf_reserve(dbuf, len, GFP_NOIO) < 0)
+ return -ENOMEM;
+
+ iov_iter_bvec(&temp_iter, ITER_DEST,
+ dbuf->bvec, dbuf->nr_bvec, dbuf->limit);
+ iov_iter_advance(&temp_iter, dbuf->iter.count);
+
+ if (copy_to_iter(buf, len, &temp_iter) != len)
+ return -EFAULT;
+ dbuf->iter.count += len;
+ return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_append);
+
+/*
+ * Allocate a fragment and insert it into the buffer at the specified index.
+ */
+int ceph_databuf_insert_frag(struct ceph_databuf *dbuf, unsigned int ix,
+ size_t len, gfp_t gfp)
+{
+ struct page *page;
+
+ page = alloc_page(gfp);
+ if (!page)
+ return -ENOMEM;
+
+ bvec_set_page(&dbuf->bvec[ix], page, len, 0);
+
+ if (dbuf->nr_bvec == ix) {
+ dbuf->iter.nr_segs = ix + 1;
+ dbuf->nr_bvec = ix + 1;
+ dbuf->iter.count += len;
+ }
+ return 0;
+}
+EXPORT_SYMBOL(ceph_databuf_insert_frag);
@@ -1872,7 +1872,9 @@ static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
- if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
+ if (data->type == CEPH_MSG_DATA_DATABUF) {
+ ceph_databuf_release(data->dbuf);
+ } else if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
int num_pages = calc_pages_for(data->offset, data->length);
ceph_release_page_vector(data->pages, num_pages);
} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
@@ -1880,6 +1882,22 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
}
}
+void ceph_msg_data_add_databuf(struct ceph_msg *msg, struct ceph_databuf *dbuf)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!dbuf);
+ BUG_ON(!ceph_databuf_len(dbuf));
+
+ data = ceph_msg_data_add(msg);
+ data->type = CEPH_MSG_DATA_DATABUF;
+ data->dbuf = ceph_databuf_get(dbuf);
+ data->iter = dbuf->iter;
+
+ msg->data_length += ceph_databuf_len(dbuf);
+}
+EXPORT_SYMBOL(ceph_msg_data_add_databuf);
+
void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
size_t length, size_t offset, bool own_pages)
{
@@ -359,6 +359,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
switch (osd_data->type) {
case CEPH_OSD_DATA_TYPE_NONE:
return 0;
+ case CEPH_OSD_DATA_TYPE_DATABUF:
+ return ceph_databuf_len(osd_data->dbuf);
case CEPH_OSD_DATA_TYPE_PAGES:
return osd_data->length;
case CEPH_OSD_DATA_TYPE_PAGELIST:
@@ -379,7 +381,9 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
+ ceph_databuf_release(osd_data->dbuf);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
int num_pages;
num_pages = calc_pages_for((u64)osd_data->offset,
@@ -965,7 +969,10 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
{
u64 length = ceph_osd_data_length(osd_data);
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+ if (osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF) {
+ BUG_ON(!length);
+ ceph_msg_data_add_databuf(msg, osd_data->dbuf);
+ } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
BUG_ON(length > (u64) SIZE_MAX);
if (length)
ceph_msg_data_add_pages(msg, osd_data->pages,
Add a new ceph data container type, ceph_databuf, that can carry a list of pages in a bvec and use an iov_iter to handle describe the data to the next layer down. The iterator can also be used to refer to other types, such as ITER_FOLIOQ. There are two ways of loading the bvec. One way is to allocate a buffer with space in it and then add data, expanding the space as needed; the other is to splice in pages, expanding the bvec[] as needed. This is intended to replace all other types. Signed-off-by: David Howells <dhowells@redhat.com> cc: Viacheslav Dubeyko <slava@dubeyko.com> cc: Alex Markuze <amarkuze@redhat.com> cc: Ilya Dryomov <idryomov@gmail.com> cc: ceph-devel@vger.kernel.org cc: linux-fsdevel@vger.kernel.org --- include/linux/ceph/databuf.h | 131 +++++++++++++++++++++ include/linux/ceph/messenger.h | 6 +- include/linux/ceph/osd_client.h | 3 + net/ceph/Makefile | 3 +- net/ceph/databuf.c | 200 ++++++++++++++++++++++++++++++++ net/ceph/messenger.c | 20 +++- net/ceph/osd_client.c | 11 +- 7 files changed, 369 insertions(+), 5 deletions(-) create mode 100644 include/linux/ceph/databuf.h create mode 100644 net/ceph/databuf.c