diff mbox series

[net,v2] rxrpc: Fix alteration of headers whilst zerocopy pending

Message ID 2181712.1739131675@warthog.procyon.org.uk (mailing list archive)
State Accepted
Commit 06ea2c9c4163b8a8fde890a9e21d1059f22bb76d
Delegated to: Netdev Maintainers
Headers show
Series [net,v2] rxrpc: Fix alteration of headers whilst zerocopy pending | expand

Checks

Context Check Description
netdev/series_format success Single patches do not need cover letters
netdev/tree_selection success Clearly marked for net
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag present in non-next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/build_tools success No tools touched, skip
netdev/cc_maintainers success CCed 8 of 8 maintainers
netdev/build_clang success Errors and warnings before: 35 this patch: 35
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success Fixes tag looks correct
netdev/build_allmodconfig_warn success Errors and warnings before: 3 this patch: 3
netdev/checkpatch warning WARNING: line length of 85 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 2 this patch: 2
netdev/source_inline success Was 0 now: 0
netdev/contest success net-next-2025-02-11--00-00 (tests: 889)

Commit Message

David Howells Feb. 9, 2025, 8:07 p.m. UTC
rxrpc: Fix alteration of headers whilst zerocopy pending

AF_RXRPC now uses MSG_SPLICE_PAGES to do zerocopy of the DATA packets when
it transmits them, but to reduce the number of descriptors required in the
DMA ring, it allocates a space for the protocol header in the memory
immediately before the data content so that it can include both in a single
descriptor.  This is used for either the main RX header or the smaller
jumbo subpacket header as appropriate:

  +----+------+
  | RX |      |
  +-+--+DATA  |
    |JH|      |
    +--+------+

Now, when it stitches a large jumbo packet together from a number of
individual DATA packets (each of which is 1412 bytes of data), it uses the
full RX header from the first and then the jumbo subpacket header for the
rest of the components:

  +---+--+------+--+------+--+------+--+------+--+------+--+------+
  |UDP|RX|DATA  |JH|DATA  |JH|DATA  |JH|DATA  |JH|DATA  |JH|DATA  |
  +---+--+------+--+------+--+------+--+------+--+------+--+------+

As mentioned, the main RX header and the jumbo header overlay one another
in memory and the formats don't match, so switching from one to the other
means rearranging the fields and adjusting the flags.

However, now that TLP has been included, it wants to retransmit the last
subpacket as a new data packet on its own, which means switching between
the header formats... and if the transmission is still pending, because of
the MSG_SPLICE_PAGES, we end up corrupting the jumbo subheader.

This has a variety of effects, with the RX service number overwriting the
jumbo checksum/key number field and the RX checksum overwriting the jumbo
flags - resulting in, at the very least, a confused connection-level abort
from the peer.

Fix this by leaving the jumbo header in the allocation with the data, but
allocating the RX header from the page frag allocator and concocting it on
the fly at the point of transmission as it does for ACK packets.

Fixes: 7c482665931b ("rxrpc: Implement RACK/TLP to deal with transmission stalls [RFC8985]")
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: Jakub Kicinski <kuba@kernel.org>
cc: "David S. Miller" <davem@davemloft.net>
cc: Chuck Lever <chuck.lever@oracle.com>
cc: Eric Dumazet <edumazet@google.com>
cc: Paolo Abeni <pabeni@redhat.com>
cc: Simon Horman <horms@kernel.org>
cc: linux-afs@lists.infradead.org
cc: netdev@vger.kernel.org
---
Changes
=======
ver #2)
 - Fixed the Fixes tag
 - Removed no-longer-used variable.

 net/rxrpc/ar-internal.h |    7 ++----
 net/rxrpc/output.c      |   50 +++++++++++++++++++++++++++++++++---------------
 net/rxrpc/rxkad.c       |   13 ++++--------
 net/rxrpc/sendmsg.c     |    4 ---
 net/rxrpc/txbuf.c       |   37 +++++++++--------------------------
 5 files changed, 54 insertions(+), 57 deletions(-)

Comments

patchwork-bot+netdevbpf@kernel.org Feb. 12, 2025, 1:50 a.m. UTC | #1
Hello:

This patch was applied to netdev/net.git (main)
by Jakub Kicinski <kuba@kernel.org>:

