diff mbox series

[RFC,v4,02/17] af_vsock: separate wait data loop

Message ID 20210207151451.804498-1-arseny.krasnov@kaspersky.com (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Headers show
Series virtio/vsock: introduce SOCK_SEQPACKET support | expand

Checks

Context Check Description
netdev/cover_letter success Link
netdev/fixes_present success Link
netdev/patch_count fail Series longer than 15 patches
netdev/tree_selection success Guessed tree name to be net-next
netdev/subject_prefix success Link
netdev/cc_maintainers warning 1 maintainers not CCed: jeffv@google.com
netdev/source_inline success Was 0 now: 0
netdev/verify_signedoff success Link
netdev/module_param success Was 0 now: 0
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/verify_fixes success Link
netdev/checkpatch warning WARNING: line length of 81 exceeds 80 columns WARNING: line length of 87 exceeds 80 columns
netdev/build_allmodconfig_warn success Errors and warnings before: 0 this patch: 0
netdev/header_inline success Link
netdev/stable success Stable not CCed

Commit Message

Arseny Krasnov Feb. 7, 2021, 3:14 p.m. UTC
This moves wait loop for data to dedicated function, because later
it will be used by SEQPACKET data receive loop.

Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
---
 net/vmw_vsock/af_vsock.c | 158 +++++++++++++++++++++------------------
 1 file changed, 86 insertions(+), 72 deletions(-)

Comments

Stefano Garzarella Feb. 11, 2021, 11:24 a.m. UTC | #1
On Sun, Feb 07, 2021 at 06:14:48PM +0300, Arseny Krasnov wrote:
>This moves wait loop for data to dedicated function, because later
>it will be used by SEQPACKET data receive loop.
>
>Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>---
> net/vmw_vsock/af_vsock.c | 158 +++++++++++++++++++++------------------
> 1 file changed, 86 insertions(+), 72 deletions(-)
>
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index f4fabec50650..38927695786f 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1833,6 +1833,71 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> 	return err;
> }
>
>+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
>+			   long timeout,
>+			   struct vsock_transport_recv_notify_data *recv_data,
>+			   size_t target)
>+{
>+	const struct vsock_transport *transport;
>+	struct vsock_sock *vsk;
>+	s64 data;
>+	int err;
>+
>+	vsk = vsock_sk(sk);
>+	err = 0;
>+	transport = vsk->transport;
>+	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
>+
>+	while ((data = vsock_stream_has_data(vsk)) == 0) {
>+		if (sk->sk_err != 0 ||
>+		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>+		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>+			goto out;
>+		}
>+
>+		/* Don't wait for non-blocking sockets. */
>+		if (timeout == 0) {
>+			err = -EAGAIN;
>+			goto out;
>+		}
>+
>+		if (recv_data) {
>+			err = transport->notify_recv_pre_block(vsk, target, recv_data);
>+			if (err < 0)
>+				goto out;
>+		}
>+
>+		release_sock(sk);
>+		timeout = schedule_timeout(timeout);
>+		lock_sock(sk);
>+
>+		if (signal_pending(current)) {
>+			err = sock_intr_errno(timeout);
>+			goto out;
>+		} else if (timeout == 0) {
>+			err = -EAGAIN;
>+			goto out;
>+		}
>+	}
>+
>+	finish_wait(sk_sleep(sk), wait);
>+
>+	/* Invalid queue pair content. XXX This should
>+	 * be changed to a connection reset in a later
>+	 * change.
>+	 */
>+	if (data < 0)
>+		return -ENOMEM;
>+
>+	/* Have some data, return. */
>+	if (data)
>+		return data;

IIUC here data must be != 0 so you can simply return data in any case.

Or cleaner, you can do 'break' instead of 'goto out' in the error paths 
and after the while loop you can do something like this:

	finish_wait(sk_sleep(sk), wait);

	if (err)
		return err;

	if (data < 0)
		return -ENOMEM;

	return data;
}

