@@ -192,6 +192,9 @@ struct ceph_msg_data_cursor {
size_t total_resid; /* across all data items */
struct ceph_msg_data *data; /* current data item */
+ struct iov_iter iter; /* iterator for current data */
+ struct bio_vec it_bvec; /* single bvec that iter is built over */
+ unsigned int direction; /* data direction */
size_t resid; /* bytes not yet consumed */
bool last_piece; /* current is last piece */
bool need_crc; /* crc update needed */
@@ -523,6 +523,22 @@ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
return r;
}
+static int ceph_tcp_recviov(struct socket *sock, struct iov_iter *iter)
+{ /* Receive into @iter; returns sock_recvmsg()'s result, -EAGAIN as 0. */
+ struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
+ .msg_iter = *iter }; /* struct copy: caller's @iter is not advanced */
+ int r;
+
+ if (!iter->count) /* zero-length iterator: also pass MSG_TRUNC */
+ msg.msg_flags |= MSG_TRUNC;
+
+ r = sock_recvmsg(sock, &msg, msg.msg_flags);
+ if (r == -EAGAIN) /* reported to the caller as 0, not as an error */
+ r = 0;
+ return r;
+}
+
+__attribute__((unused))
static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
int page_offset, size_t length)
{
@@ -594,6 +610,42 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
return ret;
}
+/**
+ * ceph_tcp_sendiov() - either does sendmsg() or 0-copy sendpage()
+ * @iter: data to send; it is only read here, never advanced.
+ * @more: true if caller will be sending more data shortly.
+ */
+static int ceph_tcp_sendiov(struct socket *sock, struct iov_iter *iter,
+ bool more)
+{
+ if (iov_iter_is_bvec(iter)) {
+ const struct bio_vec *bvec = &iter->bvec[0];
+ int flags = more ? MSG_MORE | MSG_SENDPAGE_NOTLAST : 0;
+
+ /* Do 0-copy instead of sendmsg: send bvec[0] from iov_offset on */
+ /* NOTE(review): iter->count is not consulted; assumes one full segment */
+ return ceph_tcp_sendpage(sock, bvec->bv_page,
+ iter->iov_offset + bvec->bv_offset,
+ bvec->bv_len - iter->iov_offset,
+ flags);
+ } else {
+ struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
+ .msg_iter = *iter }; /* struct copy of the iterator */
+ int r;
+
+ if (more)
+ msg.msg_flags |= MSG_MORE;
+ else
+ /* superfluous, but what the hell */
+ msg.msg_flags |= MSG_EOR;
+
+ r = sock_sendmsg(sock, &msg);
+ if (r == -EAGAIN) /* reported to the caller as 0, not as an error */
+ r = 0;
+ return r;
+ }
+}
+
/*
* Shutdown/close the socket for the given connection.
*/
@@ -1086,12 +1138,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
}
/*
- * Message data is handled (sent or received) in pieces, where each
- * piece resides on a single page. The network layer might not
- * consume an entire piece at once. A data item's cursor keeps
- * track of which piece is next to process and how much remains to
- * be processed in that piece. It also tracks whether the current
- * piece is the last one in the data item.
+ * Message data is iterated (sent or received) through the cursor's iov_iter.
*/
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
@@ -1120,7 +1167,8 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
cursor->need_crc = true;
}
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+static void ceph_msg_data_cursor_init(unsigned int dir, struct ceph_msg *msg,
+ size_t length)
{
struct ceph_msg_data_cursor *cursor = &msg->cursor;
@@ -1130,33 +1178,33 @@ static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
cursor->total_resid = length;
cursor->data = msg->data;
+ cursor->direction = dir;
__ceph_msg_data_cursor_init(cursor);
}
/*
- * Return the page containing the next piece to process for a given
- * data item, and supply the page offset and length of that piece.
+ * Set up cursor->iter for the next piece to process.
*/
-static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
- size_t *page_offset, size_t *length)
+static void ceph_msg_data_next(struct ceph_msg_data_cursor *cursor)
{
struct page *page;
+ size_t off, len;
switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST:
- page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
+ page = ceph_msg_data_pagelist_next(cursor, &off, &len);
break;
case CEPH_MSG_DATA_PAGES:
- page = ceph_msg_data_pages_next(cursor, page_offset, length);
+ page = ceph_msg_data_pages_next(cursor, &off, &len);
break;
#ifdef CONFIG_BLOCK
case CEPH_MSG_DATA_BIO:
- page = ceph_msg_data_bio_next(cursor, page_offset, length);
+ page = ceph_msg_data_bio_next(cursor, &off, &len);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_BVECS:
- page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
+ page = ceph_msg_data_bvecs_next(cursor, &off, &len);
break;
case CEPH_MSG_DATA_NONE:
default:
@@ -1165,11 +1213,16 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
}
BUG_ON(!page);
- BUG_ON(*page_offset + *length > PAGE_SIZE);
- BUG_ON(!*length);
- BUG_ON(*length > cursor->resid);
+ BUG_ON(off + len > PAGE_SIZE);
+ BUG_ON(!len);
+ BUG_ON(len > cursor->resid);
+
+ cursor->it_bvec.bv_page = page;
+ cursor->it_bvec.bv_len = len;
+ cursor->it_bvec.bv_offset = off;
- return page;
+ iov_iter_bvec(&cursor->iter, cursor->direction,
+ &cursor->it_bvec, 1, len);
}
/*
@@ -1220,11 +1273,12 @@ static size_t sizeof_footer(struct ceph_connection *con)
sizeof(struct ceph_msg_footer_old);
}
-static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+static void prepare_message_data(unsigned int dir, struct ceph_msg *msg,
+ u32 data_len)
{
/* Initialize data cursor */
- ceph_msg_data_cursor_init(msg, (size_t)data_len);
+ ceph_msg_data_cursor_init(dir, msg, (size_t)data_len);
}
/*
@@ -1331,7 +1385,7 @@ static void prepare_write_message(struct ceph_connection *con)
/* is there a data payload? */
con->out_msg->footer.data_crc = 0;
if (m->data_length) {
- prepare_message_data(con->out_msg, m->data_length);
+ prepare_message_data(WRITE, con->out_msg, m->data_length);
con->out_more = 1; /* data + footer will follow */
} else {
/* no, queue up footer too and be done */
@@ -1532,16 +1586,19 @@ static int write_partial_kvec(struct ceph_connection *con)
return ret; /* done! */
}
-static u32 ceph_crc32c_page(u32 crc, struct page *page,
- unsigned int page_offset,
- unsigned int length)
+static int crc32c_kvec(struct kvec *vec, void *p)
{
- char *kaddr;
+ u32 *crc = p;
- kaddr = kmap(page);
- BUG_ON(kaddr == NULL);
- crc = crc32c(crc, kaddr + page_offset, length);
- kunmap(page);
+ *crc = crc32c(*crc, vec->iov_base, vec->iov_len);
+
+ return 0;
+}
+
+static u32 ceph_crc32c_iov(u32 crc, struct iov_iter *iter,
+ unsigned int length)
+{
+ iov_iter_for_each_range(iter, length, crc32c_kvec, &crc);
return crc;
}
@@ -1557,7 +1614,6 @@ static int write_partial_message_data(struct ceph_connection *con)
struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
- int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
u32 crc;
dout("%s %p msg %p\n", __func__, con, msg);
@@ -1575,9 +1631,6 @@ static int write_partial_message_data(struct ceph_connection *con)
*/
crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
while (cursor->total_resid) {
- struct page *page;
- size_t page_offset;
- size_t length;
int ret;
if (!cursor->resid) {
@@ -1585,11 +1638,8 @@ static int write_partial_message_data(struct ceph_connection *con)
continue;
}
- page = ceph_msg_data_next(cursor, &page_offset, &length);
- if (length == cursor->total_resid)
- more = MSG_MORE;
- ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
- more);
+ ceph_msg_data_next(cursor);
+ ret = ceph_tcp_sendiov(con->sock, &cursor->iter, true);
if (ret <= 0) {
if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc);
@@ -1597,7 +1647,7 @@ static int write_partial_message_data(struct ceph_connection *con)
return ret;
}
if (do_datacrc && cursor->need_crc)
- crc = ceph_crc32c_page(crc, page, page_offset, length);
+ crc = ceph_crc32c_iov(crc, &cursor->iter, ret);
ceph_msg_data_advance(cursor, (size_t)ret);
}
@@ -2315,9 +2365,6 @@ static int read_partial_msg_data(struct ceph_connection *con)
struct ceph_msg *msg = con->in_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
- struct page *page;
- size_t page_offset;
- size_t length;
u32 crc = 0;
int ret;
@@ -2332,8 +2379,8 @@ static int read_partial_msg_data(struct ceph_connection *con)
continue;
}
- page = ceph_msg_data_next(cursor, &page_offset, &length);
- ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+ ceph_msg_data_next(cursor);
+ ret = ceph_tcp_recviov(con->sock, &cursor->iter);
if (ret <= 0) {
if (do_datacrc)
con->in_data_crc = crc;
@@ -2342,7 +2389,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
}
if (do_datacrc)
- crc = ceph_crc32c_page(crc, page, page_offset, ret);
+ crc = ceph_crc32c_iov(crc, &cursor->iter, ret);
ceph_msg_data_advance(cursor, (size_t)ret);
}
if (do_datacrc)
@@ -2443,7 +2490,7 @@ static int read_partial_message(struct ceph_connection *con)
/* prepare for data payload, if any */
if (data_len)
- prepare_message_data(con->in_msg, data_len);
+ prepare_message_data(READ, con->in_msg, data_len);
}
/* front */
The first problem is performance: why not pass the whole iov_iter to the socket read/write functions and let the socket API handle everything at once, instead of doing I/O page by page?  It is therefore better to turn the data cursor into an iov_iter, which is generic across many API calls.

The second reason is kvec support in the future, i.e. the case where we do not have a page in hand, but a buffer.

So this patch is a preparation, the first iteration: users of the data cursor no longer see pages, but use cursor->iter instead.  Internally the cursor still uses pages; that will be removed in subsequent patches.

We are still able to use sendpage() for 0-copy and get a performance benefit from multi-page bvecs, i.e. if the bvec in the iter is multi-page, then we pass the whole multi-page bvec to sendpage() and not only 4k.

It is important to mention that for sendpage() MSG_SENDPAGE_NOTLAST is always set if the @more flag is true.  We know that the footer of a message will follow; for it @more will be false, and all data will be pushed out of the socket.

Signed-off-by: Roman Penyaev <rpenyaev@suse.de>
---
 include/linux/ceph/messenger.h |   3 +
 net/ceph/messenger.c           | 141 ++++++++++++++++++++++-----------
 2 files changed, 97 insertions(+), 47 deletions(-)