On Sun, 09 Feb 2025 20:07:55 +0000 you wrote:
> rxrpc: Fix alteration of headers whilst zerocopy pending
> 
> AF_RXRPC now uses MSG_SPLICE_PAGES to do zerocopy of the DATA packets when
> it transmits them, but to reduce the number of descriptors required in the
> DMA ring, it allocates a space for the protocol header in the memory
> immediately before the data content so that it can include both in a single
> descriptor.  This is used for either the main RX header or the smaller
> jumbo subpacket header as appropriate:
> 
> [...]

Here is the summary with links:
  - [net,v2] rxrpc: Fix alteration of headers whilst zerocopy pending
    https://git.kernel.org/netdev/net/c/06ea2c9c4163

You are awesome, thank you!
diff mbox series

Patch

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index f251845fe532..5e740c486203 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -327,8 +327,8 @@  struct rxrpc_local {
 	 * packet with a maximum set of jumbo subpackets or a PING ACK padded
 	 * out to 64K with zeropages for PMTUD.
 	 */
-	struct kvec		kvec[RXRPC_MAX_NR_JUMBO > 3 + 16 ?
-				     RXRPC_MAX_NR_JUMBO : 3 + 16];
+	struct kvec		kvec[1 + RXRPC_MAX_NR_JUMBO > 3 + 16 ?
+				     1 + RXRPC_MAX_NR_JUMBO : 3 + 16];
 };
 
 /*
@@ -874,8 +874,7 @@  struct rxrpc_txbuf {
 #define RXRPC_TXBUF_RESENT	0x100		/* Set if has been resent */
 	__be16			cksum;		/* Checksum to go in header */
 	bool			jumboable;	/* Can be non-terminal jumbo subpacket */
-	u8			nr_kvec;	/* Amount of kvec[] used */
-	struct kvec		kvec[1];
+	void			*data;		/* Data with preceding jumbo header */
 };
 
 static inline bool rxrpc_sending_to_server(const struct rxrpc_txbuf *txb)
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 6f7a125d6e90..95905b85a8d7 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -428,13 +428,13 @@  int rxrpc_send_abort_packet(struct rxrpc_call *call)
 static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
 					   struct rxrpc_send_data_req *req,
 					   struct rxrpc_txbuf *txb,
+					   struct rxrpc_wire_header *whdr,
 					   rxrpc_serial_t serial, int subpkt)
 {
-	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
-	struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
+	struct rxrpc_jumbo_header *jumbo = txb->data - sizeof(*jumbo);
 	enum rxrpc_req_ack_trace why;
 	struct rxrpc_connection *conn = call->conn;
-	struct kvec *kv = &call->local->kvec[subpkt];
+	struct kvec *kv = &call->local->kvec[1 + subpkt];
 	size_t len = txb->pkt_len;
 	bool last;
 	u8 flags;
@@ -491,18 +491,15 @@  static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
 	}
 dont_set_request_ack:
 
-	/* The jumbo header overlays the wire header in the txbuf. */
+	/* There's a jumbo header prepended to the data if we need it. */
 	if (subpkt < req->n - 1)
 		flags |= RXRPC_JUMBO_PACKET;
 	else
 		flags &= ~RXRPC_JUMBO_PACKET;
 	if (subpkt == 0) {
 		whdr->flags	= flags;
-		whdr->serial	= htonl(txb->serial);
 		whdr->cksum	= txb->cksum;
-		whdr->serviceId	= htons(conn->service_id);
-		kv->iov_base	= whdr;
-		len += sizeof(*whdr);
+		kv->iov_base	= txb->data;
 	} else {
 		jumbo->flags	= flags;
 		jumbo->pad	= 0;
@@ -535,7 +532,9 @@  static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
 /*
  * Prepare a (jumbo) packet for transmission.
  */
-static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
+static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call,
+					struct rxrpc_send_data_req *req,
+					struct rxrpc_wire_header *whdr)
 {
 	struct rxrpc_txqueue *tq = req->tq;
 	rxrpc_serial_t serial;
@@ -549,6 +548,18 @@  static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
 	/* Each transmission of a Tx packet needs a new serial number */
 	serial = rxrpc_get_next_serials(call->conn, req->n);
 
+	whdr->epoch		= htonl(call->conn->proto.epoch);
+	whdr->cid		= htonl(call->cid);
+	whdr->callNumber	= htonl(call->call_id);
+	whdr->seq		= htonl(seq);
+	whdr->serial		= htonl(serial);
+	whdr->type		= RXRPC_PACKET_TYPE_DATA;
+	whdr->flags		= 0;
+	whdr->userStatus	= 0;
+	whdr->securityIndex	= call->security_ix;
+	whdr->_rsvd		= 0;
+	whdr->serviceId		= htons(call->conn->service_id);
+
 	call->tx_last_serial = serial + req->n - 1;
 	call->tx_last_sent = req->now;
 	xmit_ts = rxrpc_prepare_txqueue(tq, req);
@@ -576,7 +587,7 @@  static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
 		if (i + 1 == req->n)
 			/* Only sample the last subpacket in a jumbo. */
 			__set_bit(ix, &tq->rtt_samples);
-		len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
+		len += rxrpc_prepare_data_subpacket(call, req, txb, whdr, serial, i);
 		serial++;
 		seq++;
 		i++;
@@ -618,6 +629,7 @@  static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
 	}
 
 	rxrpc_set_keepalive(call, req->now);
