diff mbox series

[net-next,25/35] rxrpc: Move DATA transmission into call processor work item

Message ID 166982747419.621383.2996927769382062225.stgit@warthog.procyon.org.uk (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Headers show
Series rxrpc: Increasing SACK size and moving away from softirq, parts 2 & 3 | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count fail Series longer than 15 patches (and no cover letter)
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 7 this patch: 7
netdev/cc_maintainers warning 6 maintainers not CCed: edumazet@google.com davem@davemloft.net rostedt@goodmis.org kuba@kernel.org pabeni@redhat.com mhiramat@kernel.org
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 6 this patch: 6
netdev/checkpatch fail ERROR: space prohibited before that close parenthesis ')' WARNING: line length of 81 exceeds 80 columns WARNING: line length of 82 exceeds 80 columns WARNING: line length of 88 exceeds 80 columns WARNING: memory barrier without comment WARNING: networking block comments don't use an empty /* line, use /* Comment...
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline fail Was 0 now: 1

Commit Message

David Howells Nov. 30, 2022, 4:57 p.m. UTC
Move DATA transmission into the call processor work item.  In a future
patch, this will be called from the I/O thread rather than being itsown
work item.

This will allow DATA transmission to be driven directly by incoming ACKs,
pokes and timers as those are processed.

The Tx queue is also split: The queue of packets prepared by sendmsg is now
places in call->tx_sendmsg and the packet dispatcher decants the packets
into call->tx_buffer as space becomes available in the transmission
window.  This allows sendmsg to run ahead of the available space to try and
prevent an underflow in transmission.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
---

 include/trace/events/rxrpc.h |    6 +++
 net/rxrpc/ar-internal.h      |    5 ++-
 net/rxrpc/call_event.c       |   83 +++++++++++++++++++++++++++++++++++++++---
 net/rxrpc/call_object.c      |    6 +++
 net/rxrpc/output.c           |   48 ++++++++++++++++++++++++
 net/rxrpc/sendmsg.c          |   83 +++++++-----------------------------------
 net/rxrpc/txbuf.c            |   10 ++++-
 7 files changed, 161 insertions(+), 80 deletions(-)
diff mbox series

Patch

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 8bd48358f757..c3043fbea0e6 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -183,6 +183,7 @@ 
 	EM(rxrpc_call_queue_requeue,		"QUE requeue ") \
 	EM(rxrpc_call_queue_resend,		"QUE resend  ") \
 	EM(rxrpc_call_queue_timer,		"QUE timer   ") \
+	EM(rxrpc_call_queue_tx_data,		"QUE tx-data ") \
 	EM(rxrpc_call_see_accept,		"SEE accept  ") \
 	EM(rxrpc_call_see_activate_client,	"SEE act-clnt") \
 	EM(rxrpc_call_see_connect_failed,	"SEE con-fail") \
@@ -738,6 +739,7 @@  TRACE_EVENT(rxrpc_txqueue,
 		    __field(rxrpc_seq_t,		acks_hard_ack	)
 		    __field(rxrpc_seq_t,		tx_bottom	)
 		    __field(rxrpc_seq_t,		tx_top		)
+		    __field(rxrpc_seq_t,		tx_prepared	)
 		    __field(int,			tx_winsize	)
 			     ),
 
@@ -747,16 +749,18 @@  TRACE_EVENT(rxrpc_txqueue,
 		    __entry->acks_hard_ack = call->acks_hard_ack;
 		    __entry->tx_bottom = call->tx_bottom;
 		    __entry->tx_top = call->tx_top;
+		    __entry->tx_prepared = call->tx_prepared;
 		    __entry->tx_winsize = call->tx_winsize;
 			   ),
 
-	    TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u",
+	    TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u/%u",
 		      __entry->call,
 		      __print_symbolic(__entry->why, rxrpc_txqueue_traces),
 		      __entry->tx_bottom,
 		      __entry->acks_hard_ack,
 		      __entry->tx_top - __entry->tx_bottom,
 		      __entry->tx_top - __entry->acks_hard_ack,
+		      __entry->tx_prepared - __entry->tx_bottom,
 		      __entry->tx_winsize)
 	    );
 
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 3bd6a5eb2fb7..6af7298af39b 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -646,9 +646,11 @@  struct rxrpc_call {
 
 	/* Transmitted data tracking. */
 	spinlock_t		tx_lock;	/* Transmit queue lock */
+	struct list_head	tx_sendmsg;	/* Sendmsg prepared packets */
 	struct list_head	tx_buffer;	/* Buffer of transmissible packets */
 	rxrpc_seq_t		tx_bottom;	/* First packet in buffer */
 	rxrpc_seq_t		tx_transmitted;	/* Highest packet transmitted */
+	rxrpc_seq_t		tx_prepared;	/* Highest Tx slot prepared. */
 	rxrpc_seq_t		tx_top;		/* Highest Tx slot allocated. */
 	u16			tx_backoff;	/* Delay to insert due to Tx failure */
 	u8			tx_winsize;	/* Maximum size of Tx window */
@@ -766,7 +768,7 @@  struct rxrpc_send_params {
  */
 struct rxrpc_txbuf {
 	struct rcu_head		rcu;
-	struct list_head	call_link;	/* Link in call->tx_queue */
+	struct list_head	call_link;	/* Link in call->tx_sendmsg/tx_buffer */
 	struct list_head	tx_link;	/* Link in live Enc queue or Tx queue */
 	struct rxrpc_call	*call;		/* Call to which belongs */
 	ktime_t			last_sent;	/* Time at which last transmitted */
@@ -1067,6 +1069,7 @@  int rxrpc_send_abort_packet(struct rxrpc_call *);
 int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
 void rxrpc_reject_packets(struct rxrpc_local *);
 void rxrpc_send_keepalive(struct rxrpc_peer *);
+void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
 
 /*
  * peer_event.c
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 3925b55e2064..c9f835292f7b 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -291,6 +291,72 @@  static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
 	_leave("");
 }
 
+static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
+{
+	unsigned int winsize = min_t(unsigned int, call->tx_winsize,
+				     call->cong_cwnd + call->cong_extra);
+	rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize;
+	rxrpc_seq_t tx_top = call->tx_top;
+	int space;
+
+	space = wtop - tx_top;
+	return space > 0;
+}
+
+/*
+ * Decant some if the sendmsg prepared queue into the transmission buffer.
+ */
+static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+{
+	struct rxrpc_txbuf *txb;
+
+	if (rxrpc_is_client_call(call) &&
+	    !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
+		rxrpc_expose_client_call(call);
+
+	while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
+					       struct rxrpc_txbuf, call_link))) {
+		spin_lock(&call->tx_lock);
+		list_del(&txb->call_link);
+		spin_unlock(&call->tx_lock);
+
+		call->tx_top = txb->seq;
+		list_add_tail(&txb->call_link, &call->tx_buffer);
+
+		rxrpc_transmit_one(call, txb);
+
+		// TODO: Drain the transmission buffers.  Do this somewhere better
+		if (after(call->acks_hard_ack, call->tx_bottom + 16))
+			rxrpc_shrink_call_tx_buffer(call);
+
+		if (!rxrpc_tx_window_has_space(call))
+			break;
+	}
+}
+
+static void rxrpc_transmit_some_data(struct rxrpc_call *call)
+{
+	switch (call->state) {
+	case RXRPC_CALL_SERVER_ACK_REQUEST:
+		if (list_empty(&call->tx_sendmsg))
+			return;
+		fallthrough;
+
+	case RXRPC_CALL_SERVER_SEND_REPLY:
+	case RXRPC_CALL_SERVER_AWAIT_ACK:
+	case RXRPC_CALL_CLIENT_SEND_REQUEST:
+	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
+		if (!rxrpc_tx_window_has_space(call))
+			return;
+		if (list_empty(&call->tx_sendmsg))
+			return;
+		rxrpc_decant_prepared_tx(call);
+		break;
+	default:
+		return;
+	}
+}
+
 /*
  * Handle retransmission and deferred ACK/abort generation.
  */
@@ -309,19 +375,22 @@  void rxrpc_process_call(struct work_struct *work)
 	       call->debug_id, rxrpc_call_states[call->state], call->events);
 
 recheck_state:
