diff mbox

[4/8] libceph: start defining message data cursor

Message ID 513CDBFB.504@inktank.com (mailing list archive)
State New, archived
Headers show

Commit Message

Alex Elder March 10, 2013, 7:16 p.m. UTC
This patch lays out the foundation for using generic routines to
manage processing items of message data.

For simplicity, we'll start with just the trail portion of a
message, because it stands alone and is only present for outgoing
data.

First some basic concepts.  We'll use the term "data item" to
represent one of the ceph_msg_data structures associated with a
message.  There are currently four of those, with single-letter
field names p, l, b, and t.  A data item is further broken into
"pieces" which always lie in a single page.  A data item will
include a "cursor" that will track state as the memory defined by
the item is consumed by sending data from or receiving data into it.

We define three routines to manipulate a data item's cursor: the
"init" routine; the "next" routine; and the "advance" routine.  The
"init" routine initializes the cursor so it points at the beginning
of the first piece in the item.  The "next" routine returns the
page, page offset, and length (limited by both the page and item
size) of the next unconsumed piece in the item.  It also indicates
to the caller whether the piece being returned is the last one in
the data item.

The "advance" routine consumes the requested number of bytes in the
item (advancing the cursor).  This is used to record the number of
bytes from the current piece that were actually sent or received by
the network code.  It returns an indication of whether the result
means the current piece has been fully consumed.  This is used by
the message send code to determine whether it should calculate the
CRC for the next piece processed.

The trail of a message is implemented as a ceph pagelist.  The
routines defined for it will be usable for non-trail pagelist data
as well.

Signed-off-by: Alex Elder <elder@inktank.com>
---
 include/linux/ceph/messenger.h |    7 ++
 net/ceph/messenger.c           |  138
+++++++++++++++++++++++++++++++++++++---
 2 files changed, 135 insertions(+), 10 deletions(-)

 {
@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
 		init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
 #endif
 	msg_pos->data_pos = 0;
+
+	/* If there's a trail, initialize its cursor */
+
+	if (ceph_msg_has_trail(msg))
+		ceph_msg_data_cursor_init(&msg->t);
+
 	msg_pos->did_page_crc = false;
 }

@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct
ceph_connection *con, struct page *page,

 	msg_pos->data_pos += sent;
 	msg_pos->page_pos += sent;
+	if (in_trail) {
+		bool need_crc;
+
+		need_crc = ceph_msg_data_advance(&msg->t, sent);
+		BUG_ON(need_crc && sent != len);
+	}
 	if (sent < len)
 		return;

@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct
ceph_connection *con, struct page *page,
 	msg_pos->page_pos = 0;
 	msg_pos->page++;
 	msg_pos->did_page_crc = false;
-	if (in_trail) {
-		BUG_ON(!ceph_msg_has_trail(msg));
-		list_rotate_left(&msg->t.pagelist->head);
-	} else if (ceph_msg_has_pagelist(msg)) {
+	if (ceph_msg_has_pagelist(msg)) {
 		list_rotate_left(&msg->l.pagelist->head);
 #ifdef CONFIG_BLOCK
 	} else if (ceph_msg_has_bio(msg)) {
@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct
ceph_connection *con)
 		size_t length;
 		int max_write = PAGE_SIZE;
 		int bio_offset = 0;
+		bool use_cursor = false;
+		bool last_piece = true;	/* preserve existing behavior */

 		in_trail = in_trail || msg_pos->data_pos >= trail_off;
 		if (!in_trail)
@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct
ceph_connection *con)

 		if (in_trail) {
 			BUG_ON(!ceph_msg_has_trail(msg));
-			total_max_write = data_len - msg_pos->data_pos;
-			page = list_first_entry(&msg->t.pagelist->head,
-						struct page, lru);
+			use_cursor = true;
+			page = ceph_msg_data_next(&msg->t, &page_offset,
+							&length, &last_piece);
 		} else if (ceph_msg_has_pages(msg)) {
 			page = msg->p.pages[msg_pos->page];
 		} else if (ceph_msg_has_pagelist(msg)) {
@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct
ceph_connection *con)
 		} else {
 			page = zero_page;
 		}
-		length = min_t(int, max_write - msg_pos->page_pos,
-			    total_max_write);
+		if (!use_cursor)
+			length = min_t(int, max_write - msg_pos->page_pos,
+					    total_max_write);

 		page_offset = msg_pos->page_pos + bio_offset;
 		if (do_datacrc && !msg_pos->did_page_crc) {
@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct
ceph_connection *con)
 			msg_pos->did_page_crc = true;
 		}
 		ret = ceph_tcp_sendpage(con->sock, page, page_offset,
-				      length, true);
+				      length, last_piece);
 		if (ret <= 0)
 			goto out;