+	page_frag_free(whdr);
 	return len;
 }
 
@@ -626,25 +638,33 @@  static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_se
  */
 void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
+	struct rxrpc_wire_header *whdr;
 	struct rxrpc_connection *conn = call->conn;
 	enum rxrpc_tx_point frag;
 	struct rxrpc_txqueue *tq = req->tq;
 	struct rxrpc_txbuf *txb;
 	struct msghdr msg;
 	rxrpc_seq_t seq = req->seq;
-	size_t len;
+	size_t len = sizeof(*whdr);
 	bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
 	int ret, stat_ix;
 
 	_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
 
+	whdr = page_frag_alloc(&call->local->tx_alloc, sizeof(*whdr), GFP_NOFS);
+	if (!whdr)
+		return; /* Drop the packet if no memory. */
+
+	call->local->kvec[0].iov_base = whdr;
+	call->local->kvec[0].iov_len = sizeof(*whdr);
+
 	stat_ix = umin(req->n, ARRAY_SIZE(call->rxnet->stat_tx_jumbo)) - 1;
 	atomic_inc(&call->rxnet->stat_tx_jumbo[stat_ix]);
 
-	len = rxrpc_prepare_data_packet(call, req);
+	len += rxrpc_prepare_data_packet(call, req, whdr);
 	txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);
+	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, 1 + req->n, len);
 
 	msg.msg_name	= &call->peer->srx.transport;
 	msg.msg_namelen	= call->peer->srx.transport_len;
@@ -695,13 +715,13 @@  void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req
 
 	if (ret == -EMSGSIZE) {
 		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_msgsize);
