diff mbox series

[RFC,25/28] rds: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage

Message ID 20230316152618.711970-26-dhowells@redhat.com (mailing list archive)
State Mainlined, archived
Headers show
Series splice, net: Replace sendpage with sendmsg(MSG_SPLICE_PAGES) | expand

Commit Message

David Howells March 16, 2023, 3:26 p.m. UTC
When transmitting data, call down into TCP using a single sendmsg with
MSG_SPLICE_PAGES to indicate that content should be spliced rather than
performing several sendmsg and sendpage calls to transmit header and data
pages.

To make this work, the data is assembled in a bio_vec array and attached to
a BVEC-type iterator.  The header are copied into memory acquired from
zcopy_alloc() which just breaks a page up into small pieces that can be
freed with put_page().

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Santosh Shilimkar <santosh.shilimkar@oracle.com>
cc: "David S. Miller" <davem@davemloft.net>
cc: Eric Dumazet <edumazet@google.com>
cc: Jakub Kicinski <kuba@kernel.org>
cc: Paolo Abeni <pabeni@redhat.com>
cc: Jens Axboe <axboe@kernel.dk>
cc: Matthew Wilcox <willy@infradead.org>
cc: linux-rdma@vger.kernel.org
cc: rds-devel@oss.oracle.com
cc: netdev@vger.kernel.org
---
 net/rds/tcp_send.c | 80 ++++++++++++++++++++--------------------------
 1 file changed, 35 insertions(+), 45 deletions(-)
diff mbox series

Patch

diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 8c4d1d6e9249..0d6eb85a930d 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -32,6 +32,7 @@ 
  */
 #include <linux/kernel.h>
 #include <linux/in.h>
+#include <linux/zcopy_alloc.h>
 #include <net/tcp.h>
 
 #include "rds_single_path.h"
@@ -52,29 +53,24 @@  void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
 	tcp_sock_set_cork(tc->t_sock->sk, false);
 }
 
-/* the core send_sem serializes this with other xmit and shutdown */
-static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
-{
-	struct kvec vec = {
-		.iov_base = data,
-		.iov_len = len,
-	};
-	struct msghdr msg = {
-		.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
-	};
-
-	return kernel_sendmsg(sock, &msg, &vec, 1, vec.iov_len);
-}
-
 /* the core send_sem serializes this with other xmit and shutdown */
 int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		 unsigned int hdr_off, unsigned int sg, unsigned int off)
 {
 	struct rds_conn_path *cp = rm->m_inc.i_conn_path;
 	struct rds_tcp_connection *tc = cp->cp_transport_data;
+	struct msghdr msg = {
+		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
+	};
+	struct bio_vec *bvec;
+	unsigned int i, size = 0, ix = 0;
+	bool free_hdr = false;
 	int done = 0;
-	int ret = 0;
-	int more;
+	int ret = -ENOMEM;
+
+	bvec = kmalloc_array(1 + sg, sizeof(struct bio_vec), GFP_KERNEL);
+	if (!bvec)
+		goto out;
 
 	if (hdr_off == 0) {
 		/*
@@ -101,41 +97,30 @@  int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		/* see rds_tcp_write_space() */
 		set_bit(SOCK_NOSPACE, &tc->t_sock->sk->sk_socket->flags);
 
-		ret = rds_tcp_sendmsg(tc->t_sock,
-				      (void *)&rm->m_inc.i_hdr + hdr_off,
-				      sizeof(rm->m_inc.i_hdr) - hdr_off);
+		ret = zcopy_memdup(sizeof(rm->m_inc.i_hdr) - hdr_off,
+				   (void *)&rm->m_inc.i_hdr + hdr_off,
+				   &bvec[ix], GFP_KERNEL);
 		if (ret < 0)
 			goto out;
-		done += ret;
-		if (hdr_off + done != sizeof(struct rds_header))
-			goto out;
+		free_hdr = true;
+		size += bvec[ix].bv_len;
+		ix++;
 	}
 
-	more = rm->data.op_nents > 1 ? (MSG_MORE | MSG_SENDPAGE_NOTLAST) : 0;
-	while (sg < rm->data.op_nents) {
-		int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
-
-		ret = tc->t_sock->ops->sendpage(tc->t_sock,
-						sg_page(&rm->data.op_sg[sg]),
-						rm->data.op_sg[sg].offset + off,
-						rm->data.op_sg[sg].length - off,
-						flags);
-		rdsdebug("tcp sendpage %p:%u:%u ret %d\n", (void *)sg_page(&rm->data.op_sg[sg]),
-			 rm->data.op_sg[sg].offset + off, rm->data.op_sg[sg].length - off,
-			 ret);
-		if (ret <= 0)
-			break;
-
-		off += ret;
-		done += ret;
-		if (off == rm->data.op_sg[sg].length) {
-			off = 0;
-			sg++;
-		}
-		if (sg == rm->data.op_nents - 1)
-			more = 0;
+	for (i = sg; i < rm->data.op_nents; i++) {
+		bvec_set_page(&bvec[ix],
+			      sg_page(&rm->data.op_sg[i]),
+			      rm->data.op_sg[i].length - off,
+			      rm->data.op_sg[i].offset + off);
+		off = 0;
+		size += bvec[ix].bv_len;
+		ix++;
 	}
 
+	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bvec, ix, size);
+	ret = sock_sendmsg(tc->t_sock, &msg);
+	rdsdebug("tcp sendmsg-splice %u,%u ret %d\n", ix, size, ret);
+
 out:
 	if (ret <= 0) {
 		/* write_space will hit after EAGAIN, all else fatal */
@@ -158,6 +143,11 @@  int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 	}
 	if (done == 0)
 		done = ret;
+	if (bvec) {
+		if (free_hdr)
+			put_page(bvec[0].bv_page);
+		kfree(bvec);
+	}
 	return done;
 }