>+
>+out:
>+	finish_wait(sk_sleep(sk), wait);
>+	return err;
>+}
>+
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> 			  int flags)
>@@ -1912,85 +1977,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>
>
> 	while (1) {
>-		s64 ready;
>+		ssize_t read;
>
>-		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>-		ready = vsock_stream_has_data(vsk);
>-
>-		if (ready == 0) {
>-			if (sk->sk_err != 0 ||
>-			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>-			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-			/* Don't wait for non-blocking sockets. */
>-			if (timeout == 0) {
>-				err = -EAGAIN;
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-
>-			err = transport->notify_recv_pre_block(
>-					vsk, target, &recv_data);
>-			if (err < 0) {
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-			release_sock(sk);
>-			timeout = schedule_timeout(timeout);
>-			lock_sock(sk);
>-
>-			if (signal_pending(current)) {
>-				err = sock_intr_errno(timeout);
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			} else if (timeout == 0) {
>-				err = -EAGAIN;
>-				finish_wait(sk_sleep(sk), &wait);
>-				break;
>-			}
>-		} else {
>-			ssize_t read;
>+		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
>+		if (err <= 0)
>+			break;
>
>-			finish_wait(sk_sleep(sk), &wait);
>-
>-			if (ready < 0) {
>-				/* Invalid queue pair content. XXX This should
>-				* be changed to a connection reset in a later
>-				* change.
>-				*/
>-
>-				err = -ENOMEM;
>-				goto out;
>-			}
>-
>-			err = transport->notify_recv_pre_dequeue(
>-					vsk, target, &recv_data);
>-			if (err < 0)
>-				break;
>+		err = transport->notify_recv_pre_dequeue(vsk, target,
>+							 &recv_data);
>+		if (err < 0)
>+			break;
>
>-			read = transport->stream_dequeue(
>-					vsk, msg,
>-					len - copied, flags);
>-			if (read < 0) {
>-				err = -ENOMEM;
>-				break;
>-			}
>+		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>+		if (read < 0) {
>+			err = -ENOMEM;
>+			break;
>+		}
>
>-			copied += read;
>+		copied += read;
>
>-			err = transport->notify_recv_post_dequeue(
>-					vsk, target, read,
>-					!(flags & MSG_PEEK), &recv_data);
>-			if (err < 0)
>-				goto out;
>+		err = transport->notify_recv_post_dequeue(vsk, target, read,
>+						!(flags & MSG_PEEK), &recv_data);
>+		if (err < 0)
>+			goto out;
>
>-			if (read >= target || flags & MSG_PEEK)
>-				break;
>+		if (read >= target || flags & MSG_PEEK)
>+			break;
>
>-			target -= read;
>-		}
>+		target -= read;
> 	}

This part looks okay, maybe we could improve the loop a bit and make it 
more readable, but it's out of the scope of this patch.

Thanks,
Stefano
Jorgen Hansen Feb. 11, 2021, 3:11 p.m. UTC | #2
> On 7 Feb 2021, at 16:14, Arseny Krasnov <arseny.krasnov@kaspersky.com> wrote:
> 
> This moves wait loop for data to dedicated function, because later
> it will be used by SEQPACKET data receive loop.
> 
> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
> ---
> net/vmw_vsock/af_vsock.c | 158 +++++++++++++++++++++------------------
> 1 file changed, 86 insertions(+), 72 deletions(-)
> 
> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
> index f4fabec50650..38927695786f 100644
> --- a/net/vmw_vsock/af_vsock.c
> +++ b/net/vmw_vsock/af_vsock.c
> @@ -1833,6 +1833,71 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
> 	return err;
> }
> 
> +static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> +			   long timeout,
> +			   struct vsock_transport_recv_notify_data *recv_data,
> +			   size_t target)
> +{
> +	const struct vsock_transport *transport;
> +	struct vsock_sock *vsk;
> +	s64 data;
> +	int err;
> +
> +	vsk = vsock_sk(sk);
> +	err = 0;
> +	transport = vsk->transport;
> +	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
> +
> +	while ((data = vsock_stream_has_data(vsk)) == 0) {
> +		if (sk->sk_err != 0 ||
> +		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
> +		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
> +			goto out;
> +		}
> +
> +		/* Don't wait for non-blocking sockets. */
> +		if (timeout == 0) {
> +			err = -EAGAIN;
> +			goto out;
> +		}
> +
> +		if (recv_data) {
> +			err = transport->notify_recv_pre_block(vsk, target, recv_data);
> +			if (err < 0)
> +				goto out;
> +		}
> +
> +		release_sock(sk);
> +		timeout = schedule_timeout(timeout);
> +		lock_sock(sk);
> +
> +		if (signal_pending(current)) {
> +			err = sock_intr_errno(timeout);
> +			goto out;
> +		} else if (timeout == 0) {
> +			err = -EAGAIN;
> +			goto out;
> +		}
> +	}
> +
> +	finish_wait(sk_sleep(sk), wait);
> +
> +	/* Invalid queue pair content. XXX This should
> +	 * be changed to a connection reset in a later
> +	 * change.
> +	 */

Since you are here, could you update this comment to something like:

/* Internal transport error when checking for available
 * data. XXX This should be changed to a connection
 * reset in a later change.
 */

> +	if (data < 0)
> +		return -ENOMEM;
> +
> +	/* Have some data, return. */
> +	if (data)
> +		return data;
> +
> +out:
> +	finish_wait(sk_sleep(sk), wait);
> +	return err;
> +}

I agree with Stefanos suggestion to get rid of the out: part  and just have the single finish_wait().

> +
> static int
> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> 			  int flags)
> @@ -1912,85 +1977,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> 
> 
> 	while (1) {
> -		s64 ready;
> +		ssize_t read;
> 
> -		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
> -		ready = vsock_stream_has_data(vsk);
> -
> -		if (ready == 0) {
> -			if (sk->sk_err != 0 ||
> -			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
> -			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -			/* Don't wait for non-blocking sockets. */
> -			if (timeout == 0) {
> -				err = -EAGAIN;
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -
> -			err = transport->notify_recv_pre_block(
> -					vsk, target, &recv_data);
> -			if (err < 0) {
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -			release_sock(sk);
> -			timeout = schedule_timeout(timeout);
> -			lock_sock(sk);
> -
> -			if (signal_pending(current)) {
> -				err = sock_intr_errno(timeout);
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			} else if (timeout == 0) {
> -				err = -EAGAIN;
> -				finish_wait(sk_sleep(sk), &wait);
> -				break;
> -			}
> -		} else {
> -			ssize_t read;
> +		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
> +		if (err <= 0)
> +			break;

There is a small change in the behaviour here if vsock_stream_has_data(vsk)
returned something < 0. Since you just do a break, the err value can be updated
if there is an sk->sk_err, a receive shutdown has been performed or data has
already been copied. That should be ok, though.

> -			finish_wait(sk_sleep(sk), &wait);
> -
> -			if (ready < 0) {
> -				/* Invalid queue pair content. XXX This should
> -				* be changed to a connection reset in a later
> -				* change.
> -				*/
> -
> -				err = -ENOMEM;
> -				goto out;
> -			}
> -
> -			err = transport->notify_recv_pre_dequeue(
> -					vsk, target, &recv_data);
> -			if (err < 0)
> -				break;
> +		err = transport->notify_recv_pre_dequeue(vsk, target,
> +							 &recv_data);
> +		if (err < 0)
> +			break;
> 
> -			read = transport->stream_dequeue(
> -					vsk, msg,
> -					len - copied, flags);
> -			if (read < 0) {
> -				err = -ENOMEM;
> -				break;
> -			}
> +		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
> +		if (read < 0) {
> +			err = -ENOMEM;
> +			break;
> +		}
> 
> -			copied += read;
> +		copied += read;
> 
> -			err = transport->notify_recv_post_dequeue(
> -					vsk, target, read,
> -					!(flags & MSG_PEEK), &recv_data);
> -			if (err < 0)
> -				goto out;
> +		err = transport->notify_recv_post_dequeue(vsk, target, read,
> +						!(flags & MSG_PEEK), &recv_data);
> +		if (err < 0)
> +			goto out;
> 
> -			if (read >= target || flags & MSG_PEEK)
> -				break;
> +		if (read >= target || flags & MSG_PEEK)
> +			break;
> 
> -			target -= read;
> -		}
> +		target -= read;
> 	}
> 
> 	if (sk->sk_err)
> -- 
> 2.25.1
>
Arseny Krasnov Feb. 16, 2021, 6:58 a.m. UTC | #3
On 11.02.2021 18:11, Jorgen Hansen wrote:
>> On 7 Feb 2021, at 16:14, Arseny Krasnov <arseny.krasnov@kaspersky.com> wrote:
>>
>> This moves wait loop for data to dedicated function, because later
>> it will be used by SEQPACKET data receive loop.
>>
>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>> ---
>> net/vmw_vsock/af_vsock.c | 158 +++++++++++++++++++++------------------
>> 1 file changed, 86 insertions(+), 72 deletions(-)
>>
>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>> index f4fabec50650..38927695786f 100644
>> --- a/net/vmw_vsock/af_vsock.c
>> +++ b/net/vmw_vsock/af_vsock.c
>> @@ -1833,6 +1833,71 @@ static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
>> 	return err;
>> }
>>
>> +static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
>> +			   long timeout,
>> +			   struct vsock_transport_recv_notify_data *recv_data,
>> +			   size_t target)
>> +{
>> +	const struct vsock_transport *transport;
>> +	struct vsock_sock *vsk;
>> +	s64 data;
>> +	int err;
>> +
>> +	vsk = vsock_sk(sk);
>> +	err = 0;
>> +	transport = vsk->transport;
>> +	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
>> +
>> +	while ((data = vsock_stream_has_data(vsk)) == 0) {
>> +		if (sk->sk_err != 0 ||
>> +		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>> +		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>> +			goto out;
>> +		}
>> +
>> +		/* Don't wait for non-blocking sockets. */
>> +		if (timeout == 0) {
>> +			err = -EAGAIN;
>> +			goto out;
>> +		}
>> +
>> +		if (recv_data) {
>> +			err = transport->notify_recv_pre_block(vsk, target, recv_data);
>> +			if (err < 0)
>> +				goto out;
>> +		}
>> +
>> +		release_sock(sk);
>> +		timeout = schedule_timeout(timeout);
>> +		lock_sock(sk);
>> +
>> +		if (signal_pending(current)) {
>> +			err = sock_intr_errno(timeout);
>> +			goto out;
>> +		} else if (timeout == 0) {
>> +			err = -EAGAIN;
>> +			goto out;
>> +		}
>> +	}
>> +
>> +	finish_wait(sk_sleep(sk), wait);
>> +
>> +	/* Invalid queue pair content. XXX This should
>> +	 * be changed to a connection reset in a later
>> +	 * change.
>> +	 */
> Since you are here, could you update this comment to something like:
>
> /* Internal transport error when checking for available
>  * data. XXX This should be changed to a connection
>  * reset in a later change.
>  */
>
>> +	if (data < 0)
>> +		return -ENOMEM;
>> +
>> +	/* Have some data, return. */
>> +	if (data)
>> +		return data;
>> +
>> +out:
>> +	finish_wait(sk_sleep(sk), wait);
>> +	return err;
>> +}
> I agree with Stefanos suggestion to get rid of the out: part  and just have the single finish_wait().
>
>> +
>> static int
>> vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>> 			  int flags)
>> @@ -1912,85 +1977,34 @@ vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
>>
>>
>> 	while (1) {
>> -		s64 ready;
>> +		ssize_t read;
>>
>> -		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>> -		ready = vsock_stream_has_data(vsk);
>> -
>> -		if (ready == 0) {
>> -			if (sk->sk_err != 0 ||
>> -			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
>> -			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -			/* Don't wait for non-blocking sockets. */
>> -			if (timeout == 0) {
>> -				err = -EAGAIN;
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -
>> -			err = transport->notify_recv_pre_block(
>> -					vsk, target, &recv_data);
>> -			if (err < 0) {
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -			release_sock(sk);
>> -			timeout = schedule_timeout(timeout);
>> -			lock_sock(sk);
>> -
>> -			if (signal_pending(current)) {
>> -				err = sock_intr_errno(timeout);
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			} else if (timeout == 0) {
>> -				err = -EAGAIN;
>> -				finish_wait(sk_sleep(sk), &wait);
>> -				break;
>> -			}
>> -		} else {
>> -			ssize_t read;
>> +		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
>> +		if (err <= 0)
>> +			break;
> There is a small change in the behaviour here if vsock_stream_has_data(vsk)
> returned something < 0. Since you just do a break, the err value can be updated
> if there is an sk->sk_err, a receive shutdown has been performed or data has
> already been copied. That should be ok, though.

May be i can add the following 'if' after while (1) loop:

There was:

if (sk->sk_err)
    err = -sk->sk->sk_err;
else if (sk->sk_shutdown & RCV_SHUTDOWN)
    err = 0;
if (copied > 0)
    err = copied;

Will be:

if (err == 0) {
    if (sk->sk_err)
        err = -sk->sk->sk_err;
     else if (sk->sk_shutdown & RCV_SHUTDOWN)
       err = 0;

    if (copied > 0)
        err = copied;

}

E.g. update 'err' only if it is clear. Don't touch otherwise


>
>> -			finish_wait(sk_sleep(sk), &wait);
>> -
>> -			if (ready < 0) {
>> -				/* Invalid queue pair content. XXX This should
>> -				* be changed to a connection reset in a later
>> -				* change.
>> -				*/
>> -
>> -				err = -ENOMEM;
>> -				goto out;
>> -			}
>> -
>> -			err = transport->notify_recv_pre_dequeue(
>> -					vsk, target, &recv_data);
>> -			if (err < 0)
>> -				break;
>> +		err = transport->notify_recv_pre_dequeue(vsk, target,
>> +							 &recv_data);
>> +		if (err < 0)
>> +			break;
>>
>> -			read = transport->stream_dequeue(
>> -					vsk, msg,
>> -					len - copied, flags);
>> -			if (read < 0) {
>> -				err = -ENOMEM;
>> -				break;
>> -			}
>> +		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>> +		if (read < 0) {
>> +			err = -ENOMEM;
>> +			break;
>> +		}
>>
>> -			copied += read;
>> +		copied += read;
>>
>> -			err = transport->notify_recv_post_dequeue(
>> -					vsk, target, read,
>> -					!(flags & MSG_PEEK), &recv_data);
>> -			if (err < 0)
>> -				goto out;
>> +		err = transport->notify_recv_post_dequeue(vsk, target, read,
>> +						!(flags & MSG_PEEK), &recv_data);
>> +		if (err < 0)
>> +			goto out;
>>
>> -			if (read >= target || flags & MSG_PEEK)
>> -				break;
>> +		if (read >= target || flags & MSG_PEEK)
>> +			break;
>>
>> -			target -= read;
>> -		}
>> +		target -= read;
>> 	}
>>
>> 	if (sk->sk_err)
>> -- 
>> 2.25.1
>>
>
diff mbox series

Patch

diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
index f4fabec50650..38927695786f 100644
--- a/net/vmw_vsock/af_vsock.c
+++ b/net/vmw_vsock/af_vsock.c
@@ -1833,6 +1833,71 @@  static int vsock_connectible_sendmsg(struct socket *sock, struct msghdr *msg,
 	return err;
 }
 
+static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
+			   long timeout,
+			   struct vsock_transport_recv_notify_data *recv_data,
+			   size_t target)
+{
+	const struct vsock_transport *transport;
+	struct vsock_sock *vsk;
+	s64 data;
+	int err;
+
+	vsk = vsock_sk(sk);
+	err = 0;
+	transport = vsk->transport;
+	prepare_to_wait(sk_sleep(sk), wait, TASK_INTERRUPTIBLE);
+
+	while ((data = vsock_stream_has_data(vsk)) == 0) {
+		if (sk->sk_err != 0 ||
+		    (sk->sk_shutdown & RCV_SHUTDOWN) ||
+		    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
+			goto out;
+		}
+
+		/* Don't wait for non-blocking sockets. */
+		if (timeout == 0) {
+			err = -EAGAIN;
+			goto out;
+		}
+
+		if (recv_data) {
+			err = transport->notify_recv_pre_block(vsk, target, recv_data);
+			if (err < 0)
+				goto out;
+		}
+
+		release_sock(sk);
+		timeout = schedule_timeout(timeout);
+		lock_sock(sk);
+
+		if (signal_pending(current)) {
+			err = sock_intr_errno(timeout);
+			goto out;
+		} else if (timeout == 0) {
+			err = -EAGAIN;
+			goto out;
+		}
+	}
+
+	finish_wait(sk_sleep(sk), wait);
+
+	/* Invalid queue pair content. XXX This should
+	 * be changed to a connection reset in a later
+	 * change.
+	 */
+	if (data < 0)
+		return -ENOMEM;
+
+	/* Have some data, return. */
+	if (data)
+		return data;
+
+out:
+	finish_wait(sk_sleep(sk), wait);
+	return err;
+}
+
 static int
 vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 			  int flags)
@@ -1912,85 +1977,34 @@  vsock_connectible_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 
 
 	while (1) {
-		s64 ready;
+		ssize_t read;
 
-		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
-		ready = vsock_stream_has_data(vsk);
-
-		if (ready == 0) {
-			if (sk->sk_err != 0 ||
-			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
-			    (vsk->peer_shutdown & SEND_SHUTDOWN)) {
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-			/* Don't wait for non-blocking sockets. */
-			if (timeout == 0) {
-				err = -EAGAIN;
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-
-			err = transport->notify_recv_pre_block(
-					vsk, target, &recv_data);
-			if (err < 0) {
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-			release_sock(sk);
-			timeout = schedule_timeout(timeout);
-			lock_sock(sk);
-
-			if (signal_pending(current)) {
-				err = sock_intr_errno(timeout);
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			} else if (timeout == 0) {
-				err = -EAGAIN;
-				finish_wait(sk_sleep(sk), &wait);
-				break;
-			}
-		} else {
-			ssize_t read;
+		err = vsock_wait_data(sk, &wait, timeout, &recv_data, target);
+		if (err <= 0)
+			break;
 
-			finish_wait(sk_sleep(sk), &wait);
-
-			if (ready < 0) {
-				/* Invalid queue pair content. XXX This should
-				* be changed to a connection reset in a later
-				* change.
-				*/
-
-				err = -ENOMEM;
-				goto out;
-			}
-
-			err = transport->notify_recv_pre_dequeue(
-					vsk, target, &recv_data);
-			if (err < 0)
-				break;
+		err = transport->notify_recv_pre_dequeue(vsk, target,
+							 &recv_data);
+		if (err < 0)
+			break;
 
-			read = transport->stream_dequeue(
-					vsk, msg,
-					len - copied, flags);
-			if (read < 0) {
-				err = -ENOMEM;
-				break;
-			}
+		read = transport->stream_dequeue(vsk, msg, len - copied, flags);
+		if (read < 0) {
+			err = -ENOMEM;
+			break;
+		}
 
-			copied += read;
+		copied += read;
 
-			err = transport->notify_recv_post_dequeue(
-					vsk, target, read,
-					!(flags & MSG_PEEK), &recv_data);
-			if (err < 0)
-				goto out;
+		err = transport->notify_recv_post_dequeue(vsk, target, read,
+						!(flags & MSG_PEEK), &recv_data);
+		if (err < 0)
+			goto out;
 
-			if (read >= target || flags & MSG_PEEK)
-				break;
+		if (read >= target || flags & MSG_PEEK)
+			break;
 
-			target -= read;
-		}
+		target -= read;
 	}
 
 	if (sk->sk_err)