+	if (call->acks_hard_ack != call->tx_bottom)
+		rxrpc_shrink_call_tx_buffer(call);
+
 	/* Limit the number of times we do this before returning to the manager */
-	iterations++;
-	if (iterations > 5)
-		goto requeue;
+	if (!rxrpc_tx_window_has_space(call) ||
+	    list_empty(&call->tx_sendmsg)) {
+		iterations++;
+		if (iterations > 5)
+			goto requeue;
+	}
 
 	if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
 		rxrpc_send_abort_packet(call);
 		goto recheck_state;
 	}
 
-	if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom)
-		rxrpc_shrink_call_tx_buffer(call);
-
 	if (call->state == RXRPC_CALL_COMPLETE) {
 		del_timer_sync(&call->timer);
 		goto out;
@@ -387,6 +456,8 @@  void rxrpc_process_call(struct work_struct *work)
 		set_bit(RXRPC_CALL_EV_RESEND, &call->events);
 	}
 
+	rxrpc_transmit_some_data(call);
+
 	/* Process events */
 	if (test_and_clear_bit(RXRPC_CALL_EV_EXPIRED, &call->events)) {
 		if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 2622d06bb0d6..96a7edd3a842 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -156,6 +156,7 @@  struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	INIT_LIST_HEAD(&call->recvmsg_link);
 	INIT_LIST_HEAD(&call->sock_link);
 	INIT_LIST_HEAD(&call->attend_link);
+	INIT_LIST_HEAD(&call->tx_sendmsg);
 	INIT_LIST_HEAD(&call->tx_buffer);
 	skb_queue_head_init(&call->recvmsg_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
@@ -641,6 +642,11 @@  static void rxrpc_destroy_call(struct work_struct *work)
 	del_timer_sync(&call->timer);
 
 	rxrpc_cleanup_ring(call);
+	while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
+					       struct rxrpc_txbuf, call_link))) {
+		list_del(&txb->call_link);
+		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
+	}
 	while ((txb = list_first_entry_or_null(&call->tx_buffer,
 					       struct rxrpc_txbuf, call_link))) {
 		list_del(&txb->call_link);
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index e2ce7dadbb7a..c8147e50060b 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -465,6 +465,14 @@  int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
 
 	trace_rxrpc_tx_data(call, txb->seq, serial, txb->wire.flags,
 			    test_bit(RXRPC_TXBUF_RESENT, &txb->flags), false);
+
+	/* Track what we've attempted to transmit at least once so that the
+	 * retransmission algorithm doesn't try to resend what we haven't sent
+	 * yet.  However, this can race as we can receive an ACK before we get
+	 * to this point.  But, OTOH, if we won't get an ACK mentioning this
+	 * packet unless the far side received it (though it could have
+	 * discarded it anyway and NAK'd it).
+	 */
 	cmpxchg(&call->tx_transmitted, txb->seq - 1, txb->seq);
 
 	/* send the packet with the don't fragment bit set if we currently
@@ -712,3 +720,43 @@  void rxrpc_send_keepalive(struct rxrpc_peer *peer)
 	peer->last_tx_at = ktime_get_seconds();
 	_leave("");
 }
+
+/*
+ * Schedule an instant Tx resend.
+ */
+static inline void rxrpc_instant_resend(struct rxrpc_call *call,
+					struct rxrpc_txbuf *txb)
+{
+	if (call->state < RXRPC_CALL_COMPLETE)
+		kdebug("resend");
+}
+
+/*
+ * Transmit one packet.
+ */
+void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
+{
+	int ret;
+
+	ret = rxrpc_send_data_packet(call, txb);
+	if (ret < 0) {
+		switch (ret) {
+		case -ENETUNREACH:
+		case -EHOSTUNREACH:
+		case -ECONNREFUSED:
+			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+						  0, ret);
+			break;
+		default:
+			_debug("need instant resend %d", ret);
+			rxrpc_instant_resend(call, txb);
+		}
+	} else {
+		unsigned long now = jiffies;
+		unsigned long resend_at = now + call->peer->rto_j;
+
+		WRITE_ONCE(call->resend_at, resend_at);
+		rxrpc_reduce_call_timer(call, resend_at, now,
+					rxrpc_timer_set_for_send);
+	}
+}
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 76b1e2e89c1e..11af37275d5b 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -22,30 +22,9 @@ 
  */
 static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
 {
-	unsigned int win_size;
-	rxrpc_seq_t tx_win = smp_load_acquire(&call->acks_hard_ack);
-
-	/* If we haven't transmitted anything for >1RTT, we should reset the
-	 * congestion management state.
-	 */
-	if (ktime_before(ktime_add_us(call->tx_last_sent,
-				      call->peer->srtt_us >> 3),
-			 ktime_get_real())) {
-		if (RXRPC_TX_SMSS > 2190)
-			win_size = 2;
-		else if (RXRPC_TX_SMSS > 1095)
-			win_size = 3;
-		else
-			win_size = 4;
-		win_size += call->cong_extra;
-	} else {
-		win_size = min_t(unsigned int, call->tx_winsize,
-				 call->cong_cwnd + call->cong_extra);
-	}
-
 	if (_tx_win)
-		*_tx_win = tx_win;
-	return call->tx_top - tx_win < win_size;
+		*_tx_win = call->tx_bottom;
+	return call->tx_prepared - call->tx_bottom < 256;
 }
 
 /*
@@ -66,11 +45,6 @@  static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
 		if (signal_pending(current))
 			return sock_intr_errno(*timeo);
 
-		if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
-			rxrpc_shrink_call_tx_buffer(call);
-			continue;
-		}
-
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
 		*timeo = schedule_timeout(*timeo);
 	}
@@ -107,11 +81,6 @@  static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
 		    tx_win == tx_start && signal_pending(current))
 			return -EINTR;
 
-		if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
-			rxrpc_shrink_call_tx_buffer(call);
-			continue;
-		}
-
 		if (tx_win != tx_start) {
 			timeout = rtt;
 			tx_start = tx_win;
@@ -137,11 +106,6 @@  static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
 		if (call->state >= RXRPC_CALL_COMPLETE)
 			return call->error;
 
-		if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
-			rxrpc_shrink_call_tx_buffer(call);
-			continue;
-		}
-
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
 		*timeo = schedule_timeout(*timeo);
 	}
@@ -207,29 +171,27 @@  static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 	unsigned long now;
 	rxrpc_seq_t seq = txb->seq;
 	bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
-	int ret;
 
 	rxrpc_inc_stat(call->rxnet, stat_tx_data);
 
-	ASSERTCMP(seq, ==, call->tx_top + 1);
+	ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
 
 	/* We have to set the timestamp before queueing as the retransmit
 	 * algorithm can see the packet as soon as we queue it.
 	 */
 	txb->last_sent = ktime_get_real();
 
