diff mbox

libceph: make message data be a pointer

Message ID 513FD203.10401@inktank.com (mailing list archive)
State New, archived
Headers show

Commit Message

Alex Elder March 13, 2013, 1:10 a.m. UTC
Begin the transition from a single message data item to a list of
them by replacing the "data" structure in a message with a pointer
to a ceph_msg_data structure.

A null pointer will indicate the message has no data; replace the
use of ceph_msg_has_data() with a simple check for a null pointer.

Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
to dynamically allocate and free a data item structure of a given type.

When a message has its data item "set," allocate one of these to
hold the data description, and free it when the last reference to
the message is dropped.

This partially resolves:
    http://tracker.ceph.com/issues/4429

Signed-off-by: Alex Elder <elder@inktank.com>
---
 include/linux/ceph/messenger.h |    5 +--
 net/ceph/messenger.c           |   91
+++++++++++++++++++++++++++-------------
 2 files changed, 62 insertions(+), 34 deletions(-)

 	/*
@@ -1414,7 +1414,7 @@ static int write_partial_message_data(struct
ceph_connection *con)
 		bool need_crc;
 		int ret;

-		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+		page = ceph_msg_data_next(msg->data, &page_offset, &length,
 							&last_piece);
 		if (do_datacrc && cursor->need_crc)
 			crc = ceph_crc32c_page(crc, page, page_offset, length);
@@ -1425,7 +1425,7 @@ static int write_partial_message_data(struct
ceph_connection *con)

 			return ret;
 		}
-		need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret);
+		need_crc = ceph_msg_data_advance(msg->data, (size_t) ret);
 	}
 	msg->footer.data_crc = cpu_to_le32(crc);

@@ -2075,7 +2075,7 @@ static int read_partial_message_section(struct
ceph_connection *con,
 static int read_partial_msg_data(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->in_msg;
-	struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
+	struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
 	const bool do_datacrc = !con->msgr->nocrc;
 	struct page *page;
 	size_t page_offset;
@@ -2084,12 +2084,12 @@ static int read_partial_msg_data(struct
ceph_connection *con)
 	int ret;

 	BUG_ON(!msg);
-	if (WARN_ON(!ceph_msg_has_data(msg)))
+	if (!msg->data)
 		return -EIO;

 	crc = con->in_data_crc;
 	while (cursor->resid) {
-		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+		page = ceph_msg_data_next(msg->data, &page_offset, &length,
 							NULL);
 		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
 		if (ret <= 0) {
@@ -2100,7 +2100,7 @@ static int read_partial_msg_data(struct
ceph_connection *con)

 		if (do_datacrc)
 			crc = ceph_crc32c_page(crc, page, page_offset, ret);
-		(void) ceph_msg_data_advance(&msg->data, (size_t) ret);
+		(void) ceph_msg_data_advance(msg->data, (size_t) ret);
 	}
 	con->in_data_crc = crc;

@@ -2910,44 +2910,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);

-static void ceph_msg_data_init(struct ceph_msg_data *data)
+static struct ceph_msg_data *ceph_msg_data_create(enum
ceph_msg_data_type type)
 {
-	data->type = CEPH_MSG_DATA_NONE;
+	struct ceph_msg_data *data;
+
+	if (WARN_ON(!ceph_msg_data_type_valid(type)))
+		return NULL;
+
+	data = kzalloc(sizeof (*data), GFP_NOFS);
+	if (data)
+		data->type = type;
+
+	return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+{
+	if (!data)
+		return;
+
+	if (data->type == CEPH_MSG_DATA_PAGELIST) {
+		ceph_pagelist_release(data->pagelist);
+		kfree(data->pagelist);
+	}
+	kfree(data);
 }

 void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 		size_t length, size_t alignment)
 {
+	struct ceph_msg_data *data;
+
 	BUG_ON(!pages);
 	BUG_ON(!length);
-	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+	BUG_ON(msg->data != NULL);
+
+	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+	BUG_ON(!data);
+	data->pages = pages;
+	data->length = length;
+	data->alignment = alignment & ~PAGE_MASK;

-	msg->data.type = CEPH_MSG_DATA_PAGES;
-	msg->data.pages = pages;
-	msg->data.length = length;
-	msg->data.alignment = alignment & ~PAGE_MASK;
+	msg->data = data;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pages);

 void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
 				struct ceph_pagelist *pagelist)
 {
+	struct ceph_msg_data *data;
+
 	BUG_ON(!pagelist);
 	BUG_ON(!pagelist->length);
-	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+	BUG_ON(msg->data != NULL);
+
+	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+	BUG_ON(!data);
+	data->pagelist = pagelist;

-	msg->data.type = CEPH_MSG_DATA_PAGELIST;
-	msg->data.pagelist = pagelist;
+	msg->data = data;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);

 void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 {
+	struct ceph_msg_data *data;
+
 	BUG_ON(!bio);
-	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+	BUG_ON(msg->data != NULL);
+
+	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+	BUG_ON(!data);
+	data->bio = bio;

-	msg->data.type = CEPH_MSG_DATA_BIO;
-	msg->data.bio = bio;
+	msg->data = data;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);

@@ -2971,8 +3007,6 @@ struct ceph_msg *ceph_msg_new(int type, int
front_len, gfp_t flags,
 	INIT_LIST_HEAD(&m->list_head);
 	kref_init(&m->kref);

-	ceph_msg_data_init(&m->data);
-
 	/* front */
 	m->front_max = front_len;
 	if (front_len) {
@@ -3127,11 +3161,8 @@ void ceph_msg_last_put(struct kref *kref)
 		m->middle = NULL;
 	}

