diff mbox series

[net-next,18/35] rxrpc: Create a per-local endpoint receive queue and I/O thread

Message ID 166982741440.621383.4325041430555712070.stgit@warthog.procyon.org.uk (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Headers show
Series rxrpc: Increasing SACK size and moving away from softirq, parts 2 & 3 | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count fail Series longer than 15 patches (and no cover letter)
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 7 this patch: 7
netdev/cc_maintainers warning 4 maintainers not CCed: edumazet@google.com davem@davemloft.net pabeni@redhat.com kuba@kernel.org
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 6 this patch: 6
netdev/checkpatch fail ERROR: do not use assignment in if condition WARNING: networking block comments don't use an empty /* line, use /* Comment... WARNING: quoted string split across lines
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

David Howells Nov. 30, 2022, 4:56 p.m. UTC
Create a per-local receive queue to which, in a future patch, all incoming
packets will be directed and an I/O thread that will process those packets
and perform all transmission of packets.

Destruction of the local endpoint is also moved from the local processor
work item (which will be absorbed) to the thread.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
---

 net/rxrpc/ar-internal.h  |   10 +++++++++
 net/rxrpc/io_thread.c    |   51 +++++++++++++++++++++++++++++++++++++++++++++-
 net/rxrpc/local_object.c |   39 ++++++++++++++++++++---------------
 net/rxrpc/proc.c         |   12 ++++++++---
 4 files changed, 91 insertions(+), 21 deletions(-)
diff mbox series

Patch

diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 523cc9c5ab12..de82c25956a6 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -110,6 +110,8 @@  struct rxrpc_net {
 	atomic_t		stat_rx_acks[256];
 
 	atomic_t		stat_why_req_ack[8];
+
+	atomic_t		stat_io_loop;
 };
 
 /*
@@ -280,12 +282,14 @@  struct rxrpc_local {
 	struct hlist_node	link;
 	struct socket		*socket;	/* my UDP socket */
 	struct work_struct	processor;
+	struct task_struct	*io_thread;
 	struct list_head	ack_tx_queue;	/* List of ACKs that need sending */
 	spinlock_t		ack_tx_lock;	/* ACK list lock */
 	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
+	struct sk_buff_head	rx_queue;	/* Received packets */
 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 	spinlock_t		lock;		/* access lock */
@@ -954,6 +958,11 @@  void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection
  * io_thread.c
  */
 int rxrpc_input_packet(struct sock *, struct sk_buff *);
+int rxrpc_io_thread(void *data);
+static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
+{
+	wake_up_process(local->io_thread);
+}
 
 /*
  * insecure.c
@@ -984,6 +993,7 @@  void rxrpc_put_local(struct rxrpc_local *, enum rxrpc_local_trace);
 struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *, enum rxrpc_local_trace);
 void rxrpc_unuse_local(struct rxrpc_local *, enum rxrpc_local_trace);
 void rxrpc_queue_local(struct rxrpc_local *);
+void rxrpc_destroy_local(struct rxrpc_local *local);
 void rxrpc_destroy_all_locals(struct rxrpc_net *);
 
 static inline bool __rxrpc_unuse_local(struct rxrpc_local *local,
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index d2aaad5afa1d..0b3e096e3d50 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -1,7 +1,7 @@ 
 // SPDX-License-Identifier: GPL-2.0-or-later
 /* RxRPC packet reception
  *
- * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  */
 
@@ -368,3 +368,52 @@  int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	_leave(" [badmsg]");
 	return 0;
 }
+
+/*
+ * I/O and event handling thread.
+ */
+int rxrpc_io_thread(void *data)
+{
+	struct sk_buff_head rx_queue;
+	struct rxrpc_local *local = data;
+	struct sk_buff *skb;
+
+	skb_queue_head_init(&rx_queue);
+
+	set_user_nice(current, MIN_NICE);
+
+	for (;;) {
+		rxrpc_inc_stat(local->rxnet, stat_io_loop);
+
+		/* Process received packets and errors. */
+		if ((skb = __skb_dequeue(&rx_queue))) {
+			// TODO: Input packet
+			rxrpc_free_skb(skb, rxrpc_skb_put_input);
+			continue;
+		}
+
+		if (!skb_queue_empty(&local->rx_queue)) {
+			spin_lock_irq(&local->rx_queue.lock);
+			skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
+			spin_unlock_irq(&local->rx_queue.lock);
+			continue;
+		}
+
+		set_current_state(TASK_INTERRUPTIBLE);
+		if (!skb_queue_empty(&local->rx_queue)) {
+			__set_current_state(TASK_RUNNING);
+			continue;
+		}
+
+		if (kthread_should_stop())
+			break;
+		schedule();
+	}
+
+	__set_current_state(TASK_RUNNING);
+	rxrpc_see_local(local, rxrpc_local_stop);
+	rxrpc_destroy_local(local);
+	local->io_thread = NULL;
+	rxrpc_see_local(local, rxrpc_local_stopped);
+	return 0;
+}
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 1617ce651b9b..7c61349984e3 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -103,6 +103,7 @@  static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
 		init_rwsem(&local->defrag_sem);
 		skb_queue_head_init(&local->reject_queue);
 		skb_queue_head_init(&local->event_queue);