diff mbox

Patch

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 5860dd0..1486243 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -95,6 +95,12 @@  static __inline__ bool ceph_msg_data_type_valid(enum
ceph_msg_data_type type)
 	}
 }

+struct ceph_msg_data_cursor {
+	bool		last_piece;	/* now at last piece of data item */
+	struct page	*page;		/* current page in pagelist */
+	size_t		offset;		/* pagelist bytes consumed */
+};
+
 struct ceph_msg_data {
 	enum ceph_msg_data_type		type;
 	union {
@@ -112,6 +118,7 @@  struct ceph_msg_data {
 		};
 		struct ceph_pagelist	*pagelist;
 	};
+	struct ceph_msg_data_cursor	cursor;		/* pagelist only */
 };

 /*
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f256b4b..b978cf8 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -21,6 +21,9 @@ 
 #include <linux/ceph/pagelist.h>
 #include <linux/export.h>

+#define list_entry_next(pos, member)					\
+	list_entry(pos->member.next, typeof(*pos), member)
+
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
  * hosts in the system.  The messenger provides ordered and reliable
@@ -738,6 +741,109 @@  static void iter_bio_next(struct bio **bio_iter,
unsigned int *seg)
 }
 #endif

+/*
+ * Message data is handled (sent or received) in pieces, where each
+ * piece resides on a single page.  The network layer might not
+ * consume an entire piece at once.  A data item's cursor keeps
+ * track of which piece is next to process and how much remains to
+ * be processed in that piece.  It also tracks whether the current
+ * piece is the last one in the data item.
+ */
+static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct ceph_pagelist *pagelist;
+	struct page *page;
+
+	if (data->type != CEPH_MSG_DATA_PAGELIST)
+		return;
+
+	pagelist = data->pagelist;
+	BUG_ON(!pagelist);
+	if (!pagelist->length)
+		return;		/* pagelist can be assigned but empty */
+
+	BUG_ON(list_empty(&pagelist->head));
+	page = list_first_entry(&pagelist->head, struct page, lru);
+
+	cursor->page = page;
+	cursor->offset = 0;
+	cursor->last_piece = pagelist->length <= PAGE_SIZE;
+}
+
+/*
+ * Return the page containing the next piece to process for a given
+ * data item, and supply the page offset and length of that piece.
+ * Indicate whether this is the last piece in this data item.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
+						size_t *page_offset,
+						size_t *length,
+						bool *last_piece)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct ceph_pagelist *pagelist;
+	size_t piece_end;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+	pagelist = data->pagelist;
+	BUG_ON(!pagelist);
+
+	BUG_ON(!cursor->page);
+	BUG_ON(cursor->offset >= pagelist->length);
+
+	*last_piece = cursor->last_piece;
+	if (*last_piece) {
+		/* pagelist offset is always 0 */
+		piece_end = pagelist->length & ~PAGE_MASK;
+		if (!piece_end)
+			piece_end = PAGE_SIZE;
+	} else {
+		piece_end = PAGE_SIZE;
+	}
+	*page_offset = cursor->offset & ~PAGE_MASK;
+	*length = piece_end - *page_offset;
+
+	return data->cursor.page;
+}
+
+/*
+ * Returns true if the result moves the cursor on to the next piece
+ * (the next page) of the pagelist.
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct ceph_pagelist *pagelist;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+	pagelist = data->pagelist;
+	BUG_ON(!pagelist);
+	BUG_ON(!cursor->page);
+	BUG_ON(cursor->offset + bytes > pagelist->length);
+	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+	/* Advance the cursor offset */
+
+	cursor->offset += bytes;
+	/* pagelist offset is always 0 */
+	if (!bytes || cursor->offset & ~PAGE_MASK)
+		return false;	/* more bytes to process in the current page */
+
+	/* Move on to the next page */
+
+	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+	cursor->page = list_entry_next(cursor->page, lru);
+
+	/* cursor offset is at page boundary; pagelist offset is always 0 */
+	if (pagelist->length - cursor->offset <= PAGE_SIZE)
+		cursor->last_piece = true;
+
+	return true;
+}
+
 static void prepare_message_data(struct ceph_msg *msg,
 				struct ceph_msg_pos *msg_pos)