-	if (ceph_msg_has_data(m) && m->data.type == CEPH_MSG_DATA_PAGELIST) {
-		ceph_pagelist_release(m->data.pagelist);
-		kfree(m->data.pagelist);
-		m->data.pagelist = NULL;
-	}
+	ceph_msg_data_destroy(m->data);
+	m->data = NULL;

 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);
@@ -3143,7 +3174,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 void ceph_msg_dump(struct ceph_msg *msg)
 {
 	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-		 msg->front_max, msg->data.length);
+		 msg->front_max, msg->data->length);
 	print_hex_dump(KERN_DEBUG, "header: ",
 		       DUMP_PREFIX_OFFSET, 16, 1,
 		       &msg->hdr, sizeof(msg->hdr), true);

Comments

Josh Durgin March 14, 2013, 7:54 p.m. UTC | #1
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>

On 03/12/2013 06:10 PM, Alex Elder wrote:
> Begin the transition from a single message data item to a list of
> them by replacing the "data" structure in a message with a pointer
> to a ceph_msg_data structure.
>
> A null pointer will indicate the message has no data; replace the
> use of ceph_msg_has_data() with a simple check for a null pointer.
>
> Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
> to dynamically allocate and free a data item structure of a given type.
>
> When a message has its data item "set," allocate one of these to
> hold the data description, and free it when the last reference to
> the message is dropped.
>
> This partially resolves:
>      http://tracker.ceph.com/issues/4429
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
>   include/linux/ceph/messenger.h |    5 +--
>   net/ceph/messenger.c           |   91
> +++++++++++++++++++++++++++-------------
>   2 files changed, 62 insertions(+), 34 deletions(-)
>
> diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
> index 686df5b..3181321 100644
> --- a/include/linux/ceph/messenger.h
> +++ b/include/linux/ceph/messenger.h
> @@ -64,8 +64,6 @@ struct ceph_messenger {
>   	u32 required_features;
>   };
>
> -#define ceph_msg_has_data(m)		((m)->data.type != CEPH_MSG_DATA_NONE)
> -
>   enum ceph_msg_data_type {
>   	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
>   	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
> @@ -141,8 +139,7 @@ struct ceph_msg {
>   	struct kvec front;              /* unaligned blobs of message */
>   	struct ceph_buffer *middle;
>
> -	/* data payload */
> -	struct ceph_msg_data	data;
> +	struct ceph_msg_data	*data;	/* data payload */
>
>   	struct ceph_connection *con;
>   	struct list_head list_head;	/* links for connection lists */
> diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
> index 8e07ac4..d703813 100644
> --- a/net/ceph/messenger.c
> +++ b/net/ceph/messenger.c
> @@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
>
>   	/* Initialize data cursors */
>
> -	ceph_msg_data_cursor_init(&msg->data, data_len);
> +	ceph_msg_data_cursor_init(msg->data, data_len);
>   }
>
>   /*
> @@ -1388,13 +1388,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page
> *page,
>   static int write_partial_message_data(struct ceph_connection *con)
>   {
>   	struct ceph_msg *msg = con->out_msg;
> -	struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
> +	struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
>   	bool do_datacrc = !con->msgr->nocrc;
>   	u32 crc;
>
>   	dout("%s %p msg %p\n", __func__, con, msg);
>
> -	if (WARN_ON(!ceph_msg_has_data(msg)))
> +	if (WARN_ON(!msg->data))
>   		return -EINVAL;
>
>   	/*
> @@ -1414,7 +1414,7 @@ static int write_partial_message_data(struct
> ceph_connection *con)
>   		bool need_crc;
>   		int ret;
>
> -		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
> +		page = ceph_msg_data_next(msg->data, &page_offset, &length,
>   							&last_piece);
>   		if (do_datacrc && cursor->need_crc)
>   			crc = ceph_crc32c_page(crc, page, page_offset, length);
> @@ -1425,7 +1425,7 @@ static int write_partial_message_data(struct
> ceph_connection *con)
>
>   			return ret;
>   		}
> -		need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret);
> +		need_crc = ceph_msg_data_advance(msg->data, (size_t) ret);
>   	}
>   	msg->footer.data_crc = cpu_to_le32(crc);
>
> @@ -2075,7 +2075,7 @@ static int read_partial_message_section(struct
> ceph_connection *con,
>   static int read_partial_msg_data(struct ceph_connection *con)
>   {
>   	struct ceph_msg *msg = con->in_msg;
> -	struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
> +	struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
>   	const bool do_datacrc = !con->msgr->nocrc;
>   	struct page *page;
>   	size_t page_offset;
> @@ -2084,12 +2084,12 @@ static int read_partial_msg_data(struct
> ceph_connection *con)
>   	int ret;
>
>   	BUG_ON(!msg);
> -	if (WARN_ON(!ceph_msg_has_data(msg)))
> +	if (!msg->data)
>   		return -EIO;
>
>   	crc = con->in_data_crc;
>   	while (cursor->resid) {
> -		page = ceph_msg_data_next(&msg->data, &page_offset, &length,
> +		page = ceph_msg_data_next(msg->data, &page_offset, &length,
>   							NULL);
>   		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
>   		if (ret <= 0) {
> @@ -2100,7 +2100,7 @@ static int read_partial_msg_data(struct
> ceph_connection *con)
>
>   		if (do_datacrc)
>   			crc = ceph_crc32c_page(crc, page, page_offset, ret);
> -		(void) ceph_msg_data_advance(&msg->data, (size_t) ret);
> +		(void) ceph_msg_data_advance(msg->data, (size_t) ret);
>   	}
>   	con->in_data_crc = crc;
>
> @@ -2910,44 +2910,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
>   }
>   EXPORT_SYMBOL(ceph_con_keepalive);
>
> -static void ceph_msg_data_init(struct ceph_msg_data *data)
> +static struct ceph_msg_data *ceph_msg_data_create(enum
> ceph_msg_data_type type)
>   {
> -	data->type = CEPH_MSG_DATA_NONE;
> +	struct ceph_msg_data *data;
> +
> +	if (WARN_ON(!ceph_msg_data_type_valid(type)))
> +		return NULL;
> +
> +	data = kzalloc(sizeof (*data), GFP_NOFS);
> +	if (data)
> +		data->type = type;
> +
> +	return data;
> +}
> +
> +static void ceph_msg_data_destroy(struct ceph_msg_data *data)
> +{
> +	if (!data)
> +		return;
> +
> +	if (data->type == CEPH_MSG_DATA_PAGELIST) {
> +		ceph_pagelist_release(data->pagelist);
> +		kfree(data->pagelist);
> +	}
> +	kfree(data);
>   }
>
>   void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
>   		size_t length, size_t alignment)
>   {
> +	struct ceph_msg_data *data;
> +
>   	BUG_ON(!pages);
>   	BUG_ON(!length);
> -	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
> +	BUG_ON(msg->data != NULL);
> +
> +	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
> +	BUG_ON(!data);
> +	data->pages = pages;
> +	data->length = length;
> +	data->alignment = alignment & ~PAGE_MASK;
>
> -	msg->data.type = CEPH_MSG_DATA_PAGES;
> -	msg->data.pages = pages;
> -	msg->data.length = length;
> -	msg->data.alignment = alignment & ~PAGE_MASK;
> +	msg->data = data;
>   }
>   EXPORT_SYMBOL(ceph_msg_data_set_pages);
>
>   void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
>   				struct ceph_pagelist *pagelist)
>   {
> +	struct ceph_msg_data *data;
> +
>   	BUG_ON(!pagelist);
>   	BUG_ON(!pagelist->length);
> -	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
> +	BUG_ON(msg->data != NULL);
> +
> +	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
> +	BUG_ON(!data);
> +	data->pagelist = pagelist;
>
> -	msg->data.type = CEPH_MSG_DATA_PAGELIST;
> -	msg->data.pagelist = pagelist;
> +	msg->data = data;
>   }
>   EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
>
>   void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
>   {
> +	struct ceph_msg_data *data;
> +
>   	BUG_ON(!bio);
> -	BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
> +	BUG_ON(msg->data != NULL);
> +
> +	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
> +	BUG_ON(!data);
> +	data->bio = bio;
>
> -	msg->data.type = CEPH_MSG_DATA_BIO;
> -	msg->data.bio = bio;
> +	msg->data = data;
>   }
>   EXPORT_SYMBOL(ceph_msg_data_set_bio);
>
> @@ -2971,8 +3007,6 @@ struct ceph_msg *ceph_msg_new(int type, int
> front_len, gfp_t flags,
>   	INIT_LIST_HEAD(&m->list_head);
>   	kref_init(&m->kref);
>
> -	ceph_msg_data_init(&m->data);
> -
>   	/* front */
>   	m->front_max = front_len;
>   	if (front_len) {
> @@ -3127,11 +3161,8 @@ void ceph_msg_last_put(struct kref *kref)
>   		m->middle = NULL;
>   	}
>
> -	if (ceph_msg_has_data(m) && m->data.type == CEPH_MSG_DATA_PAGELIST) {
> -		ceph_pagelist_release(m->data.pagelist);
> -		kfree(m->data.pagelist);
> -		m->data.pagelist = NULL;
> -	}
> +	ceph_msg_data_destroy(m->data);
> +	m->data = NULL;
>
>   	if (m->pool)
>   		ceph_msgpool_put(m->pool, m);
> @@ -3143,7 +3174,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
>   void ceph_msg_dump(struct ceph_msg *msg)
>   {
>   	pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
> -		 msg->front_max, msg->data.length);
> +		 msg->front_max, msg->data->length);
>   	print_hex_dump(KERN_DEBUG, "header: ",
>   		       DUMP_PREFIX_OFFSET, 16, 1,
>   		       &msg->hdr, sizeof(msg->hdr), true);
>

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 686df5b..3181321 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -64,8 +64,6 @@  struct ceph_messenger {
 	u32 required_features;
 };