+		skb_queue_head_init(&local->rx_queue);
 		local->client_bundles = RB_ROOT;
 		spin_lock_init(&local->client_bundles_lock);
 		spin_lock_init(&local->lock);
@@ -126,6 +127,7 @@  static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 	struct udp_tunnel_sock_cfg tuncfg = {NULL};
 	struct sockaddr_rxrpc *srx = &local->srx;
 	struct udp_port_cfg udp_conf = {0};
+	struct task_struct *io_thread;
 	struct sock *usk;
 	int ret;
 
@@ -185,8 +187,23 @@  static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 		BUG();
 	}
 
+	io_thread = kthread_run(rxrpc_io_thread, local,
+				"krxrpcio/%u", ntohs(udp_conf.local_udp_port));
+	if (IS_ERR(io_thread)) {
+		ret = PTR_ERR(io_thread);
+		goto error_sock;
+	}
+
+	local->io_thread = io_thread;
 	_leave(" = 0");
 	return 0;
+
+error_sock:
+	kernel_sock_shutdown(local->socket, SHUT_RDWR);
+	local->socket->sk->sk_user_data = NULL;
+	sock_release(local->socket);
+	local->socket = NULL;
+	return ret;
 }
 
 /*
@@ -360,19 +377,8 @@  struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local,
  */
 void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
 {
-	unsigned int debug_id;
-	int r, u;
-
-	if (local) {
-		debug_id = local->debug_id;
-		r = refcount_read(&local->ref);
-		u = atomic_dec_return(&local->active_users);
-		trace_rxrpc_local(debug_id, why, r, u);
-		if (u == 0) {
-			rxrpc_get_local(local, rxrpc_local_get_queue);
-			rxrpc_queue_local(local);
-		}
-	}
+	if (local && __rxrpc_unuse_local(local, why))
+		kthread_stop(local->io_thread);
 }
 
 /*
@@ -382,7 +388,7 @@  void rxrpc_unuse_local(struct rxrpc_local *local, enum rxrpc_local_trace why)
  * Closing the socket cannot be done from bottom half context or RCU callback
  * context because it might sleep.
  */
-static void rxrpc_local_destroyer(struct rxrpc_local *local)
+void rxrpc_destroy_local(struct rxrpc_local *local)
 {
 	struct socket *socket = local->socket;
 	struct rxrpc_net *rxnet = local->rxnet;
@@ -411,6 +417,7 @@  static void rxrpc_local_destroyer(struct rxrpc_local *local)
 	 */
 	rxrpc_purge_queue(&local->reject_queue);
 	rxrpc_purge_queue(&local->event_queue);
+	rxrpc_purge_queue(&local->rx_queue);
 }
 
 /*
@@ -430,10 +437,8 @@  static void rxrpc_local_processor(struct work_struct *work)
 
 	do {
 		again = false;
-		if (!__rxrpc_use_local(local, rxrpc_local_use_work)) {
-			rxrpc_local_destroyer(local);
+		if (!__rxrpc_use_local(local, rxrpc_local_use_work))
 			break;
-		}
 
 		if (!list_empty(&local->ack_tx_queue)) {
 			rxrpc_transmit_ack_packets(local);
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index d3a6d24cf871..35d5b43c677e 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -342,7 +342,7 @@  static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
 	if (v == SEQ_START_TOKEN) {
 		seq_puts(seq,
 			 "Proto Local                                          "
-			 " Use Act\n");
+			 " Use Act RxQ\n");
 		return 0;
 	}
 
@@ -351,10 +351,11 @@  static int rxrpc_local_seq_show(struct seq_file *seq, void *v)
 	sprintf(lbuff, "%pISpc", &local->srx.transport);
 
 	seq_printf(seq,
-		   "UDP   %-47.47s %3u %3u\n",
+		   "UDP   %-47.47s %3u %3u %3u\n",
 		   lbuff,
 		   refcount_read(&local->ref),
-		   atomic_read(&local->active_users));
+		   atomic_read(&local->active_users),
+		   local->rx_queue.qlen);
 
 	return 0;
 }
@@ -463,6 +464,9 @@  int rxrpc_stats_show(struct seq_file *seq, void *v)
 		   "Buffers  : txb=%u rxb=%u\n",
 		   atomic_read(&rxrpc_nr_txbuf),
 		   atomic_read(&rxrpc_n_rx_skbs));
+	seq_printf(seq,
+		   "IO-thread: loops=%u\n",
+		   atomic_read(&rxnet->stat_io_loop));
 	return 0;
 }
 
@@ -492,5 +496,7 @@  int rxrpc_stats_clear(struct file *file, char *buf, size_t size)
 	memset(&rxnet->stat_rx_acks, 0, sizeof(rxnet->stat_rx_acks));
 
 	memset(&rxnet->stat_why_req_ack, 0, sizeof(rxnet->stat_why_req_ack));
+
+	atomic_set(&rxnet->stat_io_loop, 0);
 	return size;
 }