
[mptcp-next,03/10] Squash to "mptcp: add get_subflow wrappers"

Message ID 727a87da10932296150becc361b48b60b43531f4.1653033459.git.geliang.tang@suse.com (mailing list archive)
State Superseded, archived
Series BPF packet scheduler

Checks

Context Check Description
matttbe/checkpatch warning total: 0 errors, 8 warnings, 0 checks, 387 lines checked
matttbe/build fail Build error with: -Werror
matttbe/KVM_Validation__normal warning Unstable: 2 failed test(s): packetdrill_sockopts selftest_mptcp_join
matttbe/KVM_Validation__debug warning Unstable: 2 failed test(s): selftest_diag selftest_mptcp_join - Critical: 11 Call Trace(s) - Critical: Global Timeout ❌

Commit Message

Geliang Tang May 20, 2022, 8:04 a.m. UTC
This patch adds redundant subflow support: every packet is sent
redundantly on all available subflows.

Signed-off-by: Geliang Tang <geliang.tang@suse.com>
---
 net/mptcp/protocol.c | 192 +++++++++++++++++++++++++++----------------
 net/mptcp/protocol.h |   5 +-
 net/mptcp/sched.c    |  78 ++++++++++++++----
 3 files changed, 188 insertions(+), 87 deletions(-)
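
[Note: the sketch below is illustrative only and is not part of this
patch. The callback name and signature are assumptions taken from the
msk->sched->get_subflow(msk, reinject, &data) call sites in sched.c. A
"redundant" scheduler hook would simply mark every active subflow via
the new 'scheduled' bit; the reworked callers in protocol.c then
transmit the same data on each marked subflow.]

/* Illustrative sketch, not part of this patch: a possible "redundant"
 * get_subflow() hook under the new contract.
 */
static void mptcp_sched_redundant_get_subflow(struct mptcp_sock *msk,
					      bool reinject,
					      struct mptcp_sched_data *data)
{
	struct mptcp_subflow_context *subflow;

	/* Mark every active subflow; the callers in __mptcp_push_pending()
	 * and __mptcp_retrans() walk the conn_list and send on each
	 * subflow that has 'scheduled' set.
	 */
	mptcp_for_each_subflow(msk, subflow) {
		if (__mptcp_subflow_active(subflow))
			subflow->scheduled = 1;
	}
}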

Patch

diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 96cf1620348b..274818480a36 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -1564,37 +1564,50 @@  void __mptcp_push_pending(struct sock *sk, unsigned int flags)
 		info.limit = dfrag->data_len;
 		len = dfrag->data_len - dfrag->already_sent;
 		while (len > 0) {
+			struct mptcp_subflow_context *subflow;
 			int ret = 0;
+			int max = 0;
+			int err;
 
-			prev_ssk = ssk;
-			ssk = mptcp_sched_get_send(msk);
-
-			/* First check. If the ssk has changed since
-			 * the last round, release prev_ssk
-			 */
-			if (ssk != prev_ssk && prev_ssk)
-				mptcp_push_release(prev_ssk, &info);
-			if (!ssk)
-				goto out;
-
-			/* Need to lock the new subflow only if different
-			 * from the previous one, otherwise we are still
-			 * helding the relevant lock
-			 */
-			if (ssk != prev_ssk)
-				lock_sock(ssk);
-
-			ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
-			if (ret <= 0) {
-				mptcp_push_release(ssk, &info);
+			err = mptcp_sched_get_send(msk);
+			if (err)
 				goto out;
+			mptcp_for_each_subflow(msk, subflow) {
+				if (subflow->scheduled) {
+					prev_ssk = ssk;
+					ssk = mptcp_subflow_tcp_sock(subflow);
+
+					/* First check. If the ssk has changed since
+					 * the last round, release prev_ssk
+					 */
+					if (ssk != prev_ssk && prev_ssk)
+						mptcp_push_release(prev_ssk, &info);
+					if (!ssk)
+						goto out;
+
+					/* Need to lock the new subflow only if different
+					 * from the previous one, otherwise we are still
+					 * holding the relevant lock
+					 */
+					if (ssk != prev_ssk)
+						lock_sock(ssk);
+
+					ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+					if (ret <= 0) {
+						mptcp_push_release(ssk, &info);
+						goto out;
+					}
+
+					if (ret > max)
+						max = ret;
+				}
 			}
 
-			info.sent += ret;
-			copied += ret;
-			len -= ret;
+			info.sent += max;
+			copied += max;
+			len -= max;
 
-			mptcp_update_post_push(msk, dfrag, ret);
+			mptcp_update_post_push(msk, dfrag, max);
 		}
 		WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
 	}
@@ -1628,7 +1641,10 @@  static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk)
 		info.limit = dfrag->data_len;
 		len = dfrag->data_len - dfrag->already_sent;
 		while (len > 0) {
+			struct mptcp_subflow_context *subflow;
 			int ret = 0;
+			int max = 0;
+			int err;
 
 			/* the caller already invoked the packet scheduler,
 			 * check for a different subflow usage only after
@@ -1639,31 +1655,41 @@  static void __mptcp_subflow_push_pending(struct sock *sk, struct sock *ssk)
 
 				if (!xmit_ssk)
 					goto out;
-				ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
-				if (ret <= 0)
+				max = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+				if (max <= 0)
 					goto out;
 				first = false;
 			} else {
-				xmit_ssk = mptcp_sched_get_send(mptcp_sk(sk));
-
-				if (!xmit_ssk)
-					goto out;
-				if (xmit_ssk != ssk) {
-					mptcp_subflow_delegate(mptcp_subflow_ctx(xmit_ssk),
-							       MPTCP_DELEGATE_SEND);
+				err = mptcp_sched_get_send(mptcp_sk(sk));
+				if (err)
 					goto out;
+				mptcp_for_each_subflow(msk, subflow) {
+					if (subflow->scheduled) {
+						xmit_ssk = mptcp_subflow_tcp_sock(subflow);
+
+						if (!xmit_ssk)
+							goto out;
+						if (xmit_ssk != ssk) {
+							mptcp_subflow_delegate(mptcp_subflow_ctx(xmit_ssk),
+									       MPTCP_DELEGATE_SEND);
+							goto out;
+						}
+
+						ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+						if (ret <= 0)
+							goto out;
+
+						if (ret > max)
+							max = ret;
+					}
 				}
-
-				ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
-				if (ret <= 0)
-					goto out;
 			}
 
-			info.sent += ret;
-			copied += ret;
-			len -= ret;
+			info.sent += max;
+			copied += max;
+			len -= max;
 
-			mptcp_update_post_push(msk, dfrag, ret);
+			mptcp_update_post_push(msk, dfrag, max);
 		}
 		WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
 	}
@@ -2455,16 +2481,17 @@  static void mptcp_check_fastclose(struct mptcp_sock *msk)
 static void __mptcp_retrans(struct sock *sk)
 {
 	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct mptcp_subflow_context *subflow;
 	struct mptcp_sendmsg_info info = {};
 	struct mptcp_data_frag *dfrag;
 	size_t copied = 0;
 	struct sock *ssk;
-	int ret;
+	int err;
 
 	mptcp_clean_una_wakeup(sk);
 
 	/* first check ssk: need to kick "stale" logic */
-	ssk = mptcp_sched_get_retrans(msk);
+	err = mptcp_sched_get_retrans(msk);
 	dfrag = mptcp_rtx_head(sk);
 	if (!dfrag) {
 		if (mptcp_data_fin_enabled(msk)) {
@@ -2483,31 +2510,45 @@  static void __mptcp_retrans(struct sock *sk)
 		goto reset_timer;
 	}
 
-	if (!ssk)
+	if (err)
 		goto reset_timer;
 
-	lock_sock(ssk);
+	mptcp_for_each_subflow(msk, subflow) {
+		if (subflow->scheduled) {
+			int ret = 0;
+			int max = 0;
 
-	/* limit retransmission to the bytes already sent on some subflows */
-	info.sent = 0;
-	info.limit = READ_ONCE(msk->csum_enabled) ? dfrag->data_len : dfrag->already_sent;
-	while (info.sent < info.limit) {
-		ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
-		if (ret <= 0)
-			break;
+			ssk = mptcp_subflow_tcp_sock(subflow);
+			if (!ssk)
+				goto reset_timer;
 
-		MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
-		copied += ret;
-		info.sent += ret;
-	}
-	if (copied) {
-		dfrag->already_sent = max(dfrag->already_sent, info.sent);
-		tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
-			 info.size_goal);
-		WRITE_ONCE(msk->allow_infinite_fallback, false);
-	}
+			lock_sock(ssk);
 
-	release_sock(ssk);
+			/* limit retransmission to the bytes already sent on some subflows */
+			info.sent = 0;
+			info.limit = READ_ONCE(msk->csum_enabled) ? dfrag->data_len : dfrag->already_sent;
+			while (info.sent < info.limit) {
+				ret = mptcp_sendmsg_frag(sk, ssk, dfrag, &info);
+				if (ret <= 0)
+					break;
+
+				if (ret > max)
+					max = ret;
+
+				MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RETRANSSEGS);
+				copied += max;
+				info.sent += max;
+			}
+			if (copied) {
+				dfrag->already_sent = max(dfrag->already_sent, info.sent);
+				tcp_push(ssk, 0, info.mss_now, tcp_sk(ssk)->nonagle,
+					 info.size_goal);
+				WRITE_ONCE(msk->allow_infinite_fallback, false);
+			}
+
+			release_sock(ssk);
+		}
+	}
 
 reset_timer:
 	mptcp_check_and_set_pending(sk);
@@ -3118,12 +3159,25 @@  void __mptcp_check_push(struct sock *sk, struct sock *ssk)
 		return;
 
 	if (!sock_owned_by_user(sk)) {
-		struct sock *xmit_ssk = mptcp_sched_get_send(mptcp_sk(sk));
+		struct mptcp_sock *msk = mptcp_sk(sk);
+		struct mptcp_subflow_context *subflow;
+		struct sock *xmit_ssk;
+		int err;
 
-		if (xmit_ssk == ssk)
-			__mptcp_subflow_push_pending(sk, ssk);
-		else if (xmit_ssk)
-			mptcp_subflow_delegate(mptcp_subflow_ctx(xmit_ssk), MPTCP_DELEGATE_SEND);
+		pr_debug("%s", __func__);
+		err = mptcp_sched_get_send(msk);
+		if (err)
+			return;
+		mptcp_for_each_subflow(msk, subflow) {
+			if (subflow->scheduled) {
+				xmit_ssk = mptcp_subflow_tcp_sock(subflow);
+
+				if (xmit_ssk == ssk)
+					__mptcp_subflow_push_pending(sk, ssk);
+				else if (xmit_ssk)
+					mptcp_subflow_delegate(mptcp_subflow_ctx(xmit_ssk), MPTCP_DELEGATE_SEND);
+			}
+		}
 	} else {
 		__set_bit(MPTCP_PUSH_PENDING, &mptcp_sk(sk)->cb_flags);
 	}
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 8739794166d8..1ce01db60b7e 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -461,6 +461,7 @@  struct mptcp_subflow_context {
 		send_mp_fail : 1,
 		send_fastclose : 1,
 		send_infinite_map : 1,
+		scheduled : 1,
 		rx_eof : 1,
 		can_ack : 1,        /* only after processing the remote a key */
 		disposable : 1,	    /* ctx can be free at ulp release time */
@@ -630,8 +631,8 @@  int mptcp_init_sched(struct mptcp_sock *msk,
 void mptcp_release_sched(struct mptcp_sock *msk);
 struct sock *mptcp_subflow_get_send(struct mptcp_sock *msk);
 struct sock *mptcp_subflow_get_retrans(struct mptcp_sock *msk);
-struct sock *mptcp_sched_get_send(struct mptcp_sock *msk);
-struct sock *mptcp_sched_get_retrans(struct mptcp_sock *msk);
+int mptcp_sched_get_send(struct mptcp_sock *msk);
+int mptcp_sched_get_retrans(struct mptcp_sock *msk);
 
 static inline bool __mptcp_subflow_active(struct mptcp_subflow_context *subflow)
 {
diff --git a/net/mptcp/sched.c b/net/mptcp/sched.c
index 3ceb721e6489..207ab422ac5d 100644
--- a/net/mptcp/sched.c
+++ b/net/mptcp/sched.c
@@ -91,51 +91,97 @@  void mptcp_release_sched(struct mptcp_sock *msk)
 static int mptcp_sched_data_init(struct mptcp_sock *msk,
 				 struct mptcp_sched_data *data)
 {
-	data->sock = NULL;
-	data->call_again = 0;
+	data->bitmap = 0;
 
 	return 0;
 }
 
-struct sock *mptcp_sched_get_send(struct mptcp_sock *msk)
+int mptcp_sched_get_send(struct mptcp_sock *msk)
 {
+	struct mptcp_subflow_context *subflow;
 	struct mptcp_sched_data data;
+	struct sock *ssk;
 
 	sock_owned_by_me((struct sock *)msk);
 
+	mptcp_for_each_subflow(msk, subflow)
+		subflow->scheduled = 0;
+
 	/* the following check is moved out of mptcp_subflow_get_send */
 	if (__mptcp_check_fallback(msk)) {
-		if (!msk->first)
-			return NULL;
-		return sk_stream_memory_free(msk->first) ? msk->first : NULL;
+		if (msk->first && sk_stream_memory_free(msk->first)) {
+			subflow = mptcp_subflow_ctx(msk->first);
+			subflow->scheduled = 1;
+			return 0;
+		}
+		return -EINVAL;
 	}
 
-	if (!msk->sched)
-		return mptcp_subflow_get_send(msk);
+	if (!msk->sched) {
+		ssk = mptcp_subflow_get_send(msk);
+		if (!ssk)
+			goto err;
+
+		subflow = mptcp_subflow_ctx(ssk);
+		if (!subflow)
+			goto err;
+
+		subflow->scheduled = 1;
+		return 0;
+	}
 
 	mptcp_sched_data_init(msk, &data);
 	msk->sched->get_subflow(msk, false, &data);
 
-	msk->last_snd = data.sock;
-	return data.sock;
+	return 0;
+
+err:
+	if (msk->first) {
+		subflow = mptcp_subflow_ctx(msk->first);
+		subflow->scheduled = 1;
+		return 0;
+	}
+	return -EINVAL;
 }
 
-struct sock *mptcp_sched_get_retrans(struct mptcp_sock *msk)
+int mptcp_sched_get_retrans(struct mptcp_sock *msk)
 {
+	struct mptcp_subflow_context *subflow;
 	struct mptcp_sched_data data;
+	struct sock *ssk;
 
 	sock_owned_by_me((const struct sock *)msk);
 
+	mptcp_for_each_subflow(msk, subflow)
+		subflow->scheduled = 0;
+
 	/* the following check is moved out of mptcp_subflow_get_retrans */
 	if (__mptcp_check_fallback(msk))
-		return NULL;
+		goto err;
+
+	if (!msk->sched) {
+		ssk = mptcp_subflow_get_retrans(msk);
+		if (!ssk)
+			goto err;
+
+		subflow = mptcp_subflow_ctx(ssk);
+		if (!subflow)
+			goto err;
 
-	if (!msk->sched)
-		return mptcp_subflow_get_retrans(msk);
+		subflow->scheduled = 1;
+		return 0;
+	}
 
 	mptcp_sched_data_init(msk, &data);
 	msk->sched->get_subflow(msk, true, &data);
 
-	msk->last_snd = data.sock;
-	return data.sock;
+	return 0;
+
+err:
+	if (msk->first) {
+		subflow = mptcp_subflow_ctx(msk->first);
+		subflow->scheduled = 1;
+		return 0;
+	}
+	return -EINVAL;
 }