-		trace_rxrpc_tx_packet(call->debug_id, call->local->kvec[0].iov_base, frag);
+		trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
 		ret = 0;
 	} else if (ret < 0) {
 		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
 		trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag);
 	} else {
-		trace_rxrpc_tx_packet(call->debug_id, call->local->kvec[0].iov_base, frag);
+		trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
 	}
 
 	rxrpc_tx_backoff(call, ret);
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index 62b09d23ec08..6cb37b0eb77f 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -257,8 +257,7 @@  static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
 				    struct rxrpc_txbuf *txb,
 				    struct skcipher_request *req)
 {
-	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
-	struct rxkad_level1_hdr *hdr = (void *)(whdr + 1);
+	struct rxkad_level1_hdr *hdr = txb->data;
 	struct rxrpc_crypt iv;
 	struct scatterlist sg;
 	size_t pad;
@@ -274,7 +273,7 @@  static int rxkad_secure_packet_auth(const struct rxrpc_call *call,
 	pad = RXKAD_ALIGN - pad;
 	pad &= RXKAD_ALIGN - 1;
 	if (pad) {
-		memset(txb->kvec[0].iov_base + txb->offset, 0, pad);
+		memset(txb->data + txb->offset, 0, pad);
 		txb->pkt_len += pad;
 	}
 
@@ -300,8 +299,7 @@  static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
 				       struct skcipher_request *req)
 {
 	const struct rxrpc_key_token *token;
-	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
-	struct rxkad_level2_hdr *rxkhdr = (void *)(whdr + 1);
+	struct rxkad_level2_hdr *rxkhdr = txb->data;
 	struct rxrpc_crypt iv;
 	struct scatterlist sg;
 	size_t content, pad;
@@ -319,7 +317,7 @@  static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
 	txb->pkt_len = round_up(content, RXKAD_ALIGN);
 	pad = txb->pkt_len - content;
 	if (pad)
-		memset(txb->kvec[0].iov_base + txb->offset, 0, pad);
+		memset(txb->data + txb->offset, 0, pad);
 
 	/* encrypt from the session key */
 	token = call->conn->key->payload.data[0];
@@ -407,9 +405,8 @@  static int rxkad_secure_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
 
 	/* Clear excess space in the packet */
 	if (txb->pkt_len < txb->alloc_size) {
-		struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
 		size_t gap = txb->alloc_size - txb->pkt_len;
-		void *p = whdr + 1;
+		void *p = txb->data;
 
 		memset(p + txb->pkt_len, 0, gap);
 	}
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 584397aba4a0..84dc6c94f23b 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -419,7 +419,7 @@  static int rxrpc_send_data(struct rxrpc_sock *rx,
 			size_t copy = umin(txb->space, msg_data_left(msg));
 
 			_debug("add %zu", copy);
-			if (!copy_from_iter_full(txb->kvec[0].iov_base + txb->offset,
+			if (!copy_from_iter_full(txb->data + txb->offset,
 						 copy, &msg->msg_iter))
 				goto efault;
 			_debug("added");
@@ -445,8 +445,6 @@  static int rxrpc_send_data(struct rxrpc_sock *rx,
 			ret = call->security->secure_packet(call, txb);
 			if (ret < 0)
 				goto out;
-
-			txb->kvec[0].iov_len += txb->len;
 			rxrpc_queue_packet(rx, call, txb, notify_end_tx);
 			txb = NULL;
 		}
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index 131d9e55c8e9..c550991d48fa 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -19,17 +19,19 @@  atomic_t rxrpc_nr_txbuf;
 struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
 					   size_t data_align, gfp_t gfp)
 {
-	struct rxrpc_wire_header *whdr;
 	struct rxrpc_txbuf *txb;
-	size_t total, hoff;
+	size_t total, doff, jsize = sizeof(struct rxrpc_jumbo_header);
 	void *buf;
 
 	txb = kzalloc(sizeof(*txb), gfp);
 	if (!txb)
 		return NULL;
 
-	hoff = round_up(sizeof(*whdr), data_align) - sizeof(*whdr);
-	total = hoff + sizeof(*whdr) + data_size;
+	/* We put a jumbo header in the buffer, but not a full wire header to
+	 * avoid delayed-corruption problems with zerocopy.
+	 */
+	doff = round_up(jsize, data_align);
+	total = doff + data_size;
 
 	data_align = umax(data_align, L1_CACHE_BYTES);
 	mutex_lock(&call->conn->tx_data_alloc_lock);
@@ -41,30 +43,15 @@  struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_
 		return NULL;
 	}
 
-	whdr = buf + hoff;
-
 	refcount_set(&txb->ref, 1);
 	txb->call_debug_id	= call->debug_id;
 	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
 	txb->alloc_size		= data_size;
 	txb->space		= data_size;
-	txb->offset		= sizeof(*whdr);
+	txb->offset		= 0;
 	txb->flags		= call->conn->out_clientflag;
 	txb->seq		= call->send_top + 1;
-	txb->nr_kvec		= 1;
-	txb->kvec[0].iov_base	= whdr;
-	txb->kvec[0].iov_len	= sizeof(*whdr);
-
-	whdr->epoch		= htonl(call->conn->proto.epoch);
-	whdr->cid		= htonl(call->cid);
-	whdr->callNumber	= htonl(call->call_id);
-	whdr->seq		= htonl(txb->seq);
-	whdr->type		= RXRPC_PACKET_TYPE_DATA;
-	whdr->flags		= 0;
-	whdr->userStatus	= 0;
-	whdr->securityIndex	= call->security_ix;
-	whdr->_rsvd		= 0;
-	whdr->serviceId		= htons(call->dest_srx.srx_service);
+	txb->data		= buf + doff;
 
 	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
 			  rxrpc_txbuf_alloc_data);
@@ -90,14 +77,10 @@  void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
 
 static void rxrpc_free_txbuf(struct rxrpc_txbuf *txb)
 {
-	int i;
-
 	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
 			  rxrpc_txbuf_free);
-	for (i = 0; i < txb->nr_kvec; i++)
-		if (txb->kvec[i].iov_base &&
-		    !is_zero_pfn(page_to_pfn(virt_to_page(txb->kvec[i].iov_base))))
-			page_frag_free(txb->kvec[i].iov_base);
+	if (txb->data)
+		page_frag_free(txb->data);
 	kfree(txb);
 	atomic_dec(&rxrpc_nr_txbuf);
 }