-#define ceph_msg_has_data(m)		((m)->data.type != CEPH_MSG_DATA_NONE)
-
 enum ceph_msg_data_type {
 	CEPH_MSG_DATA_NONE,	/* message contains no data payload */
 	CEPH_MSG_DATA_PAGES,	/* data source/destination is a page array */
@@ -141,8 +139,7 @@  struct ceph_msg {
 	struct kvec front;              /* unaligned blobs of message */
 	struct ceph_buffer *middle;

-	/* data payload */
-	struct ceph_msg_data	data;
+	struct ceph_msg_data	*data;	/* data payload */

 	struct ceph_connection *con;
 	struct list_head list_head;	/* links for connection lists */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8e07ac4..d703813 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1086,7 +1086,7 @@  static void prepare_message_data(struct ceph_msg *msg)

 	/* Initialize data cursors */

-	ceph_msg_data_cursor_init(&msg->data, data_len);
+	ceph_msg_data_cursor_init(msg->data, data_len);
 }

 /*
@@ -1388,13 +1388,13 @@  static u32 ceph_crc32c_page(u32 crc, struct page
*page,
 static int write_partial_message_data(struct ceph_connection *con)
 {
 	struct ceph_msg *msg = con->out_msg;
-	struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
+	struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
 	bool do_datacrc = !con->msgr->nocrc;
 	u32 crc;

 	dout("%s %p msg %p\n", __func__, con, msg);

-	if (WARN_ON(!ceph_msg_has_data(msg)))
+	if (WARN_ON(!msg->data))
 		return -EINVAL;