diff mbox series

[RFC,v8,11/19] virtio/vsock: dequeue callback for SOCK_SEQPACKET

Message ID 20210413124443.3403382-1-arseny.krasnov@kaspersky.com (mailing list archive)
State New, archived
Headers show
Series virtio/vsock: introduce SOCK_SEQPACKET support | expand

Commit Message

Arseny Krasnov April 13, 2021, 12:44 p.m. UTC
This adds the transport callback and its logic for SEQPACKET dequeue.
The callback fetches RW packets from the socket's rx queue until the whole
record is copied (if the user's buffer is full, the user is not woken up).
This is done to avoid stalling the sender: if we wake up the user and it
leaves the syscall, nobody will send a credit update for the rest of the
record, and the sender will wait until the receiver next enters the read
syscall. So if the user's buffer is full, we just send a credit update and
drop the data that does not fit.

Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
---
v7 -> v8:
 - Things like SEQ_BEGIN, SEQ_END, 'msg_len' and 'msg_id' are now removed.
   This callback fetches and copies RW packets to the user's buffer until
   the last packet of the message is found (this packet is marked in the
   'flags' field of the header).

 include/linux/virtio_vsock.h            |  5 ++
 net/vmw_vsock/virtio_transport_common.c | 73 +++++++++++++++++++++++++
 2 files changed, 78 insertions(+)

Comments

Arseny Krasnov April 14, 2021, 5:21 a.m. UTC | #1
I'll fix some issues in this patch that were found by the kernel test robot.

On 13.04.2021 15:44, Arseny Krasnov wrote:
> This adds transport callback and it's logic for SEQPACKET dequeue.
> Callback fetches RW packets from rx queue of socket until whole record
> is copied(if user's buffer is full, user is not woken up). This is done
> to not stall sender, because if we wake up user and it leaves syscall,
> nobody will send credit update for rest of record, and sender will wait
> for next enter of read syscall at receiver's side. So if user buffer is
> full, we just send credit update and drop data.
>
> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
> ---
> v7 -> v8:
>  - Things like SEQ_BEGIN, SEQ_END, 'msg_len' and 'msg_id' now removed.
>    This callback fetches and copies RW packets to user's buffer, until
>    last packet of message found(this packet is marked in 'flags' field
>    of header).
>
>  include/linux/virtio_vsock.h            |  5 ++
>  net/vmw_vsock/virtio_transport_common.c | 73 +++++++++++++++++++++++++
>  2 files changed, 78 insertions(+)
>
> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
> index dc636b727179..02acf6e9ae04 100644
> --- a/include/linux/virtio_vsock.h
> +++ b/include/linux/virtio_vsock.h
> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>  			       struct msghdr *msg,
>  			       size_t len, int flags);
>  
> +ssize_t
> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
> +				   struct msghdr *msg,
> +				   int flags,
> +				   bool *msg_ready);
>  s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>  s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>  
> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
> index 833104b71a1c..8492b8bd5df5 100644
> --- a/net/vmw_vsock/virtio_transport_common.c
> +++ b/net/vmw_vsock/virtio_transport_common.c
> @@ -393,6 +393,67 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>  	return err;
>  }
>  
> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
> +						 struct msghdr *msg,
> +						 int flags,
> +						 bool *msg_ready)
> +{
> +	struct virtio_vsock_sock *vvs = vsk->trans;
> +	struct virtio_vsock_pkt *pkt;
> +	int err = 0;
> +	size_t user_buf_len = msg->msg_iter.count;
> +
> +	*msg_ready = false;
> +	spin_lock_bh(&vvs->rx_lock);
> +
> +	while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
> +
> +		if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RW) {
> +			size_t bytes_to_copy;
> +			size_t pkt_len;
> +
> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
> +			bytes_to_copy = min(user_buf_len, pkt_len);
> +
> +			/* sk_lock is held by caller so no one else can dequeue.
> +			 * Unlock rx_lock since memcpy_to_msg() may sleep.
> +			 */
> +			spin_unlock_bh(&vvs->rx_lock);
> +
> +			if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
> +				err = -EINVAL;
> +				break;
> +			}
> +
> +			spin_lock_bh(&vvs->rx_lock);
> +
> +			/* If user sets 'MSG_TRUNC' we return real length
> +			 * of message.
> +			 */
> +			if (flags & MSG_TRUNC)
> +				err += pkt_len;
> +			else
> +				err += bytes_to_copy;
> +
> +			user_buf_len -= bytes_to_copy;
> +
> +			if (pkt->hdr.flags & VIRTIO_VSOCK_SEQ_EOR)
> +				*msg_ready = true;
> +		}
> +
> +		virtio_transport_dec_rx_pkt(vvs, pkt);
> +		list_del(&pkt->list);
> +		virtio_transport_free_pkt(pkt);
> +	}
> +
> +	spin_unlock_bh(&vvs->rx_lock);
> +
> +	virtio_transport_send_credit_update(vsk);
> +
> +	return err;
> +}
> +
>  ssize_t
>  virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>  				struct msghdr *msg,
> @@ -405,6 +466,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>  }
>  EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>  
> +ssize_t
> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
> +				   struct msghdr *msg,
> +				   int flags, bool *msg_ready)
> +{
> +	if (flags & MSG_PEEK)
> +		return -EOPNOTSUPP;
> +
> +	return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready);
> +}
> +EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
> +
>  int
>  virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>  			       struct msghdr *msg,
Stefano Garzarella April 21, 2021, 8:56 a.m. UTC | #2
On Tue, Apr 13, 2021 at 03:44:40PM +0300, Arseny Krasnov wrote:
>This adds transport callback and it's logic for SEQPACKET dequeue.
>Callback fetches RW packets from rx queue of socket until whole record
>is copied(if user's buffer is full, user is not woken up). This is done
>to not stall sender, because if we wake up user and it leaves syscall,
>nobody will send credit update for rest of record, and sender will wait
>for next enter of read syscall at receiver's side. So if user buffer is
>full, we just send credit update and drop data.
>
>Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>---
>v7 -> v8:
> - Things like SEQ_BEGIN, SEQ_END, 'msg_len' and 'msg_id' now removed.
>   This callback fetches and copies RW packets to user's buffer, until
>   last packet of message found(this packet is marked in 'flags' field
>   of header).
>
> include/linux/virtio_vsock.h            |  5 ++
> net/vmw_vsock/virtio_transport_common.c | 73 +++++++++++++++++++++++++
> 2 files changed, 78 insertions(+)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..02acf6e9ae04 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>                              struct msghdr *msg,
>                              size_t len, int flags);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+                                 struct msghdr *msg,
>+                                 int flags,
>+                                 bool *msg_ready);
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index 833104b71a1c..8492b8bd5df5 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -393,6 +393,67 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>       return err;
> }
>
>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+                                               struct msghdr *msg,
>+                                               int flags,
>+                                               bool *msg_ready)
>+{
>+      struct virtio_vsock_sock *vvs = vsk->trans;
>+      struct virtio_vsock_pkt *pkt;
>+      int err = 0;
>+      size_t user_buf_len = msg->msg_iter.count;
>+
>+      *msg_ready = false;
>+      spin_lock_bh(&vvs->rx_lock);
>+
>+      while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
>+              pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+
>+              if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RW) {

Is this check still necessary? Shouldn't they all be RW packets?

>+                      size_t bytes_to_copy;
>+                      size_t pkt_len;
>+
>+                      pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>+                      bytes_to_copy = min(user_buf_len, pkt_len);
>+

If bytes_to_copy == 0, we can avoid the next steps (release the lock, try
to copy 0 bytes, reacquire the lock).

>+                      /* sk_lock is held by caller so no one else can dequeue.
>+                       * Unlock rx_lock since memcpy_to_msg() may sleep.
>+                       */
>+                      spin_unlock_bh(&vvs->rx_lock);
>+
>+                      if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
>+                              err = -EINVAL;

Here we should reacquire the lock before the break, or otherwise prevent
it from being released a second time after the loop.

>+                              break;
>+                      }
>+
>+                      spin_lock_bh(&vvs->rx_lock);
>+

