diff mbox series

[net-next,20/36] rxrpc: Move packet reception processing into I/O thread

Message ID 166994027315.1732290.3016573765287552450.stgit@warthog.procyon.org.uk (mailing list archive)
State Accepted
Commit 446b3e14525b477e441a6bb8ce56cea12512acc2
Delegated to: Netdev Maintainers
Headers show
Series rxrpc: Increasing SACK size and moving away from softirq, parts 2 & 3 | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count fail Series longer than 15 patches (and no cover letter)
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 4 this patch: 4
netdev/cc_maintainers warning 4 maintainers not CCed: edumazet@google.com davem@davemloft.net pabeni@redhat.com kuba@kernel.org
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 3 this patch: 3
netdev/checkpatch warning WARNING: function definition argument 'struct sk_buff *' should also have an identifier name WARNING: function definition argument 'struct sock *' should also have an identifier name WARNING: line length of 93 exceeds 80 columns WARNING: networking block comments don't use an empty /* line, use /* Comment...
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

David Howells Dec. 2, 2022, 12:17 a.m. UTC
Split the packet input handler to make the softirq side just dump the
received packet into the local endpoint receive queue and then call the
remainder of the input function from the I/O thread.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
---

 net/rxrpc/ar-internal.h  |    3 ++
 net/rxrpc/call_event.c   |    4 ++-
 net/rxrpc/call_object.c  |    2 +-
 net/rxrpc/io_thread.c    |   61 +++++++++++++++++++++++++++++++---------------
 net/rxrpc/local_object.c |    2 +-
 5 files changed, 47 insertions(+), 25 deletions(-)
diff mbox series

Patch

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index de82c25956a6..044815ba2b49 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -36,6 +36,7 @@  struct rxrpc_txbuf;
  * to pass supplementary information.
  */
 enum rxrpc_skb_mark {
+	RXRPC_SKB_MARK_PACKET,		/* Received packet */
 	RXRPC_SKB_MARK_REJECT_BUSY,	/* Reject with BUSY */
 	RXRPC_SKB_MARK_REJECT_ABORT,	/* Reject with ABORT (code in skb->priority) */
 };
@@ -957,7 +958,7 @@  void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection
 /*
  * io_thread.c
  */
-int rxrpc_input_packet(struct sock *, struct sk_buff *);
+int rxrpc_encap_rcv(struct sock *, struct sk_buff *);
 int rxrpc_io_thread(void *data);
 static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
 {
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 049b92b1c040..3925b55e2064 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -83,7 +83,7 @@  void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
 	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
 
 	txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK,
-				in_softirq() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
+				rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS);
 	if (!txb) {
 		kleave(" = -ENOMEM");
 		return;
@@ -111,7 +111,7 @@  void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
 	spin_unlock_bh(&local->ack_tx_lock);
 	trace_rxrpc_send_ack(call, why, ack_reason, serial);
 
-	if (in_task()) {
+	if (!rcu_read_lock_held()) {
 		rxrpc_transmit_ack_packets(call->peer->local);
 	} else {
 		rxrpc_get_local(local, rxrpc_local_get_queue);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 9cd7e0190ef4..57c8d4cc900a 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -632,7 +632,7 @@  void rxrpc_cleanup_call(struct rxrpc_call *call)
 	del_timer_sync(&call->timer);
 	cancel_work(&call->processor);
 
-	if (in_softirq() || work_busy(&call->processor))
+	if (rcu_read_lock_held() || work_busy(&call->processor))
 		/* Can't use the rxrpc workqueue as we need to cancel/flush
 		 * something that may be running/waiting there.
 		 */
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index 0b3e096e3d50..ee2e36c46ae2 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -9,6 +9,34 @@ 
 
 #include "ar-internal.h"
 
+/*
+ * handle data received on the local endpoint
+ * - may be called in interrupt context
+ *
+ * [!] Note that as this is called from the encap_rcv hook, the socket is not
+ * held locked by the caller and nothing prevents sk_user_data on the UDP from
+ * being cleared in the middle of processing this function.
+ *
+ * Called with the RCU read lock held from the IP layer via UDP.
+ */
+int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
+{
+	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
+
+	if (unlikely(!local)) {
+		kfree_skb(skb);
+		return 0;
+	}
+	if (skb->tstamp == 0)
+		skb->tstamp = ktime_get_real();
+
+	skb->mark = RXRPC_SKB_MARK_PACKET;
+	rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
+	skb_queue_tail(&local->rx_queue, skb);
+	rxrpc_wake_up_io_thread(local);
+	return 0;
+}
+
 /*
  * post connection-level events to the connection
  * - this includes challenges, responses, some aborts and call terminal packet
@@ -98,18 +126,10 @@  static bool rxrpc_extract_abort(struct sk_buff *skb)
 }
 
 /*
- * handle data received on the local endpoint
- * - may be called in interrupt context
- *
- * [!] Note that as this is called from the encap_rcv hook, the socket is not
- * held locked by the caller and nothing prevents sk_user_data on the UDP from
- * being cleared in the middle of processing this function.
- *
- * Called with the RCU read lock held from the IP layer via UDP.
+ * Process packets received on the local endpoint
  */
-int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
+static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
 {
-	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
 	struct rxrpc_connection *conn;
 	struct rxrpc_channel *chan;
 	struct rxrpc_call *call = NULL;
@@ -118,17 +138,9 @@  int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	struct rxrpc_sock *rx = NULL;
 	unsigned int channel;
 
-	_enter("%p", udp_sk);
-
-	if (unlikely(!local)) {
-		kfree_skb(skb);
-		return 0;
-	}
 	if (skb->tstamp == 0)
 		skb->tstamp = ktime_get_real();
 
-	rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
-
 	skb_pull(skb, sizeof(struct udphdr));
 
 	/* The UDP protocol already released all skb resources;
@@ -387,8 +399,17 @@  int rxrpc_io_thread(void *data)
 
 		/* Process received packets and errors. */
 		if ((skb = __skb_dequeue(&rx_queue))) {
-			// TODO: Input packet
-			rxrpc_free_skb(skb, rxrpc_skb_put_input);
+			switch (skb->mark) {
+			case RXRPC_SKB_MARK_PACKET:
+				rcu_read_lock();
+				rxrpc_input_packet(local, skb);
+				rcu_read_unlock();
+				break;
+			default:
+				WARN_ON_ONCE(1);
+				rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
+				break;
+			}
 			continue;
 		}
 
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 7c61349984e3..6b4d77219f36 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -154,7 +154,7 @@  static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 	}
 
 	tuncfg.encap_type = UDP_ENCAP_RXRPC;
-	tuncfg.encap_rcv = rxrpc_input_packet;
+	tuncfg.encap_rcv = rxrpc_encap_rcv;
 	tuncfg.encap_err_rcv = rxrpc_encap_err_rcv;
 	tuncfg.sk_user_data = local;
 	setup_udp_tunnel_sock(net, local->socket, &tuncfg);