-	/* Add the packet to the call's output buffer */
-	rxrpc_get_txbuf(txb, rxrpc_txbuf_get_buffer);
-	spin_lock(&call->tx_lock);
-	list_add_tail(&txb->call_link, &call->tx_buffer);
-	call->tx_top = seq;
-	spin_unlock(&call->tx_lock);
-
 	if (last)
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
 	else
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
 
+	/* Add the packet to the call's output buffer */
+	spin_lock(&call->tx_lock);
+	list_add_tail(&txb->call_link, &call->tx_sendmsg);
+	call->tx_prepared = seq;
+	spin_unlock(&call->tx_lock);
+
 	if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
 		_debug("________awaiting reply/ACK__________");
 		write_lock_bh(&call->state_lock);
@@ -258,30 +220,11 @@  static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 		write_unlock_bh(&call->state_lock);
 	}
 
-	if (seq == 1 && rxrpc_is_client_call(call))
-		rxrpc_expose_client_call(call);
-
-	ret = rxrpc_send_data_packet(call, txb);
-	if (ret < 0) {
-		switch (ret) {
-		case -ENETUNREACH:
-		case -EHOSTUNREACH:
-		case -ECONNREFUSED:
-			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
-						  0, ret);
-			goto out;
-		}
-	} else {
-		unsigned long now = jiffies;
-		unsigned long resend_at = now + call->peer->rto_j;
 
-		WRITE_ONCE(call->resend_at, resend_at);
-		rxrpc_reduce_call_timer(call, resend_at, now,
-					rxrpc_timer_set_for_send);
-	}
-
-out:
-	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_trans);
+	/* Stick the packet on the crypto queue or the transmission queue as
+	 * appropriate.
+	 */
+	rxrpc_queue_call(call, rxrpc_call_queue_tx_data);
 }
 
 /*
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index 90ff00c340cd..a5054389dfbb 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -34,7 +34,7 @@  struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
 		txb->offset		= 0;
 		txb->flags		= 0;
 		txb->ack_why		= 0;
-		txb->seq		= call->tx_top + 1;
+		txb->seq		= call->tx_prepared + 1;
 		txb->wire.epoch		= htonl(call->conn->proto.epoch);
 		txb->wire.cid		= htonl(call->cid);
 		txb->wire.callNumber	= htonl(call->call_id);
@@ -107,6 +107,7 @@  void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
 {
 	struct rxrpc_txbuf *txb;
 	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
+	bool wake = false;
 
 	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
 
@@ -123,7 +124,7 @@  void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
 		if (txb->seq != call->tx_bottom + 1)
 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
 		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
-		call->tx_bottom++;
+		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
 		list_del_rcu(&txb->call_link);
 
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
@@ -131,7 +132,12 @@  void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
 		spin_unlock(&call->tx_lock);
 
 		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
+		if (after(call->acks_hard_ack, call->tx_bottom + 128))
+			wake = true;
 	}
 
 	spin_unlock(&call->tx_lock);
+
+	if (wake)
+		wake_up(&call->waitq);
 }