As mentioned before, I think we could move this part into the core and 
here always return the real length of the message.

>+                      /* If user sets 'MSG_TRUNC' we return real length
>+                       * of message.
>+                       */
>+                      if (flags & MSG_TRUNC)
>+                              err += pkt_len;
>+                      else
>+                              err += bytes_to_copy;
>+
>+                      user_buf_len -= bytes_to_copy;
>+
>+                      if (pkt->hdr.flags & VIRTIO_VSOCK_SEQ_EOR)
                                     ^
We should use le32_to_cpu() to read the flags.


>+                              *msg_ready = true;
>+              }
>+
>+              virtio_transport_dec_rx_pkt(vvs, pkt);
>+              list_del(&pkt->list);
>+              virtio_transport_free_pkt(pkt);
>+      }
>+
>+      spin_unlock_bh(&vvs->rx_lock);
>+
>+      virtio_transport_send_credit_update(vsk);
>+
>+      return err;
>+}
>+
> ssize_t
> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>                               struct msghdr *msg,
>@@ -405,6 +466,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+                                 struct msghdr *msg,
>+                                 int flags, bool *msg_ready)
>+{
>+      if (flags & MSG_PEEK)
>+              return -EOPNOTSUPP;
>+
>+      return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready);
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
>+
> int
> virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>                              struct msghdr *msg,
>--
>2.25.1
>
Arseny Krasnov April 21, 2021, 2:49 p.m. UTC | #3
On 21.04.2021 11:56, Stefano Garzarella wrote:
> On Tue, Apr 13, 2021 at 03:44:40PM +0300, Arseny Krasnov wrote:
>> This adds transport callback and it's logic for SEQPACKET dequeue.
>> Callback fetches RW packets from rx queue of socket until whole record
>> is copied(if user's buffer is full, user is not woken up). This is done
>> to not stall sender, because if we wake up user and it leaves syscall,
>> nobody will send credit update for rest of record, and sender will wait
>> for next enter of read syscall at receiver's side. So if user buffer is
>> full, we just send credit update and drop data.
>>
>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>> ---
>> v7 -> v8:
>> - Things like SEQ_BEGIN, SEQ_END, 'msg_len' and 'msg_id' now removed.
>>   This callback fetches and copies RW packets to user's buffer, until
>>   last packet of message found(this packet is marked in 'flags' field
>>   of header).
>>
>> include/linux/virtio_vsock.h            |  5 ++
>> net/vmw_vsock/virtio_transport_common.c | 73 +++++++++++++++++++++++++
>> 2 files changed, 78 insertions(+)
>>
>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>> index dc636b727179..02acf6e9ae04 100644
>> --- a/include/linux/virtio_vsock.h
>> +++ b/include/linux/virtio_vsock.h
>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>                              struct msghdr *msg,
>>                              size_t len, int flags);
>>
>> +ssize_t
>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> +                                 struct msghdr *msg,
>> +                                 int flags,
>> +                                 bool *msg_ready);
>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>
>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>> index 833104b71a1c..8492b8bd5df5 100644
>> --- a/net/vmw_vsock/virtio_transport_common.c
>> +++ b/net/vmw_vsock/virtio_transport_common.c
>> @@ -393,6 +393,67 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>       return err;
>> }
>>
>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>> +                                               struct msghdr *msg,
>> +                                               int flags,
>> +                                               bool *msg_ready)
>> +{
>> +      struct virtio_vsock_sock *vvs = vsk->trans;
>> +      struct virtio_vsock_pkt *pkt;
>> +      int err = 0;
>> +      size_t user_buf_len = msg->msg_iter.count;
>> +
>> +      *msg_ready = false;
>> +      spin_lock_bh(&vvs->rx_lock);
>> +
>> +      while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
>> +              pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>> +
>> +              if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RW) {
> Is this check still necessary, should they all be RW?
Ack
>
>> +                      size_t bytes_to_copy;
>> +                      size_t pkt_len;
>> +
>> +                      pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>> +                      bytes_to_copy = min(user_buf_len, pkt_len);
>> +
> If bytes_to_copy == 0, we can avoid the next steps (release the lock try 
> to copy 0 bytes, reacquire the lock)
Ack
>
>> +                      /* sk_lock is held by caller so no one else can dequeue.
>> +                       * Unlock rx_lock since memcpy_to_msg() may sleep.
>> +                       */
>> +                      spin_unlock_bh(&vvs->rx_lock);
>> +
>> +                      if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
>> +                              err = -EINVAL;
> Here we should reacquire the lock or prevent it from being released out
> of cycle.
Ack
>
>> +                              break;
>> +                      }
>> +
>> +                      spin_lock_bh(&vvs->rx_lock);
>> +
> As mentioned before, I think we could move this part into the core and 
> here always return the real dimension.
Ack
>
>> +                      /* If user sets 'MSG_TRUNC' we return real length
>> +                       * of message.
>> +                       */
>> +                      if (flags & MSG_TRUNC)
>> +                              err += pkt_len;
>> +                      else
>> +                              err += bytes_to_copy;
>> +
>> +                      user_buf_len -= bytes_to_copy;
>> +
>> +                      if (pkt->hdr.flags & VIRTIO_VSOCK_SEQ_EOR)
>                                      ^
> We should use le32_to_cpu() to read the flags.
>
Ack
>> +                              *msg_ready = true;
>> +              }
>> +
>> +              virtio_transport_dec_rx_pkt(vvs, pkt);
>> +              list_del(&pkt->list);
>> +              virtio_transport_free_pkt(pkt);
>> +      }
>> +
>> +      spin_unlock_bh(&vvs->rx_lock);
>> +
>> +      virtio_transport_send_credit_update(vsk);
>> +
>> +      return err;
>> +}
>> +
>> ssize_t
>> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>>                               struct msghdr *msg,
>> @@ -405,6 +466,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>> }
>> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>>
>> +ssize_t
>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> +                                 struct msghdr *msg,
>> +                                 int flags, bool *msg_ready)
>> +{
>> +      if (flags & MSG_PEEK)
>> +              return -EOPNOTSUPP;
>> +
>> +      return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready);
>> +}
>> +EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
>> +
>> int
>> virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>                              struct msghdr *msg,
>> --
>> 2.25.1
>>
>
diff mbox series

