diff mbox series

[v1,15/15] io_uring/zcrx: throttle receive requests

Message ID 20241007221603.1703699-16-dw@davidwei.uk (mailing list archive)
State New
Headers show
Series io_uring zero copy rx | expand

Commit Message

David Wei Oct. 7, 2024, 10:16 p.m. UTC
From: Pavel Begunkov <asml.silence@gmail.com>

io_zc_rx_tcp_recvmsg() continues until it fails or there is nothing to
receive. If the other side sends fast enough, we might get stuck in
io_zc_rx_tcp_recvmsg() producing more and more CQEs but not letting the
user to handle them leading to unbound latencies.

Break out of it based on an arbitrarily chosen limit, the upper layer
will either return to userspace or requeue the request.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: David Wei <dw@davidwei.uk>
---
 io_uring/net.c  |  5 ++++-
 io_uring/zcrx.c | 17 ++++++++++++++---
 io_uring/zcrx.h |  6 ++++--
 3 files changed, 22 insertions(+), 6 deletions(-)

Comments

Jens Axboe Oct. 9, 2024, 6:43 p.m. UTC | #1
On 10/7/24 4:16 PM, David Wei wrote:
> From: Pavel Begunkov <asml.silence@gmail.com>
> 
> io_zc_rx_tcp_recvmsg() continues until it fails or there is nothing to
> receive. If the other side sends fast enough, we might get stuck in
> io_zc_rx_tcp_recvmsg() producing more and more CQEs but not letting the
> user to handle them leading to unbound latencies.
> 
> Break out of it based on an arbitrarily chosen limit, the upper layer
> will either return to userspace or requeue the request.

Probably prudent, and hand wavy limits are just fine as all we really
care about is breaking out.

Looks fine to me.
diff mbox series

Patch

diff --git a/io_uring/net.c b/io_uring/net.c
index 482e138d2994..c99e62c7dcfb 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -1253,10 +1253,13 @@  int io_recvzc(struct io_kiocb *req, unsigned int issue_flags)
 	if (!ifq)
 		return -EINVAL;
 
-	ret = io_zcrx_recv(req, ifq, sock, zc->msg_flags | MSG_DONTWAIT);
+	ret = io_zcrx_recv(req, ifq, sock, zc->msg_flags | MSG_DONTWAIT,
+			   issue_flags);
 	if (unlikely(ret <= 0) && ret != -EAGAIN) {
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
+		if (ret == IOU_REQUEUE)
+			return IOU_REQUEUE;
 
 		req_set_fail(req);
 		io_req_set_res(req, ret, 0);
diff --git a/io_uring/zcrx.c b/io_uring/zcrx.c
index 7939f830cf5b..a78d82a2d404 100644
--- a/io_uring/zcrx.c
+++ b/io_uring/zcrx.c
@@ -26,10 +26,13 @@ 
 
 #if defined(CONFIG_PAGE_POOL) && defined(CONFIG_INET)
 
+#define IO_SKBS_PER_CALL_LIMIT	20
+
 struct io_zcrx_args {
 	struct io_kiocb		*req;
 	struct io_zcrx_ifq	*ifq;
 	struct socket		*sock;
+	unsigned		nr_skbs;
 };
 
 struct io_zc_refill_data {
@@ -708,6 +711,9 @@  io_zcrx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb,
 	int i, copy, end, off;
 	int ret = 0;
 
+	if (unlikely(args->nr_skbs++ > IO_SKBS_PER_CALL_LIMIT))
+		return -EAGAIN;
+
 	if (unlikely(offset < skb_headlen(skb))) {
 		ssize_t copied;
 		size_t to_copy;
@@ -785,7 +791,8 @@  io_zcrx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb,
 }
 
 static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
-				struct sock *sk, int flags)
+				struct sock *sk, int flags,
+				unsigned int issue_flags)
 {
 	struct io_zcrx_args args = {
 		.req = req,
@@ -811,6 +818,9 @@  static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
 			ret = -ENOTCONN;
 		else
 			ret = -EAGAIN;
+	} else if (unlikely(args.nr_skbs > IO_SKBS_PER_CALL_LIMIT) &&
+		   (issue_flags & IO_URING_F_MULTISHOT)) {
+		ret = IOU_REQUEUE;
 	} else if (sock_flag(sk, SOCK_DONE)) {
 		/* Make it to retry until it finally gets 0. */
 		ret = -EAGAIN;
@@ -821,7 +831,8 @@  static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
 }
 
 int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
-		 struct socket *sock, unsigned int flags)
+		 struct socket *sock, unsigned int flags,
+		 unsigned int issue_flags)
 {
 	struct sock *sk = sock->sk;
 	const struct proto *prot = READ_ONCE(sk->sk_prot);
@@ -830,7 +841,7 @@  int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
 		return -EPROTONOSUPPORT;
 
 	sock_rps_record_flow(sk);
-	return io_zcrx_tcp_recvmsg(req, ifq, sk, flags);
+	return io_zcrx_tcp_recvmsg(req, ifq, sk, flags, issue_flags);
 }
 
 #endif
diff --git a/io_uring/zcrx.h b/io_uring/zcrx.h
index ddd68098122a..bb7ca61a251e 100644
--- a/io_uring/zcrx.h
+++ b/io_uring/zcrx.h
@@ -46,7 +46,8 @@  int io_register_zcrx_ifq(struct io_ring_ctx *ctx,
 void io_unregister_zcrx_ifqs(struct io_ring_ctx *ctx);
 void io_shutdown_zcrx_ifqs(struct io_ring_ctx *ctx);
 int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
-		 struct socket *sock, unsigned int flags);
+		 struct socket *sock, unsigned int flags,
+		 unsigned int issue_flags);
 #else
 static inline int io_register_zcrx_ifq(struct io_ring_ctx *ctx,
 					struct io_uring_zcrx_ifq_reg __user *arg)
@@ -60,7 +61,8 @@  static inline void io_shutdown_zcrx_ifqs(struct io_ring_ctx *ctx)
 {
 }
 static inline int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
-			       struct socket *sock, unsigned int flags)
+				struct socket *sock, unsigned int flags,
+				unsigned int issue_flags)
 {
 	return -EOPNOTSUPP;
 }