[for-next,8/8] io_uring: multishot recv

Message ID: 20220628150228.1379645-9-dylany@fb.com
State: New
Series: io_uring: multishot recv

Commit Message

Dylan Yudaken June 28, 2022, 3:02 p.m. UTC
Support multishot receive for io_uring.
Typical server applications run a loop where, for each recv CQE, they
requeue another recv/recvmsg.

This can be simplified by using the existing multishot functionality
combined with io_uring's provided buffers.
The API is to add the IORING_RECV_MULTISHOT flag to the SQE. CQEs will
then be posted (with the IORING_CQE_F_MORE flag set) whenever data is
available and has been read. Once an error occurs or the socket ends,
the multishot will be removed and a final completion without
IORING_CQE_F_MORE will be posted.

The benefit of this is that the recv is much more performant:
 * Subsequent receives are queued up straight away without requiring the
   application to finish a processing loop.
 * If there is more data in the socket (say the provided buffer size is
   smaller than the socket buffer) then the data is immediately
   returned, improving batching.
 * Poll is only armed once and reused, saving CPU cycles.
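
As a rough illustration of the expected userspace flow (a sketch, not
part of this patch: it assumes liburing helpers, an initialized ring, a
provided-buffer group BGID already registered, a connected sockfd, and
a hypothetical application callback consume_buffer(); the flag lives in
sqe->addr2 as this series defines it):

	/* Arm one multishot recv; provided buffers (IOSQE_BUFFER_SELECT)
	 * are required and len must be 0. */
	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

	io_uring_prep_recv(sqe, sockfd, NULL, 0, 0);
	sqe->addr2 = IORING_RECV_MULTISHOT;
	sqe->flags |= IOSQE_BUFFER_SELECT;
	sqe->buf_group = BGID;
	io_uring_submit(&ring);

	/* Each chunk of data posts a CQE with IORING_CQE_F_MORE set; a
	 * terminal CQE (error or socket end) clears it. */
	for (;;) {
		struct io_uring_cqe *cqe;
		unsigned cflags;

		io_uring_wait_cqe(&ring, &cqe);
		cflags = cqe->flags;
		if (cqe->res > 0)
			consume_buffer(cflags >> IORING_CQE_BUFFER_SHIFT,
				       cqe->res);
		io_uring_cqe_seen(&ring, cqe);
		if (!(cflags & IORING_CQE_F_MORE))
			break;	/* multishot terminated; re-arm if desired */
	}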

Signed-off-by: Dylan Yudaken <dylany@fb.com>
---
 io_uring/net.c | 93 +++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 81 insertions(+), 12 deletions(-)

Comments

Jens Axboe June 28, 2022, 3:17 p.m. UTC | #1
On 6/28/22 9:02 AM, Dylan Yudaken wrote:
> @@ -399,13 +401,22 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
>  	sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
>  	sr->len = READ_ONCE(sqe->len);
>  	sr->flags = READ_ONCE(sqe->addr2);
> -	if (sr->flags & ~IORING_RECVSEND_POLL_FIRST)
> +	if (sr->flags & ~(RECVMSG_FLAGS))
>  		return -EINVAL;
>  	sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
>  	if (sr->msg_flags & MSG_DONTWAIT)
>  		req->flags |= REQ_F_NOWAIT;
>  	if (sr->msg_flags & MSG_ERRQUEUE)
>  		req->flags |= REQ_F_CLEAR_POLLIN;
> +	if (sr->flags & IORING_RECV_MULTISHOT) {
> +		if (!(req->flags & REQ_F_BUFFER_SELECT))
> +			return -EINVAL;
> +		if (sr->msg_flags & MSG_WAITALL)
> +			return -EINVAL;
> +		if (req->opcode == IORING_OP_RECV && sr->len)
> +			return -EINVAL;
> +		req->flags |= REQ_F_APOLL_MULTISHOT;
> +	}

Do we want to forbid not using provided buffers? If you have a ping-pong
type setup, eg you know you'll have to send something before you receive
anything again, seems like it'd be feasible to use this with a normal
buffer?

I strongly suspect that most use cases will use provided buffers for
this, just wondering if there are any particular reasons for forbidding
it explicitly.

>  
>  #ifdef CONFIG_COMPAT
>  	if (req->ctx->compat)
> @@ -415,6 +426,14 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
>  	return 0;
>  }
>  
> +static inline void io_recv_prep_retry(struct io_kiocb *req)
> +{
> +	struct io_sr_msg *sr = io_kiocb_to_cmd(req);
> +
> +	sr->done_io = 0;
> +	sr->len = 0; /* get from the provided buffer */
> +}
> +
>  int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
>  {
>  	struct io_sr_msg *sr = io_kiocb_to_cmd(req);
> @@ -424,6 +443,7 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
>  	unsigned flags;
>  	int ret, min_ret = 0;
>  	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
> +	size_t len = sr->len;
>  
>  	sock = sock_from_file(req->file);
>  	if (unlikely(!sock))
> @@ -442,16 +462,17 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
>  	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
>  		return io_setup_async_msg(req, kmsg);
>  
> +retry_multishot:
>  	if (io_do_buffer_select(req)) {
>  		void __user *buf;
>  
> -		buf = io_buffer_select(req, &sr->len, issue_flags);
> +		buf = io_buffer_select(req, &len, issue_flags);
>  		if (!buf)
>  			return -ENOBUFS;
>  		kmsg->fast_iov[0].iov_base = buf;
> -		kmsg->fast_iov[0].iov_len = sr->len;
> +		kmsg->fast_iov[0].iov_len = len;
>  		iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->fast_iov, 1,
> -				sr->len);
> +				len);
>  	}
>  
>  	flags = sr->msg_flags;
> @@ -463,8 +484,15 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
>  	kmsg->msg.msg_get_inq = 1;
>  	ret = __sys_recvmsg_sock(sock, &kmsg->msg, sr->umsg, kmsg->uaddr, flags);
>  	if (ret < min_ret) {
> -		if (ret == -EAGAIN && force_nonblock)
> -			return io_setup_async_msg(req, kmsg);
> +		if (ret == -EAGAIN && force_nonblock) {
> +			ret = io_setup_async_msg(req, kmsg);
> +			if (ret == -EAGAIN && (req->flags & IO_APOLL_MULTI_POLLED) ==
> +					       IO_APOLL_MULTI_POLLED) {
> +				io_kbuf_recycle(req, issue_flags);
> +				ret = IOU_ISSUE_SKIP_COMPLETE;
> +			}
> +			return ret;
> +		}
>  		if (ret == -ERESTARTSYS)
>  			ret = -EINTR;
>  		if (ret > 0 && io_net_retry(sock, flags)) {
> @@ -491,8 +519,24 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
>  	cflags = io_put_kbuf(req, issue_flags);
>  	if (kmsg->msg.msg_inq)
>  		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
> +
> +	if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
> +		io_req_set_res(req, ret, cflags);
> +		return IOU_OK;
> +	}
> +
> +	if (ret > 0) {
> +		if (io_post_aux_cqe(req->ctx, req->cqe.user_data, ret,
> +				    cflags | IORING_CQE_F_MORE)) {
> +			io_recv_prep_retry(req);
> +			goto retry_multishot;
> +		} else {
> +			ret = -ECANCELED;
> +		}
> +	}
> +
>  	io_req_set_res(req, ret, cflags);
> -	return IOU_OK;
> +	return req->flags & REQ_F_POLLED ? IOU_STOP_MULTISHOT : ret;
>  }

Minor style, but I prefer avoiding ternaries if possible. This is much
easier to read for me:

	if (req->flags & REQ_F_POLLED)
		return IOU_STOP_MULTISHOT;
	return ret;

> @@ -505,6 +549,7 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
>  	unsigned flags;
>  	int ret, min_ret = 0;
>  	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
> +	size_t len = sr->len;
>  
>  	if (!(req->flags & REQ_F_POLLED) &&
>  	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
> @@ -514,16 +559,17 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
>  	if (unlikely(!sock))
>  		return -ENOTSOCK;
>  
> +retry_multishot:
>  	if (io_do_buffer_select(req)) {
>  		void __user *buf;
>  
> -		buf = io_buffer_select(req, &sr->len, issue_flags);
> +		buf = io_buffer_select(req, &len, issue_flags);
>  		if (!buf)
>  			return -ENOBUFS;
>  		sr->buf = buf;
>  	}
>  
> -	ret = import_single_range(READ, sr->buf, sr->len, &iov, &msg.msg_iter);
> +	ret = import_single_range(READ, sr->buf, len, &iov, &msg.msg_iter);
>  	if (unlikely(ret))
>  		goto out_free;
>  
> @@ -543,8 +589,14 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
>  
>  	ret = sock_recvmsg(sock, &msg, flags);
>  	if (ret < min_ret) {
> -		if (ret == -EAGAIN && force_nonblock)
> -			return -EAGAIN;
> +		if (ret == -EAGAIN && force_nonblock) {
> +			if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) {
> +				io_kbuf_recycle(req, issue_flags);
> +				ret = IOU_ISSUE_SKIP_COMPLETE;
> +			}
> +
> +			return ret;
> +		}

Maybe:
		if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) {
			io_kbuf_recycle(req, issue_flags);
			return IOU_ISSUE_SKIP_COMPLETE;
		}

		return ret;

> @@ -570,8 +622,25 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
>  	cflags = io_put_kbuf(req, issue_flags);
>  	if (msg.msg_inq)
>  		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
> +
> +
> +	if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
> +		io_req_set_res(req, ret, cflags);
> +		return IOU_OK;
> +	}
> +
> +	if (ret > 0) {
> +		if (io_post_aux_cqe(req->ctx, req->cqe.user_data, ret,
> +				    cflags | IORING_CQE_F_MORE)) {
> +			io_recv_prep_retry(req);
> +			goto retry_multishot;
> +		} else {
> +			ret = -ECANCELED;
> +		}
> +	}
> +
>  	io_req_set_res(req, ret, cflags);
> -	return IOU_OK;
> +	return req->flags & REQ_F_POLLED ? IOU_STOP_MULTISHOT : ret;
>  }

Same here, and maybe this needs to be a helper so you could just do

	return io_recv_finish(req, ret, cflags);

or something like that? It's non-trivial duplicated code.
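
One possible shape for such a helper, purely as a sketch (io_recv_finish
is just the name suggested above; the bool return used to signal "retry
the multishot" and the int * out-parameter are assumptions, not code
from this series):

	/* Shared completion tail for io_recvmsg()/io_recv(). Returns true
	 * when the request is complete, with *ret set to the value to hand
	 * back; returns false when the caller should goto retry_multishot. */
	static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
					  unsigned int cflags)
	{
		if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
			io_req_set_res(req, *ret, cflags);
			*ret = IOU_OK;
			return true;
		}

		if (*ret > 0) {
			if (io_post_aux_cqe(req->ctx, req->cqe.user_data, *ret,
					    cflags | IORING_CQE_F_MORE)) {
				io_recv_prep_retry(req);
				return false;
			}
			*ret = -ECANCELED;
		}

		io_req_set_res(req, *ret, cflags);
		if (req->flags & REQ_F_POLLED)
			*ret = IOU_STOP_MULTISHOT;
		return true;
	}

Both call sites would then reduce to:

	if (!io_recv_finish(req, &ret, cflags))
		goto retry_multishot;
	return ret;
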
Dylan Yudaken June 28, 2022, 4:15 p.m. UTC | #2
On Tue, 2022-06-28 at 09:17 -0600, Jens Axboe wrote:
> On 6/28/22 9:02 AM, Dylan Yudaken wrote:
> > @@ -399,13 +401,22 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
> >         sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
> >         sr->len = READ_ONCE(sqe->len);
> >         sr->flags = READ_ONCE(sqe->addr2);
> > -       if (sr->flags & ~IORING_RECVSEND_POLL_FIRST)
> > +       if (sr->flags & ~(RECVMSG_FLAGS))
> >                 return -EINVAL;
> >         sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
> >         if (sr->msg_flags & MSG_DONTWAIT)
> >                 req->flags |= REQ_F_NOWAIT;
> >         if (sr->msg_flags & MSG_ERRQUEUE)
> >                 req->flags |= REQ_F_CLEAR_POLLIN;
> > +       if (sr->flags & IORING_RECV_MULTISHOT) {
> > +               if (!(req->flags & REQ_F_BUFFER_SELECT))
> > +                       return -EINVAL;
> > +               if (sr->msg_flags & MSG_WAITALL)
> > +                       return -EINVAL;
> > +               if (req->opcode == IORING_OP_RECV && sr->len)
> > +                       return -EINVAL;
> > +               req->flags |= REQ_F_APOLL_MULTISHOT;
> > +       }
> 
> Do we want to forbid not using provided buffers? If you have a ping-pong
> type setup, eg you know you'll have to send something before you receive
> anything again, seems like it'd be feasible to use this with a normal
> buffer?
> 
> I strongly suspect that most use cases will use provided buffers for
> this, just wondering if there are any particular reasons for forbidding
> it explicitly.

My feeling is that getting the user API right without provided buffers
is going to be potentially complex, and probably will overlap with the
MSG_WAITALL case.
Expanding it later is easy, but without an actual use case I think
leaving it as provided buffers only for now makes sense (as you say,
this is by far the most likely use case).

> 
> >  
> >  #ifdef CONFIG_COMPAT
> >         if (req->ctx->compat)
> > @@ -415,6 +426,14 @@ int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
> >         return 0;
> >  }
> >  
> > +static inline void io_recv_prep_retry(struct io_kiocb *req)
> > +{
> > +       struct io_sr_msg *sr = io_kiocb_to_cmd(req);
> > +
> > +       sr->done_io = 0;
> > +       sr->len = 0; /* get from the provided buffer */
> > +}
> > +
> >  int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
> >  {
> >         struct io_sr_msg *sr = io_kiocb_to_cmd(req);
> > @@ -424,6 +443,7 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
> >         unsigned flags;
> >         int ret, min_ret = 0;
> >         bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
> > +       size_t len = sr->len;
> >  
> >         sock = sock_from_file(req->file);
> >         if (unlikely(!sock))
> > @@ -442,16 +462,17 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
> >             (sr->flags & IORING_RECVSEND_POLL_FIRST))
> >                 return io_setup_async_msg(req, kmsg);
> >  
> > +retry_multishot:
> >         if (io_do_buffer_select(req)) {
> >                 void __user *buf;
> >  
> > -               buf = io_buffer_select(req, &sr->len, issue_flags);
> > +               buf = io_buffer_select(req, &len, issue_flags);
> >                 if (!buf)
> >                         return -ENOBUFS;
> >                 kmsg->fast_iov[0].iov_base = buf;
> > -               kmsg->fast_iov[0].iov_len = sr->len;
> > +               kmsg->fast_iov[0].iov_len = len;
> >                 iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->fast_iov, 1,
> > -                               sr->len);
> > +                               len);
> >         }
> >  
> >         flags = sr->msg_flags;
> > @@ -463,8 +484,15 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
> >         kmsg->msg.msg_get_inq = 1;
> >         ret = __sys_recvmsg_sock(sock, &kmsg->msg, sr->umsg, kmsg->uaddr, flags);
> >         if (ret < min_ret) {
> > -               if (ret == -EAGAIN && force_nonblock)
> > -                       return io_setup_async_msg(req, kmsg);
> > +               if (ret == -EAGAIN && force_nonblock) {
> > +                       ret = io_setup_async_msg(req, kmsg);
> > +                       if (ret == -EAGAIN && (req->flags & IO_APOLL_MULTI_POLLED) ==
> > +                                              IO_APOLL_MULTI_POLLED) {
> > +                               io_kbuf_recycle(req, issue_flags);
> > +                               ret = IOU_ISSUE_SKIP_COMPLETE;
> > +                       }
> > +                       return ret;
> > +               }
> >                 if (ret == -ERESTARTSYS)
> >                         ret = -EINTR;
> >                 if (ret > 0 && io_net_retry(sock, flags)) {
> > @@ -491,8 +519,24 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
> >         cflags = io_put_kbuf(req, issue_flags);
> >         if (kmsg->msg.msg_inq)
> >                 cflags |= IORING_CQE_F_SOCK_NONEMPTY;
> > +
> > +       if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
> > +               io_req_set_res(req, ret, cflags);
> > +               return IOU_OK;
> > +       }
> > +
> > +       if (ret > 0) {
> > +               if (io_post_aux_cqe(req->ctx, req->cqe.user_data, ret,
> > +                                   cflags | IORING_CQE_F_MORE)) {
> > +                       io_recv_prep_retry(req);
> > +                       goto retry_multishot;
> > +               } else {
> > +                       ret = -ECANCELED;
> > +               }
> > +       }
> > +
> >         io_req_set_res(req, ret, cflags);
> > -       return IOU_OK;
> > +       return req->flags & REQ_F_POLLED ? IOU_STOP_MULTISHOT : ret;
> >  }
> 
> Minor style, but I prefer avoiding ternaries if possible. This is much
> easier to read for me:
> 
>         if (req->flags & REQ_F_POLLED)
>                 return IOU_STOP_MULTISHOT;
>         return ret;

OK

> 
> > @@ -505,6 +549,7 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
> >         unsigned flags;
> >         int ret, min_ret = 0;
> >         bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
> > +       size_t len = sr->len;
> >  
> >         if (!(req->flags & REQ_F_POLLED) &&
> >             (sr->flags & IORING_RECVSEND_POLL_FIRST))
> > @@ -514,16 +559,17 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
> >         if (unlikely(!sock))
> >                 return -ENOTSOCK;
> >  
> > +retry_multishot:
> >         if (io_do_buffer_select(req)) {
> >                 void __user *buf;
> >  
> > -               buf = io_buffer_select(req, &sr->len, issue_flags);
> > +               buf = io_buffer_select(req, &len, issue_flags);
> >                 if (!buf)
> >                         return -ENOBUFS;
> >                 sr->buf = buf;
> >         }
> >  
> > -       ret = import_single_range(READ, sr->buf, sr->len, &iov, &msg.msg_iter);
> > +       ret = import_single_range(READ, sr->buf, len, &iov, &msg.msg_iter);
> >         if (unlikely(ret))
> >                 goto out_free;
> >  
> > @@ -543,8 +589,14 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
> >  
> >         ret = sock_recvmsg(sock, &msg, flags);
> >         if (ret < min_ret) {
> > -               if (ret == -EAGAIN && force_nonblock)
> > -                       return -EAGAIN;
> > +               if (ret == -EAGAIN && force_nonblock) {
> > +                       if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) {
> > +                               io_kbuf_recycle(req, issue_flags);
> > +                               ret = IOU_ISSUE_SKIP_COMPLETE;
> > +                       }
> > +
> > +                       return ret;
> > +               }
> 
> Maybe:
>                 if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) {
>                         io_kbuf_recycle(req, issue_flags);
>                         return IOU_ISSUE_SKIP_COMPLETE;
>                 }
> 
>                 return ret;
> 
> > @@ -570,8 +622,25 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
> >         cflags = io_put_kbuf(req, issue_flags);
> >         if (msg.msg_inq)
> >                 cflags |= IORING_CQE_F_SOCK_NONEMPTY;
> > +
> > +
> > +       if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
> > +               io_req_set_res(req, ret, cflags);
> > +               return IOU_OK;
> > +       }
> > +
> > +       if (ret > 0) {
> > +               if (io_post_aux_cqe(req->ctx, req->cqe.user_data, ret,
> > +                                   cflags | IORING_CQE_F_MORE)) {
> > +                       io_recv_prep_retry(req);
> > +                       goto retry_multishot;
> > +               } else {
> > +                       ret = -ECANCELED;
> > +               }
> > +       }
> > +
> >         io_req_set_res(req, ret, cflags);
> > -       return IOU_OK;
> > +       return req->flags & REQ_F_POLLED ? IOU_STOP_MULTISHOT : ret;
> >  }
> 
> Same here, and maybe this needs to be a helper so you could just do
> 
>         return io_recv_finish(req, ret, cflags);
> 
> or something like that? It's non-trivial duplicated code.
> 

Makes sense - I'll do that

Patch

diff --git a/io_uring/net.c b/io_uring/net.c
index 0268c4603f5d..9bf8c6c0b549 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -389,6 +389,8 @@  int io_recvmsg_prep_async(struct io_kiocb *req)
 	return ret;
 }
 
+#define RECVMSG_FLAGS (IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT)
+
 int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req);
@@ -399,13 +401,22 @@  int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	sr->umsg = u64_to_user_ptr(READ_ONCE(sqe->addr));
 	sr->len = READ_ONCE(sqe->len);
 	sr->flags = READ_ONCE(sqe->addr2);
-	if (sr->flags & ~IORING_RECVSEND_POLL_FIRST)
+	if (sr->flags & ~(RECVMSG_FLAGS))
 		return -EINVAL;
 	sr->msg_flags = READ_ONCE(sqe->msg_flags) | MSG_NOSIGNAL;
 	if (sr->msg_flags & MSG_DONTWAIT)
 		req->flags |= REQ_F_NOWAIT;
 	if (sr->msg_flags & MSG_ERRQUEUE)
 		req->flags |= REQ_F_CLEAR_POLLIN;
+	if (sr->flags & IORING_RECV_MULTISHOT) {
+		if (!(req->flags & REQ_F_BUFFER_SELECT))
+			return -EINVAL;
+		if (sr->msg_flags & MSG_WAITALL)
+			return -EINVAL;
+		if (req->opcode == IORING_OP_RECV && sr->len)
+			return -EINVAL;
+		req->flags |= REQ_F_APOLL_MULTISHOT;
+	}
 
 #ifdef CONFIG_COMPAT
 	if (req->ctx->compat)
@@ -415,6 +426,14 @@  int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return 0;
 }
 
+static inline void io_recv_prep_retry(struct io_kiocb *req)
+{
+	struct io_sr_msg *sr = io_kiocb_to_cmd(req);
+
+	sr->done_io = 0;
+	sr->len = 0; /* get from the provided buffer */
+}
+
 int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_sr_msg *sr = io_kiocb_to_cmd(req);
@@ -424,6 +443,7 @@  int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	unsigned flags;
 	int ret, min_ret = 0;
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+	size_t len = sr->len;
 
 	sock = sock_from_file(req->file);
 	if (unlikely(!sock))
@@ -442,16 +462,17 @@  int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
 		return io_setup_async_msg(req, kmsg);
 
+retry_multishot:
 	if (io_do_buffer_select(req)) {
 		void __user *buf;
 
-		buf = io_buffer_select(req, &sr->len, issue_flags);
+		buf = io_buffer_select(req, &len, issue_flags);
 		if (!buf)
 			return -ENOBUFS;
 		kmsg->fast_iov[0].iov_base = buf;
-		kmsg->fast_iov[0].iov_len = sr->len;
+		kmsg->fast_iov[0].iov_len = len;
 		iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->fast_iov, 1,
-				sr->len);
+				len);
 	}
 
 	flags = sr->msg_flags;
@@ -463,8 +484,15 @@  int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	kmsg->msg.msg_get_inq = 1;
 	ret = __sys_recvmsg_sock(sock, &kmsg->msg, sr->umsg, kmsg->uaddr, flags);
 	if (ret < min_ret) {
-		if (ret == -EAGAIN && force_nonblock)
-			return io_setup_async_msg(req, kmsg);
+		if (ret == -EAGAIN && force_nonblock) {
+			ret = io_setup_async_msg(req, kmsg);
+			if (ret == -EAGAIN && (req->flags & IO_APOLL_MULTI_POLLED) ==
+					       IO_APOLL_MULTI_POLLED) {
+				io_kbuf_recycle(req, issue_flags);
+				ret = IOU_ISSUE_SKIP_COMPLETE;
+			}
+			return ret;
+		}
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
 		if (ret > 0 && io_net_retry(sock, flags)) {
@@ -491,8 +519,24 @@  int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	cflags = io_put_kbuf(req, issue_flags);
 	if (kmsg->msg.msg_inq)
 		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
+
+	if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
+		io_req_set_res(req, ret, cflags);
+		return IOU_OK;
+	}
+
+	if (ret > 0) {
+		if (io_post_aux_cqe(req->ctx, req->cqe.user_data, ret,
+				    cflags | IORING_CQE_F_MORE)) {
+			io_recv_prep_retry(req);
+			goto retry_multishot;
+		} else {
+			ret = -ECANCELED;
+		}
+	}
+
 	io_req_set_res(req, ret, cflags);
-	return IOU_OK;
+	return req->flags & REQ_F_POLLED ? IOU_STOP_MULTISHOT : ret;
 }
 
 int io_recv(struct io_kiocb *req, unsigned int issue_flags)
@@ -505,6 +549,7 @@  int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	unsigned flags;
 	int ret, min_ret = 0;
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+	size_t len = sr->len;
 
 	if (!(req->flags & REQ_F_POLLED) &&
 	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
@@ -514,16 +559,17 @@  int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
+retry_multishot:
 	if (io_do_buffer_select(req)) {
 		void __user *buf;
 
-		buf = io_buffer_select(req, &sr->len, issue_flags);
+		buf = io_buffer_select(req, &len, issue_flags);
 		if (!buf)
 			return -ENOBUFS;
 		sr->buf = buf;
 	}
 
-	ret = import_single_range(READ, sr->buf, sr->len, &iov, &msg.msg_iter);
+	ret = import_single_range(READ, sr->buf, len, &iov, &msg.msg_iter);
 	if (unlikely(ret))
 		goto out_free;
 
@@ -543,8 +589,14 @@  int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 
 	ret = sock_recvmsg(sock, &msg, flags);
 	if (ret < min_ret) {
-		if (ret == -EAGAIN && force_nonblock)
-			return -EAGAIN;
+		if (ret == -EAGAIN && force_nonblock) {
+			if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) {
+				io_kbuf_recycle(req, issue_flags);
+				ret = IOU_ISSUE_SKIP_COMPLETE;
+			}
+
+			return ret;
+		}
 		if (ret == -ERESTARTSYS)
 			ret = -EINTR;
 		if (ret > 0 && io_net_retry(sock, flags)) {
@@ -570,8 +622,25 @@  int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	cflags = io_put_kbuf(req, issue_flags);
 	if (msg.msg_inq)
 		cflags |= IORING_CQE_F_SOCK_NONEMPTY;
+
+
+	if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
+		io_req_set_res(req, ret, cflags);
+		return IOU_OK;
+	}
+
+	if (ret > 0) {
+		if (io_post_aux_cqe(req->ctx, req->cqe.user_data, ret,
+				    cflags | IORING_CQE_F_MORE)) {
+			io_recv_prep_retry(req);
+			goto retry_multishot;
+		} else {
+			ret = -ECANCELED;
+		}
+	}
+
 	io_req_set_res(req, ret, cflags);
-	return IOU_OK;
+	return req->flags & REQ_F_POLLED ? IOU_STOP_MULTISHOT : ret;
 }
 
 int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)