Patch

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..02acf6e9ae04 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -80,6 +80,11 @@  virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,
 			       size_t len, int flags);
 
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   int flags,
+				   bool *msg_ready);
 s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
 s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
 
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index 833104b71a1c..8492b8bd5df5 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -393,6 +393,67 @@  virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
 	return err;
 }
 
+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+						 struct msghdr *msg,
+						 int flags,
+						 bool *msg_ready)
+{
+	struct virtio_vsock_sock *vvs = vsk->trans;
+	struct virtio_vsock_pkt *pkt;
+	int err = 0;
+	size_t user_buf_len = msg->msg_iter.count;
+
+	*msg_ready = false;
+	spin_lock_bh(&vvs->rx_lock);
+
+	while (!*msg_ready && !list_empty(&vvs->rx_queue) && err >= 0) {
+		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+		if (le16_to_cpu(pkt->hdr.op) == VIRTIO_VSOCK_OP_RW) {
+			size_t bytes_to_copy;
+			size_t pkt_len;
+
+			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+			bytes_to_copy = min(user_buf_len, pkt_len);
+
+			/* sk_lock is held by caller so no one else can dequeue.
+			 * Unlock rx_lock since memcpy_to_msg() may sleep.
+			 */
+			spin_unlock_bh(&vvs->rx_lock);
+
+			if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) {
+				err = -EINVAL;
+				break;
+			}
+
+			spin_lock_bh(&vvs->rx_lock);
+
+			/* If user sets 'MSG_TRUNC' we return real length
+			 * of message.
+			 */
+			if (flags & MSG_TRUNC)
+				err += pkt_len;
+			else
+				err += bytes_to_copy;
+
+			user_buf_len -= bytes_to_copy;
+
+			if (pkt->hdr.flags & VIRTIO_VSOCK_SEQ_EOR)
+				*msg_ready = true;
+		}
+
+		virtio_transport_dec_rx_pkt(vvs, pkt);
+		list_del(&pkt->list);
+		virtio_transport_free_pkt(pkt);
+	}
+
+	spin_unlock_bh(&vvs->rx_lock);
+
+	virtio_transport_send_credit_update(vsk);
+
+	return err;
+}
+
 ssize_t
 virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 				struct msghdr *msg,
@@ -405,6 +466,18 @@  virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 }
 EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
 
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   int flags, bool *msg_ready)
+{
+	if (flags & MSG_PEEK)
+		return -EOPNOTSUPP;
+
+	return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
 int
 virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,