@@ -173,11 +173,15 @@ struct ceph_msg_data {
u32 bio_length;
};
#endif /* CONFIG_BLOCK */
- struct ceph_bvec_iter bvec_pos;
+ struct {
+ struct ceph_bvec_iter bvec_pos;
+ u32 num_bvecs;
+ };
struct {
struct page **pages;
size_t length; /* total # bytes */
unsigned int alignment; /* first page */
+ bool pages_from_pool;
bool own_pages;
};
struct ceph_pagelist *pagelist;
@@ -357,8 +361,29 @@ extern void ceph_con_keepalive(struct ceph_connection *con);
extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
unsigned long interval);
-void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
- size_t length, size_t alignment, bool own_pages);
+extern void ceph_msg_data_init(struct ceph_msg_data *data);
+extern void ceph_msg_data_release(struct ceph_msg_data *data);
+extern size_t ceph_msg_data_length(struct ceph_msg_data *data);
+
+extern void ceph_msg_data_pages_init(struct ceph_msg_data *data,
+ struct page **pages, u64 length,
+ u32 alignment, bool pages_from_pool,
+ bool own_pages);
+extern void ceph_msg_data_pagelist_init(struct ceph_msg_data *data,
+ struct ceph_pagelist *pagelist);
+#ifdef CONFIG_BLOCK
+extern void ceph_msg_data_bio_init(struct ceph_msg_data *data,
+ struct ceph_bio_iter *bio_pos,
+ u32 bio_length);
+#endif /* CONFIG_BLOCK */
+extern void ceph_msg_data_bvecs_init(struct ceph_msg_data *data,
+ struct ceph_bvec_iter *bvec_pos,
+ u32 num_bvecs);
+extern void ceph_msg_data_add(struct ceph_msg *msg, struct ceph_msg_data *data);
+
+extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
+ size_t length, size_t alignment,
+ bool pages_from_pool, bool own_pages);
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
struct ceph_pagelist *pagelist);
#ifdef CONFIG_BLOCK
@@ -366,7 +391,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
u32 length);
#endif /* CONFIG_BLOCK */
void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
- struct ceph_bvec_iter *bvec_pos);
+ struct ceph_bvec_iter *bvec_pos,
+ u32 num_bvecs);
struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
gfp_t flags, bool can_fail);
@@ -3240,13 +3240,20 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con,
return false;
}
-static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
+static struct ceph_msg_data *ceph_msg_data_get_next(struct ceph_msg *msg)
{
BUG_ON(msg->num_data_items >= msg->max_data_items);
return &msg->data[msg->num_data_items++];
}
-static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+void ceph_msg_data_init(struct ceph_msg_data *data)
+{
+ memset(data, 0, sizeof(*data));
+ data->type = CEPH_MSG_DATA_NONE;
+}
+EXPORT_SYMBOL(ceph_msg_data_init);
+
+void ceph_msg_data_release(struct ceph_msg_data *data)
{
if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
int num_pages = calc_pages_for(data->alignment, data->length);
@@ -3254,23 +3261,120 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
ceph_pagelist_release(data->pagelist);
}
+ ceph_msg_data_init(data);
}
+EXPORT_SYMBOL(ceph_msg_data_release);
-void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
- size_t length, size_t alignment, bool own_pages)
+/*
+ * Consumes @pages if @own_pages is true.
+ */
+void ceph_msg_data_pages_init(struct ceph_msg_data *data,
+ struct page **pages, u64 length, u32 alignment,
+ bool pages_from_pool, bool own_pages)
{
- struct ceph_msg_data *data;
-
- BUG_ON(!pages);
- BUG_ON(!length);
-
- data = ceph_msg_data_add(msg);
data->type = CEPH_MSG_DATA_PAGES;
data->pages = pages;
data->length = length;
data->alignment = alignment & ~PAGE_MASK;
+ data->pages_from_pool = pages_from_pool;
data->own_pages = own_pages;
+}
+EXPORT_SYMBOL(ceph_msg_data_pages_init);
+
+/*
+ * Consumes a ref on @pagelist.
+ */
+void ceph_msg_data_pagelist_init(struct ceph_msg_data *data,
+ struct ceph_pagelist *pagelist)
+{
+ data->type = CEPH_MSG_DATA_PAGELIST;
+ data->pagelist = pagelist;
+}
+EXPORT_SYMBOL(ceph_msg_data_pagelist_init);
+
+#ifdef CONFIG_BLOCK
+void ceph_msg_data_bio_init(struct ceph_msg_data *data,
+ struct ceph_bio_iter *bio_pos,
+ u32 bio_length)
+{
+ data->type = CEPH_MSG_DATA_BIO;
+ data->bio_pos = *bio_pos;
+ data->bio_length = bio_length;
+}
+EXPORT_SYMBOL(ceph_msg_data_bio_init);
+#endif /* CONFIG_BLOCK */
+void ceph_msg_data_bvecs_init(struct ceph_msg_data *data,
+ struct ceph_bvec_iter *bvec_pos,
+ u32 num_bvecs)
+{
+ data->type = CEPH_MSG_DATA_BVECS;
+ data->bvec_pos = *bvec_pos;
+ data->num_bvecs = num_bvecs;
+}
+EXPORT_SYMBOL(ceph_msg_data_bvecs_init);
+
+size_t ceph_msg_data_length(struct ceph_msg_data *data)
+{
+ switch (data->type) {
+ case CEPH_MSG_DATA_NONE:
+ return 0;
+ case CEPH_MSG_DATA_PAGES:
+ return data->length;
+ case CEPH_MSG_DATA_PAGELIST:
+ return data->pagelist->length;
+#ifdef CONFIG_BLOCK
+ case CEPH_MSG_DATA_BIO:
+ return data->bio_length;
+#endif /* CONFIG_BLOCK */
+ case CEPH_MSG_DATA_BVECS:
+ return data->bvec_pos.iter.bi_size;
+ default:
+ WARN(true, "unrecognized data type %d\n", (int)data->type);
+ return 0;
+ }
+}
+EXPORT_SYMBOL(ceph_msg_data_length);
+
+void ceph_msg_data_add(struct ceph_msg *msg, struct ceph_msg_data *data)
+{
+ u64 length = ceph_msg_data_length(data);
+
+ if (data->type == CEPH_MSG_DATA_PAGES) {
+ BUG_ON(length > (u64)SIZE_MAX);
+ if (likely(length))
+ ceph_msg_data_add_pages(msg, data->pages,
+ length, data->alignment,
+ data->pages_from_pool,
+ false);
+ } else if (data->type == CEPH_MSG_DATA_PAGELIST) {
+ BUG_ON(!length);
+ ceph_msg_data_add_pagelist(msg, data->pagelist);
+#ifdef CONFIG_BLOCK
+ } else if (data->type == CEPH_MSG_DATA_BIO) {
+ ceph_msg_data_add_bio(msg, &data->bio_pos, length);
+#endif
+ } else if (data->type == CEPH_MSG_DATA_BVECS) {
+ ceph_msg_data_add_bvecs(msg, &data->bvec_pos,
+ data->num_bvecs);
+ } else {
+ BUG_ON(data->type != CEPH_MSG_DATA_NONE);
+ }
+}
+EXPORT_SYMBOL(ceph_msg_data_add);
+
+void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
+ size_t length, size_t alignment,
+ bool pages_from_pool, bool own_pages)
+{
+ struct ceph_msg_data *data;
+
+ BUG_ON(!pages);
+ BUG_ON(!length);
+
+ data = ceph_msg_data_get_next(msg);
+ ceph_msg_data_pages_init(data, pages, length, alignment,
+ pages_from_pool, own_pages);
msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);
@@ -3283,10 +3387,9 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
BUG_ON(!pagelist);
BUG_ON(!pagelist->length);
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_PAGELIST;
+ data = ceph_msg_data_get_next(msg);
+ ceph_msg_data_pagelist_init(data, pagelist);
refcount_inc(&pagelist->refcnt);
- data->pagelist = pagelist;
msg->data_length += pagelist->length;
}
@@ -3298,10 +3401,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
{
struct ceph_msg_data *data;
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_BIO;
- data->bio_pos = *bio_pos;
- data->bio_length = length;
+ data = ceph_msg_data_get_next(msg);
+ ceph_msg_data_bio_init(data, bio_pos, length);
msg->data_length += length;
}
@@ -3309,13 +3410,13 @@ EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif /* CONFIG_BLOCK */
void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
- struct ceph_bvec_iter *bvec_pos)
+ struct ceph_bvec_iter *bvec_pos,
+ u32 num_bvecs)
{
struct ceph_msg_data *data;
- data = ceph_msg_data_add(msg);
- data->type = CEPH_MSG_DATA_BVECS;
- data->bvec_pos = *bvec_pos;
+ data = ceph_msg_data_get_next(msg);
+ ceph_msg_data_bvecs_init(data, bvec_pos, num_bvecs);
msg->data_length += bvec_pos->iter.bi_size;
}
@@ -3502,7 +3603,7 @@ static void ceph_msg_release(struct kref *kref)
}
for (i = 0; i < m->num_data_items; i++)
- ceph_msg_data_destroy(&m->data[i]);
+ ceph_msg_data_release(&m->data[i]);
if (m->pool)
ceph_msgpool_put(m->pool, m);
@@ -962,7 +962,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
BUG_ON(length > (u64) SIZE_MAX);
if (length)
ceph_msg_data_add_pages(msg, osd_data->pages,
- length, osd_data->alignment, false);
+ length, osd_data->alignment,
+ osd_data->pages_from_pool, false);
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
BUG_ON(!length);
ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
@@ -971,7 +972,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
- ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
+ ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos,
+ osd_data->num_bvecs);
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
}
@@ -5443,7 +5445,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
return NULL;
}
- ceph_msg_data_add_pages(m, pages, data_len, 0, true);
+ ceph_msg_data_add_pages(m, pages, data_len, 0, false, true);
}
return m;
There is a similar msg data interface for osd_client, which has the structure named ceph_osd_data. This is a first patch towards API unification, i.e. ceph_msg_data API will be used for all the cases. Signed-off-by: Roman Penyaev <rpenyaev@suse.de> --- include/linux/ceph/messenger.h | 34 +++++++- net/ceph/messenger.c | 145 ++++++++++++++++++++++++++++----- net/ceph/osd_client.c | 8 +- 3 files changed, 158 insertions(+), 29 deletions(-)