diff mbox series

[v11,11/18] virtio/vsock: dequeue callback for SOCK_SEQPACKET

Message ID 20210611111241.3652274-1-arseny.krasnov@kaspersky.com (mailing list archive)
State Accepted
Commit 44931195a5412a97c46d299227fbabad4e09010d
Delegated to: Netdev Maintainers
Headers show
Series virtio/vsock: introduce SOCK_SEQPACKET support | expand

Checks

Context Check Description
netdev/cover_letter success Link
netdev/fixes_present success Link
netdev/patch_count fail Series longer than 15 patches
netdev/tree_selection success Guessed tree name to be net-next
netdev/subject_prefix success Link
netdev/cc_maintainers success CCed 9 of 9 maintainers
netdev/source_inline success Was 0 now: 0
netdev/verify_signedoff success Link
netdev/module_param success Was 0 now: 0
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/verify_fixes success Link
netdev/checkpatch warning WARNING: line length of 82 exceeds 80 columns WARNING: line length of 83 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns WARNING: line length of 88 exceeds 80 columns
netdev/build_allmodconfig_warn success Errors and warnings before: 0 this patch: 0
netdev/header_inline success Link

Commit Message

Arseny Krasnov June 11, 2021, 11:12 a.m. UTC
Callback fetches RW packets from rx queue of socket until whole record
is copied(if user's buffer is full, user is not woken up). This is done
to not stall sender, because if we wake up user and it leaves syscall,
nobody will send credit update for rest of record, and sender will wait
for next enter of read syscall at receiver's side. So if user buffer is
full, we just send credit update and drop data.

Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
---
 v10 -> v11:
 1) 'msg_count' field added to count current number of EORs.
 2) 'msg_ready' argument removed from callback.
 3) If 'memcpy_to_msg()' failed during copy loop, there will be
    no next attempts to copy data, rest of record will be freed.

 include/linux/virtio_vsock.h            |  5 ++
 net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
 2 files changed, 89 insertions(+)

Comments

Stefano Garzarella June 18, 2021, 1:44 p.m. UTC | #1
Hi Arseny,
the series looks great, I have just a question below about 
seqpacket_dequeue.

I also sent a couple a simple fixes, it would be great if you can review 
them: 
https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/


On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>Callback fetches RW packets from rx queue of socket until whole record
>is copied(if user's buffer is full, user is not woken up). This is done
>to not stall sender, because if we wake up user and it leaves syscall,
>nobody will send credit update for rest of record, and sender will wait
>for next enter of read syscall at receiver's side. So if user buffer is
>full, we just send credit update and drop data.
>
>Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>---
> v10 -> v11:
> 1) 'msg_count' field added to count current number of EORs.
> 2) 'msg_ready' argument removed from callback.
> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>    no next attempts to copy data, rest of record will be freed.
>
> include/linux/virtio_vsock.h            |  5 ++
> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
> 2 files changed, 89 insertions(+)
>
>diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>index dc636b727179..1d9a302cb91d 100644
>--- a/include/linux/virtio_vsock.h
>+++ b/include/linux/virtio_vsock.h
>@@ -36,6 +36,7 @@ struct virtio_vsock_sock {
> 	u32 rx_bytes;
> 	u32 buf_alloc;
> 	struct list_head rx_queue;
>+	u32 msg_count;
> };
>
> struct virtio_vsock_pkt {
>@@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> 			       struct msghdr *msg,
> 			       size_t len, int flags);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+				   struct msghdr *msg,
>+				   int flags);
> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>
>diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>index ad0d34d41444..1e1df19ec164 100644
>--- a/net/vmw_vsock/virtio_transport_common.c
>+++ b/net/vmw_vsock/virtio_transport_common.c
>@@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
> 	return err;
> }
>
>+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>+						 struct msghdr *msg,
>+						 int flags)
>+{
>+	struct virtio_vsock_sock *vvs = vsk->trans;
>+	struct virtio_vsock_pkt *pkt;
>+	int dequeued_len = 0;
>+	size_t user_buf_len = msg_data_left(msg);
>+	bool copy_failed = false;
>+	bool msg_ready = false;
>+
>+	spin_lock_bh(&vvs->rx_lock);
>+
>+	if (vvs->msg_count == 0) {
>+		spin_unlock_bh(&vvs->rx_lock);
>+		return 0;
>+	}
>+
>+	while (!msg_ready) {
>+		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>+
>+		if (!copy_failed) {
>+			size_t pkt_len;
>+			size_t bytes_to_copy;
>+
>+			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>+			bytes_to_copy = min(user_buf_len, pkt_len);
>+
>+			if (bytes_to_copy) {
>+				int err;
>+
>+				/* sk_lock is held by caller so no one else can dequeue.
>+				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>+				 */
>+				spin_unlock_bh(&vvs->rx_lock);
>+
>+				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>+				if (err) {
>+					/* Copy of message failed, set flag to skip
>+					 * copy path for rest of fragments. Rest of
>+					 * fragments will be freed without copy.
>+					 */
>+					copy_failed = true;
>+					dequeued_len = err;

If we fail to copy the message we will discard the entire packet.
Is it acceptable for the user point of view, or we should leave the 
packet in the queue and the user can retry, maybe with a different 
buffer?

Then we can remove the packets only when we successfully copied all the 
fragments.

I'm not sure make sense, maybe better to check also other 
implementations :-)

Thanks,
Stefano

>+				} else {
>+					user_buf_len -= bytes_to_copy;
>+				}
>+
>+				spin_lock_bh(&vvs->rx_lock);
>+			}
>+
>+			if (dequeued_len >= 0)
>+				dequeued_len += pkt_len;
>+		}
>+
>+		if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) {
>+			msg_ready = true;
>+			vvs->msg_count--;
>+		}
>+
>+		virtio_transport_dec_rx_pkt(vvs, pkt);
>+		list_del(&pkt->list);
>+		virtio_transport_free_pkt(pkt);
>+	}
>+
>+	spin_unlock_bh(&vvs->rx_lock);
>+
>+	virtio_transport_send_credit_update(vsk);
>+
>+	return dequeued_len;
>+}
>+
> ssize_t
> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> 				struct msghdr *msg,
>@@ -405,6 +477,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
> }
> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>
>+ssize_t
>+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>+				   struct msghdr *msg,
>+				   int flags)
>+{
>+	if (flags & MSG_PEEK)
>+		return -EOPNOTSUPP;
>+
>+	return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags);
>+}
>+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
>+
> int
> virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
> 			       struct msghdr *msg,
>-- 
>2.25.1
>
Michael S. Tsirkin June 18, 2021, 1:51 p.m. UTC | #2
On Fri, Jun 18, 2021 at 03:44:23PM +0200, Stefano Garzarella wrote:
> Hi Arseny,
> the series looks great, I have just a question below about
> seqpacket_dequeue.
> 
> I also sent a couple a simple fixes, it would be great if you can review
> them:
> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/

So given this was picked into net next, what's the plan? Just make spec
follow code? We can wait and see, if there are issues with the spec just
remember to mask the feature before release.
Stefano Garzarella June 18, 2021, 2:44 p.m. UTC | #3
On Fri, Jun 18, 2021 at 09:51:44AM -0400, Michael S. Tsirkin wrote:
>On Fri, Jun 18, 2021 at 03:44:23PM +0200, Stefano Garzarella wrote:
>> Hi Arseny,
>> the series looks great, I have just a question below about
>> seqpacket_dequeue.
>>
>> I also sent a couple a simple fixes, it would be great if you can review
>> them:
>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>
>So given this was picked into net next, what's the plan? Just make spec
>follow code? We can wait and see, if there are issues with the spec just
>remember to mask the feature before release.

Yep, the spec patches was already posted, but not merged yet: 
https://lists.oasis-open.org/archives/virtio-comment/202105/msg00017.html

The changes are quite small and they are aligned with the current 
implementation.

Anyway, I perfectly agree with you about waiting and mask it before 
v5.14 release if there are any issue.

Thanks,
Stefano
Arseny Krasnov June 18, 2021, 3:04 p.m. UTC | #4
On 18.06.2021 16:44, Stefano Garzarella wrote:
> Hi Arseny,
> the series looks great, I have just a question below about 
> seqpacket_dequeue.
>
> I also sent a couple a simple fixes, it would be great if you can review 
> them: 
> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>
>
> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>> Callback fetches RW packets from rx queue of socket until whole record
>> is copied(if user's buffer is full, user is not woken up). This is done
>> to not stall sender, because if we wake up user and it leaves syscall,
>> nobody will send credit update for rest of record, and sender will wait
>> for next enter of read syscall at receiver's side. So if user buffer is
>> full, we just send credit update and drop data.
>>
>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>> ---
>> v10 -> v11:
>> 1) 'msg_count' field added to count current number of EORs.
>> 2) 'msg_ready' argument removed from callback.
>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>    no next attempts to copy data, rest of record will be freed.
>>
>> include/linux/virtio_vsock.h            |  5 ++
>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>> 2 files changed, 89 insertions(+)
>>
>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>> index dc636b727179..1d9a302cb91d 100644
>> --- a/include/linux/virtio_vsock.h
>> +++ b/include/linux/virtio_vsock.h
>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>> 	u32 rx_bytes;
>> 	u32 buf_alloc;
>> 	struct list_head rx_queue;
>> +	u32 msg_count;
>> };
>>
>> struct virtio_vsock_pkt {
>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>> 			       struct msghdr *msg,
>> 			       size_t len, int flags);
>>
>> +ssize_t
>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> +				   struct msghdr *msg,
>> +				   int flags);
>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>
>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>> index ad0d34d41444..1e1df19ec164 100644
>> --- a/net/vmw_vsock/virtio_transport_common.c
>> +++ b/net/vmw_vsock/virtio_transport_common.c
>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>> 	return err;
>> }
>>
>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>> +						 struct msghdr *msg,
>> +						 int flags)
>> +{
>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>> +	struct virtio_vsock_pkt *pkt;
>> +	int dequeued_len = 0;
>> +	size_t user_buf_len = msg_data_left(msg);
>> +	bool copy_failed = false;
>> +	bool msg_ready = false;
>> +
>> +	spin_lock_bh(&vvs->rx_lock);
>> +
>> +	if (vvs->msg_count == 0) {
>> +		spin_unlock_bh(&vvs->rx_lock);
>> +		return 0;
>> +	}
>> +
>> +	while (!msg_ready) {
>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>> +
>> +		if (!copy_failed) {
>> +			size_t pkt_len;
>> +			size_t bytes_to_copy;
>> +
>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>> +
>> +			if (bytes_to_copy) {
>> +				int err;
>> +
>> +				/* sk_lock is held by caller so no one else can dequeue.
>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>> +				 */
>> +				spin_unlock_bh(&vvs->rx_lock);
>> +
>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>> +				if (err) {
>> +					/* Copy of message failed, set flag to skip
>> +					 * copy path for rest of fragments. Rest of
>> +					 * fragments will be freed without copy.
>> +					 */
>> +					copy_failed = true;
>> +					dequeued_len = err;
> If we fail to copy the message we will discard the entire packet.
> Is it acceptable for the user point of view, or we should leave the 
> packet in the queue and the user can retry, maybe with a different 
> buffer?
>
> Then we can remove the packets only when we successfully copied all the 
> fragments.
>
> I'm not sure make sense, maybe better to check also other 
> implementations :-)
>
> Thanks,
> Stefano

Understand, i'll check it on weekend, anyway I think it is

not critical for implementation.


I have another question: may be it is useful to research for

approach where packets are not queued until whole message

is received, but copied to user's buffer thus freeing memory.

(like previous implementation, of course with solution of problem

where part of message still in queue, while reader was woken

by timeout or signal).

I think it is better, because  in current version, sender may set

'peer_alloc_buf' to  for example 1MB, so at receiver we get

1MB of 'kmalloc()' memory allocated, while having user's buffer

to copy data there or drop it(if user's buffer is full). This way

won't change spec(e.g. no message id or SEQ_BEGIN will be added).


What do You think?

>
>> +				} else {
>> +					user_buf_len -= bytes_to_copy;
>> +				}
>> +
>> +				spin_lock_bh(&vvs->rx_lock);
>> +			}
>> +
>> +			if (dequeued_len >= 0)
>> +				dequeued_len += pkt_len;
>> +		}
>> +
>> +		if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) {
>> +			msg_ready = true;
>> +			vvs->msg_count--;
>> +		}
>> +
>> +		virtio_transport_dec_rx_pkt(vvs, pkt);
>> +		list_del(&pkt->list);
>> +		virtio_transport_free_pkt(pkt);
>> +	}
>> +
>> +	spin_unlock_bh(&vvs->rx_lock);
>> +
>> +	virtio_transport_send_credit_update(vsk);
>> +
>> +	return dequeued_len;
>> +}
>> +
>> ssize_t
>> virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>> 				struct msghdr *msg,
>> @@ -405,6 +477,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk,
>> }
>> EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
>>
>> +ssize_t
>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>> +				   struct msghdr *msg,
>> +				   int flags)
>> +{
>> +	if (flags & MSG_PEEK)
>> +		return -EOPNOTSUPP;
>> +
>> +	return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags);
>> +}
>> +EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
>> +
>> int
>> virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>> 			       struct msghdr *msg,
>> -- 
>> 2.25.1
>>
>
Stefano Garzarella June 18, 2021, 3:55 p.m. UTC | #5
On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>
>On 18.06.2021 16:44, Stefano Garzarella wrote:
>> Hi Arseny,
>> the series looks great, I have just a question below about
>> seqpacket_dequeue.
>>
>> I also sent a couple a simple fixes, it would be great if you can review
>> them:
>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>
>>
>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>> Callback fetches RW packets from rx queue of socket until whole record
>>> is copied(if user's buffer is full, user is not woken up). This is done
>>> to not stall sender, because if we wake up user and it leaves syscall,
>>> nobody will send credit update for rest of record, and sender will wait
>>> for next enter of read syscall at receiver's side. So if user buffer is
>>> full, we just send credit update and drop data.
>>>
>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>> ---
>>> v10 -> v11:
>>> 1) 'msg_count' field added to count current number of EORs.
>>> 2) 'msg_ready' argument removed from callback.
>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>    no next attempts to copy data, rest of record will be freed.
>>>
>>> include/linux/virtio_vsock.h            |  5 ++
>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>> 2 files changed, 89 insertions(+)
>>>
>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>> index dc636b727179..1d9a302cb91d 100644
>>> --- a/include/linux/virtio_vsock.h
>>> +++ b/include/linux/virtio_vsock.h
>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>> 	u32 rx_bytes;
>>> 	u32 buf_alloc;
>>> 	struct list_head rx_queue;
>>> +	u32 msg_count;
>>> };
>>>
>>> struct virtio_vsock_pkt {
>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>> 			       struct msghdr *msg,
>>> 			       size_t len, int flags);
>>>
>>> +ssize_t
>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>> +				   struct msghdr *msg,
>>> +				   int flags);
>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>
>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>> index ad0d34d41444..1e1df19ec164 100644
>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>> 	return err;
>>> }
>>>
>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>> +						 struct msghdr *msg,
>>> +						 int flags)
>>> +{
>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>> +	struct virtio_vsock_pkt *pkt;
>>> +	int dequeued_len = 0;
>>> +	size_t user_buf_len = msg_data_left(msg);
>>> +	bool copy_failed = false;
>>> +	bool msg_ready = false;
>>> +
>>> +	spin_lock_bh(&vvs->rx_lock);
>>> +
>>> +	if (vvs->msg_count == 0) {
>>> +		spin_unlock_bh(&vvs->rx_lock);
>>> +		return 0;
>>> +	}
>>> +
>>> +	while (!msg_ready) {
>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>> +
>>> +		if (!copy_failed) {
>>> +			size_t pkt_len;
>>> +			size_t bytes_to_copy;
>>> +
>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>> +
>>> +			if (bytes_to_copy) {
>>> +				int err;
>>> +
>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>> +				 */
>>> +				spin_unlock_bh(&vvs->rx_lock);
>>> +
>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>> +				if (err) {
>>> +					/* Copy of message failed, set flag to skip
>>> +					 * copy path for rest of fragments. Rest of
>>> +					 * fragments will be freed without copy.
>>> +					 */
>>> +					copy_failed = true;
>>> +					dequeued_len = err;
>> If we fail to copy the message we will discard the entire packet.
>> Is it acceptable for the user point of view, or we should leave the
>> packet in the queue and the user can retry, maybe with a different
>> buffer?
>>
>> Then we can remove the packets only when we successfully copied all the
>> fragments.
>>
>> I'm not sure make sense, maybe better to check also other
>> implementations :-)
>>
>> Thanks,
>> Stefano
>
>Understand, i'll check it on weekend, anyway I think it is
>not critical for implementation.

Yep, I agree.

>
>
>I have another question: may be it is useful to research for
>approach where packets are not queued until whole message
>is received, but copied to user's buffer thus freeing memory.
>(like previous implementation, of course with solution of problem
>where part of message still in queue, while reader was woken
>by timeout or signal).
>
>I think it is better, because  in current version, sender may set
>'peer_alloc_buf' to  for example 1MB, so at receiver we get
>1MB of 'kmalloc()' memory allocated, while having user's buffer
>to copy data there or drop it(if user's buffer is full). This way
>won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>
>What do You think?

Yep, I see your point and it would be great, but I think the main issues 
to fix is how to handle a signal while we are waiting other fragments 
since the other peer can take unspecified time to send them.

Note that the 'peer_alloc_buf' in the sender, is the value get from the 
receiver, so if the receiver doesn't want to allocate 1MB, can advertise 
a small buffer size.

Thanks,
Stefano
Arseny Krasnov June 18, 2021, 4:08 p.m. UTC | #6
On 18.06.2021 18:55, Stefano Garzarella wrote:
> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>> Hi Arseny,
>>> the series looks great, I have just a question below about
>>> seqpacket_dequeue.
>>>
>>> I also sent a couple a simple fixes, it would be great if you can review
>>> them:
>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>>
>>>
>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>> Callback fetches RW packets from rx queue of socket until whole record
>>>> is copied(if user's buffer is full, user is not woken up). This is done
>>>> to not stall sender, because if we wake up user and it leaves syscall,
>>>> nobody will send credit update for rest of record, and sender will wait
>>>> for next enter of read syscall at receiver's side. So if user buffer is
>>>> full, we just send credit update and drop data.
>>>>
>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>>> ---
>>>> v10 -> v11:
>>>> 1) 'msg_count' field added to count current number of EORs.
>>>> 2) 'msg_ready' argument removed from callback.
>>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>>    no next attempts to copy data, rest of record will be freed.
>>>>
>>>> include/linux/virtio_vsock.h            |  5 ++
>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>> 2 files changed, 89 insertions(+)
>>>>
>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>> index dc636b727179..1d9a302cb91d 100644
>>>> --- a/include/linux/virtio_vsock.h
>>>> +++ b/include/linux/virtio_vsock.h
>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>> 	u32 rx_bytes;
>>>> 	u32 buf_alloc;
>>>> 	struct list_head rx_queue;
>>>> +	u32 msg_count;
>>>> };
>>>>
>>>> struct virtio_vsock_pkt {
>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>> 			       struct msghdr *msg,
>>>> 			       size_t len, int flags);
>>>>
>>>> +ssize_t
>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>> +				   struct msghdr *msg,
>>>> +				   int flags);
>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>
>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>> index ad0d34d41444..1e1df19ec164 100644
>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>> 	return err;
>>>> }
>>>>
>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>> +						 struct msghdr *msg,
>>>> +						 int flags)
>>>> +{
>>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>>> +	struct virtio_vsock_pkt *pkt;
>>>> +	int dequeued_len = 0;
>>>> +	size_t user_buf_len = msg_data_left(msg);
>>>> +	bool copy_failed = false;
>>>> +	bool msg_ready = false;
>>>> +
>>>> +	spin_lock_bh(&vvs->rx_lock);
>>>> +
>>>> +	if (vvs->msg_count == 0) {
>>>> +		spin_unlock_bh(&vvs->rx_lock);
>>>> +		return 0;
>>>> +	}
>>>> +
>>>> +	while (!msg_ready) {
>>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>> +
>>>> +		if (!copy_failed) {
>>>> +			size_t pkt_len;
>>>> +			size_t bytes_to_copy;
>>>> +
>>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>>> +
>>>> +			if (bytes_to_copy) {
>>>> +				int err;
>>>> +
>>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>> +				 */
>>>> +				spin_unlock_bh(&vvs->rx_lock);
>>>> +
>>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>> +				if (err) {
>>>> +					/* Copy of message failed, set flag to skip
>>>> +					 * copy path for rest of fragments. Rest of
>>>> +					 * fragments will be freed without copy.
>>>> +					 */
>>>> +					copy_failed = true;
>>>> +					dequeued_len = err;
>>> If we fail to copy the message we will discard the entire packet.
>>> Is it acceptable for the user point of view, or we should leave the
>>> packet in the queue and the user can retry, maybe with a different
>>> buffer?
>>>
>>> Then we can remove the packets only when we successfully copied all the
>>> fragments.
>>>
>>> I'm not sure make sense, maybe better to check also other
>>> implementations :-)
>>>
>>> Thanks,
>>> Stefano
>> Understand, i'll check it on weekend, anyway I think it is
>> not critical for implementation.
> Yep, I agree.
>
>>
>> I have another question: may be it is useful to research for
>> approach where packets are not queued until whole message
>> is received, but copied to user's buffer thus freeing memory.
>> (like previous implementation, of course with solution of problem
>> where part of message still in queue, while reader was woken
>> by timeout or signal).
>>
>> I think it is better, because  in current version, sender may set
>> 'peer_alloc_buf' to  for example 1MB, so at receiver we get
>> 1MB of 'kmalloc()' memory allocated, while having user's buffer
>> to copy data there or drop it(if user's buffer is full). This way
>> won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>>
>> What do You think?
> Yep, I see your point and it would be great, but I think the main issues 
> to fix is how to handle a signal while we are waiting other fragments 
> since the other peer can take unspecified time to send them.

What about transport callback, something like 'seqpacket_drain()' or

'seqpacket_drop_curr()' - when we got signal or timeout, notify transport

to drop current message. In virtio case this will set special flag in transport,

so on next dequeue, this flag is checked and if it is set - we drop all packets

until EOR found. Then we can copy untouched new record.

> Note that the 'peer_alloc_buf' in the sender, is the value get from the 
> receiver, so if the receiver doesn't want to allocate 1MB, can advertise 
> a small buffer size.
>
> Thanks,
> Stefano
>
>
Stefano Garzarella June 18, 2021, 4:25 p.m. UTC | #7
On Fri, Jun 18, 2021 at 07:08:30PM +0300, Arseny Krasnov wrote:
>
>On 18.06.2021 18:55, Stefano Garzarella wrote:
>> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>>> Hi Arseny,
>>>> the series looks great, I have just a question below about
>>>> seqpacket_dequeue.
>>>>
>>>> I also sent a couple a simple fixes, it would be great if you can review
>>>> them:
>>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>>>
>>>>
>>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>>> Callback fetches RW packets from rx queue of socket until whole record
>>>>> is copied(if user's buffer is full, user is not woken up). This is done
>>>>> to not stall sender, because if we wake up user and it leaves syscall,
>>>>> nobody will send credit update for rest of record, and sender will wait
>>>>> for next enter of read syscall at receiver's side. So if user buffer is
>>>>> full, we just send credit update and drop data.
>>>>>
>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>>>> ---
>>>>> v10 -> v11:
>>>>> 1) 'msg_count' field added to count current number of EORs.
>>>>> 2) 'msg_ready' argument removed from callback.
>>>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>>>    no next attempts to copy data, rest of record will be freed.
>>>>>
>>>>> include/linux/virtio_vsock.h            |  5 ++
>>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>>> 2 files changed, 89 insertions(+)
>>>>>
>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>>> index dc636b727179..1d9a302cb91d 100644
>>>>> --- a/include/linux/virtio_vsock.h
>>>>> +++ b/include/linux/virtio_vsock.h
>>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>>> 	u32 rx_bytes;
>>>>> 	u32 buf_alloc;
>>>>> 	struct list_head rx_queue;
>>>>> +	u32 msg_count;
>>>>> };
>>>>>
>>>>> struct virtio_vsock_pkt {
>>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>>> 			       struct msghdr *msg,
>>>>> 			       size_t len, int flags);
>>>>>
>>>>> +ssize_t
>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>>> +				   struct msghdr *msg,
>>>>> +				   int flags);
>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>>
>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>>> index ad0d34d41444..1e1df19ec164 100644
>>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>>> 	return err;
>>>>> }
>>>>>
>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>>> +						 struct msghdr *msg,
>>>>> +						 int flags)
>>>>> +{
>>>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>>>> +	struct virtio_vsock_pkt *pkt;
>>>>> +	int dequeued_len = 0;
>>>>> +	size_t user_buf_len = msg_data_left(msg);
>>>>> +	bool copy_failed = false;
>>>>> +	bool msg_ready = false;
>>>>> +
>>>>> +	spin_lock_bh(&vvs->rx_lock);
>>>>> +
>>>>> +	if (vvs->msg_count == 0) {
>>>>> +		spin_unlock_bh(&vvs->rx_lock);
>>>>> +		return 0;
>>>>> +	}
>>>>> +
>>>>> +	while (!msg_ready) {
>>>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>>> +
>>>>> +		if (!copy_failed) {
>>>>> +			size_t pkt_len;
>>>>> +			size_t bytes_to_copy;
>>>>> +
>>>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>>>> +
>>>>> +			if (bytes_to_copy) {
>>>>> +				int err;
>>>>> +
>>>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>>> +				 */
>>>>> +				spin_unlock_bh(&vvs->rx_lock);
>>>>> +
>>>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>>> +				if (err) {
>>>>> +					/* Copy of message failed, set flag to skip
>>>>> +					 * copy path for rest of fragments. Rest of
>>>>> +					 * fragments will be freed without copy.
>>>>> +					 */
>>>>> +					copy_failed = true;
>>>>> +					dequeued_len = err;
>>>> If we fail to copy the message we will discard the entire packet.
>>>> Is it acceptable for the user point of view, or we should leave the
>>>> packet in the queue and the user can retry, maybe with a different
>>>> buffer?
>>>>
>>>> Then we can remove the packets only when we successfully copied all the
>>>> fragments.
>>>>
>>>> I'm not sure make sense, maybe better to check also other
>>>> implementations :-)
>>>>
>>>> Thanks,
>>>> Stefano
>>> Understand, i'll check it on weekend, anyway I think it is
>>> not critical for implementation.
>> Yep, I agree.
>>
>>>
>>> I have another question: may be it is useful to research for
>>> approach where packets are not queued until whole message
>>> is received, but copied to user's buffer thus freeing memory.
>>> (like previous implementation, of course with solution of problem
>>> where part of message still in queue, while reader was woken
>>> by timeout or signal).
>>>
>>> I think it is better, because  in current version, sender may set
>>> 'peer_alloc_buf' to  for example 1MB, so at receiver we get
>>> 1MB of 'kmalloc()' memory allocated, while having user's buffer
>>> to copy data there or drop it(if user's buffer is full). This way
>>> won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>>>
>>> What do You think?
>> Yep, I see your point and it would be great, but I think the main issues
>> to fix is how to handle a signal while we are waiting other fragments
>> since the other peer can take unspecified time to send them.
>
>What about transport callback, something like 'seqpacket_drain()' or
>
>'seqpacket_drop_curr()' - when we got signal or timeout, notify transport
>
>to drop current message. In virtio case this will set special flag in transport,
>
>so on next dequeue, this flag is checked and if it is set - we drop all packets
>
>until EOR found. Then we can copy untouched new record.
>

But in this way, we will lose the entire message.

Is it acceptable for seqpacket?

Stefano
Arseny Krasnov June 18, 2021, 4:26 p.m. UTC | #8
On 18.06.2021 19:25, Stefano Garzarella wrote:
> On Fri, Jun 18, 2021 at 07:08:30PM +0300, Arseny Krasnov wrote:
>> On 18.06.2021 18:55, Stefano Garzarella wrote:
>>> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>>>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>>>> Hi Arseny,
>>>>> the series looks great, I have just a question below about
>>>>> seqpacket_dequeue.
>>>>>
>>>>> I also sent a couple a simple fixes, it would be great if you can review
>>>>> them:
>>>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>>>>
>>>>>
>>>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>>>> Callback fetches RW packets from rx queue of socket until whole record
>>>>>> is copied(if user's buffer is full, user is not woken up). This is done
>>>>>> to not stall sender, because if we wake up user and it leaves syscall,
>>>>>> nobody will send credit update for rest of record, and sender will wait
>>>>>> for next enter of read syscall at receiver's side. So if user buffer is
>>>>>> full, we just send credit update and drop data.
>>>>>>
>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>>>>> ---
>>>>>> v10 -> v11:
>>>>>> 1) 'msg_count' field added to count current number of EORs.
>>>>>> 2) 'msg_ready' argument removed from callback.
>>>>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>>>>    no next attempts to copy data, rest of record will be freed.
>>>>>>
>>>>>> include/linux/virtio_vsock.h            |  5 ++
>>>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>>>> 2 files changed, 89 insertions(+)
>>>>>>
>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>>>> index dc636b727179..1d9a302cb91d 100644
>>>>>> --- a/include/linux/virtio_vsock.h
>>>>>> +++ b/include/linux/virtio_vsock.h
>>>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>>>> 	u32 rx_bytes;
>>>>>> 	u32 buf_alloc;
>>>>>> 	struct list_head rx_queue;
>>>>>> +	u32 msg_count;
>>>>>> };
>>>>>>
>>>>>> struct virtio_vsock_pkt {
>>>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>>>> 			       struct msghdr *msg,
>>>>>> 			       size_t len, int flags);
>>>>>>
>>>>>> +ssize_t
>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>>>> +				   struct msghdr *msg,
>>>>>> +				   int flags);
>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>>>
>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>>>> index ad0d34d41444..1e1df19ec164 100644
>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>>>> 	return err;
>>>>>> }
>>>>>>
>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>>>> +						 struct msghdr *msg,
>>>>>> +						 int flags)
>>>>>> +{
>>>>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>>>>> +	struct virtio_vsock_pkt *pkt;
>>>>>> +	int dequeued_len = 0;
>>>>>> +	size_t user_buf_len = msg_data_left(msg);
>>>>>> +	bool copy_failed = false;
>>>>>> +	bool msg_ready = false;
>>>>>> +
>>>>>> +	spin_lock_bh(&vvs->rx_lock);
>>>>>> +
>>>>>> +	if (vvs->msg_count == 0) {
>>>>>> +		spin_unlock_bh(&vvs->rx_lock);
>>>>>> +		return 0;
>>>>>> +	}
>>>>>> +
>>>>>> +	while (!msg_ready) {
>>>>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>>>> +
>>>>>> +		if (!copy_failed) {
>>>>>> +			size_t pkt_len;
>>>>>> +			size_t bytes_to_copy;
>>>>>> +
>>>>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>>>>> +
>>>>>> +			if (bytes_to_copy) {
>>>>>> +				int err;
>>>>>> +
>>>>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>>>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>>>> +				 */
>>>>>> +				spin_unlock_bh(&vvs->rx_lock);
>>>>>> +
>>>>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>>>> +				if (err) {
>>>>>> +					/* Copy of message failed, set flag to skip
>>>>>> +					 * copy path for rest of fragments. Rest of
>>>>>> +					 * fragments will be freed without copy.
>>>>>> +					 */
>>>>>> +					copy_failed = true;
>>>>>> +					dequeued_len = err;
>>>>> If we fail to copy the message we will discard the entire packet.
>>>>> Is it acceptable for the user point of view, or we should leave the
>>>>> packet in the queue and the user can retry, maybe with a different
>>>>> buffer?
>>>>>
>>>>> Then we can remove the packets only when we successfully copied all the
>>>>> fragments.
>>>>>
>>>>> I'm not sure make sense, maybe better to check also other
>>>>> implementations :-)
>>>>>
>>>>> Thanks,
>>>>> Stefano
>>>> Understand, i'll check it on weekend, anyway I think it is
>>>> not critical for implementation.
>>> Yep, I agree.
>>>
>>>> I have another question: may be it is useful to research for
>>>> approach where packets are not queued until whole message
>>>> is received, but copied to user's buffer thus freeing memory.
>>>> (like previous implementation, of course with solution of problem
>>>> where part of message still in queue, while reader was woken
>>>> by timeout or signal).
>>>>
>>>> I think it is better, because  in current version, sender may set
>>>> 'peer_alloc_buf' to  for example 1MB, so at receiver we get
>>>> 1MB of 'kmalloc()' memory allocated, while having user's buffer
>>>> to copy data there or drop it(if user's buffer is full). This way
>>>> won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>>>>
>>>> What do You think?
>>> Yep, I see your point and it would be great, but I think the main issues
>>> to fix is how to handle a signal while we are waiting other fragments
>>> since the other peer can take unspecified time to send them.
>> What about transport callback, something like 'seqpacket_drain()' or
>>
>> 'seqpacket_drop_curr()' - when we got signal or timeout, notify transport
>>
>> to drop current message. In virtio case this will set special flag in transport,
>>
>> so on next dequeue, this flag is checked and if it is set - we drop all packets
>>
>> until EOR found. Then we can copy untouched new record.
>>
> But in this way, we will lose the entire message.
>
> Is it acceptable for seqpacket?
>
> Stefano
Hm, i'll check it. At least for unix domain sockets - it supports SEQPACKET
>
>
Arseny Krasnov June 21, 2021, 6:55 a.m. UTC | #9
On 18.06.2021 19:26, Arseny Krasnov wrote:
> On 18.06.2021 19:25, Stefano Garzarella wrote:
>> On Fri, Jun 18, 2021 at 07:08:30PM +0300, Arseny Krasnov wrote:
>>> On 18.06.2021 18:55, Stefano Garzarella wrote:
>>>> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>>>>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>>>>> Hi Arseny,
>>>>>> the series looks great, I have just a question below about
>>>>>> seqpacket_dequeue.
>>>>>>
>>>>>> I also sent a couple a simple fixes, it would be great if you can review
>>>>>> them:
>>>>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>>>>>
>>>>>>
>>>>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>>>>> Callback fetches RW packets from rx queue of socket until whole record
>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done
>>>>>>> to not stall sender, because if we wake up user and it leaves syscall,
>>>>>>> nobody will send credit update for rest of record, and sender will wait
>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is
>>>>>>> full, we just send credit update and drop data.
>>>>>>>
>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>>>>>> ---
>>>>>>> v10 -> v11:
>>>>>>> 1) 'msg_count' field added to count current number of EORs.
>>>>>>> 2) 'msg_ready' argument removed from callback.
>>>>>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>>>>>    no next attempts to copy data, rest of record will be freed.
>>>>>>>
>>>>>>> include/linux/virtio_vsock.h            |  5 ++
>>>>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>>>>> 2 files changed, 89 insertions(+)
>>>>>>>
>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>>>>> index dc636b727179..1d9a302cb91d 100644
>>>>>>> --- a/include/linux/virtio_vsock.h
>>>>>>> +++ b/include/linux/virtio_vsock.h
>>>>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>>>>> 	u32 rx_bytes;
>>>>>>> 	u32 buf_alloc;
>>>>>>> 	struct list_head rx_queue;
>>>>>>> +	u32 msg_count;
>>>>>>> };
>>>>>>>
>>>>>>> struct virtio_vsock_pkt {
>>>>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>>>>> 			       struct msghdr *msg,
>>>>>>> 			       size_t len, int flags);
>>>>>>>
>>>>>>> +ssize_t
>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>>>>> +				   struct msghdr *msg,
>>>>>>> +				   int flags);
>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>>>>
>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>>>>> index ad0d34d41444..1e1df19ec164 100644
>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>>>>> 	return err;
>>>>>>> }
>>>>>>>
>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>>>>> +						 struct msghdr *msg,
>>>>>>> +						 int flags)
>>>>>>> +{
>>>>>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>>>>>> +	struct virtio_vsock_pkt *pkt;
>>>>>>> +	int dequeued_len = 0;
>>>>>>> +	size_t user_buf_len = msg_data_left(msg);
>>>>>>> +	bool copy_failed = false;
>>>>>>> +	bool msg_ready = false;
>>>>>>> +
>>>>>>> +	spin_lock_bh(&vvs->rx_lock);
>>>>>>> +
>>>>>>> +	if (vvs->msg_count == 0) {
>>>>>>> +		spin_unlock_bh(&vvs->rx_lock);
>>>>>>> +		return 0;
>>>>>>> +	}
>>>>>>> +
>>>>>>> +	while (!msg_ready) {
>>>>>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>>>>> +
>>>>>>> +		if (!copy_failed) {
>>>>>>> +			size_t pkt_len;
>>>>>>> +			size_t bytes_to_copy;
>>>>>>> +
>>>>>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>>>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>>>>>> +
>>>>>>> +			if (bytes_to_copy) {
>>>>>>> +				int err;
>>>>>>> +
>>>>>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>>>>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>>>>> +				 */
>>>>>>> +				spin_unlock_bh(&vvs->rx_lock);
>>>>>>> +
>>>>>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>>>>> +				if (err) {
>>>>>>> +					/* Copy of message failed, set flag to skip
>>>>>>> +					 * copy path for rest of fragments. Rest of
>>>>>>> +					 * fragments will be freed without copy.
>>>>>>> +					 */
>>>>>>> +					copy_failed = true;
>>>>>>> +					dequeued_len = err;
>>>>>> If we fail to copy the message we will discard the entire packet.
>>>>>> Is it acceptable for the user point of view, or we should leave the
>>>>>> packet in the queue and the user can retry, maybe with a different
>>>>>> buffer?
>>>>>>
>>>>>> Then we can remove the packets only when we successfully copied all the
>>>>>> fragments.
>>>>>>
>>>>>> I'm not sure make sense, maybe better to check also other
>>>>>> implementations :-)
>>>>>>
>>>>>> Thanks,
>>>>>> Stefano
>>>>> Understand, i'll check it on weekend, anyway I think it is
>>>>> not critical for implementation.
>>>> Yep, I agree.
>>>>
>>>>> I have another question: may be it is useful to research for
>>>>> approach where packets are not queued until whole message
>>>>> is received, but copied to user's buffer thus freeing memory.
>>>>> (like previous implementation, of course with solution of problem
>>>>> where part of message still in queue, while reader was woken
>>>>> by timeout or signal).
>>>>>
>>>>> I think it is better, because  in current version, sender may set
>>>>> 'peer_alloc_buf' to  for example 1MB, so at receiver we get
>>>>> 1MB of 'kmalloc()' memory allocated, while having user's buffer
>>>>> to copy data there or drop it(if user's buffer is full). This way
>>>>> won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>>>>>
>>>>> What do You think?
>>>> Yep, I see your point and it would be great, but I think the main issues
>>>> to fix is how to handle a signal while we are waiting other fragments
>>>> since the other peer can take unspecified time to send them.
>>> What about transport callback, something like 'seqpacket_drain()' or
>>>
>>> 'seqpacket_drop_curr()' - when we got signal or timeout, notify transport
>>>
>>> to drop current message. In virtio case this will set special flag in transport,
>>>
>>> so on next dequeue, this flag is checked and if it is set - we drop all packets
>>>
>>> until EOR found. Then we can copy untouched new record.
>>>
>> But in this way, we will lose the entire message.
>>
>> Is it acceptable for seqpacket?
>>
>> Stefano
> Hm, i'll check it. At least for unix domain sockets - it supports SEQPACKET

Hello, i've checked AF_UNIX and AF_AX25 SEQPACKET implementations,

in both cases:

1) Datagram is dequeued first, then copied to user's buffer.

2) Datagram is also freed when copying to user's buffer fail

(it is not reinserted back).


But, in case of virtio vsock, i've got the following concern in

this approach: in cases of AF_UNIX or AF_AX25 there is maximum

datagram size, strictly limited by spec, so no 'setsockopt()' call allows

to exceed this. Also these limits are significantly smaller that current

amounts of RAM. But, in our case, there is no such limit: peer could

say 'i want to use 100MB datagram', and receiver just answer 'ok',

 as there is just variable assignment to setup new limit. Now, consider

that there will be 10 peers, 100MB each(no one limit such request,

because each socket doesn't know about each other). I think we get

out-of-service in this case - all kmalloc() memory will be wasted for

pending record.


I still think, that approach when we copy data from packet to user's

buffer without waiting EOR is better.


Also i'll rebase QEMU patch today or tomorrow.


What do You Think?

>>
Stefano Garzarella June 21, 2021, 10:23 a.m. UTC | #10
On Mon, Jun 21, 2021 at 09:55:13AM +0300, Arseny Krasnov wrote:
>
>On 18.06.2021 19:26, Arseny Krasnov wrote:
>> On 18.06.2021 19:25, Stefano Garzarella wrote:
>>> On Fri, Jun 18, 2021 at 07:08:30PM +0300, Arseny Krasnov wrote:
>>>> On 18.06.2021 18:55, Stefano Garzarella wrote:
>>>>> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>>>>>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>>>>>> Hi Arseny,
>>>>>>> the series looks great, I have just a question below about
>>>>>>> seqpacket_dequeue.
>>>>>>>
>>>>>>> I also sent a couple a simple fixes, it would be great if you can review
>>>>>>> them:
>>>>>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>>>>>> Callback fetches RW packets from rx queue of socket until whole record
>>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done
>>>>>>>> to not stall sender, because if we wake up user and it leaves syscall,
>>>>>>>> nobody will send credit update for rest of record, and sender will wait
>>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is
>>>>>>>> full, we just send credit update and drop data.
>>>>>>>>
>>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>>>>>>> ---
>>>>>>>> v10 -> v11:
>>>>>>>> 1) 'msg_count' field added to count current number of EORs.
>>>>>>>> 2) 'msg_ready' argument removed from callback.
>>>>>>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>>>>>>    no next attempts to copy data, rest of record will be freed.
>>>>>>>>
>>>>>>>> include/linux/virtio_vsock.h            |  5 ++
>>>>>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>>>>>> 2 files changed, 89 insertions(+)
>>>>>>>>
>>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>>>>>> index dc636b727179..1d9a302cb91d 100644
>>>>>>>> --- a/include/linux/virtio_vsock.h
>>>>>>>> +++ b/include/linux/virtio_vsock.h
>>>>>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>>>>>> 	u32 rx_bytes;
>>>>>>>> 	u32 buf_alloc;
>>>>>>>> 	struct list_head rx_queue;
>>>>>>>> +	u32 msg_count;
>>>>>>>> };
>>>>>>>>
>>>>>>>> struct virtio_vsock_pkt {
>>>>>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>>>>>> 			       struct msghdr *msg,
>>>>>>>> 			       size_t len, int flags);
>>>>>>>>
>>>>>>>> +ssize_t
>>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>>>>>> +				   struct msghdr *msg,
>>>>>>>> +				   int flags);
>>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>>>>>
>>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>>>>>> index ad0d34d41444..1e1df19ec164 100644
>>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>>>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>>>>>> 	return err;
>>>>>>>> }
>>>>>>>>
>>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>>>>>> +						 struct msghdr *msg,
>>>>>>>> +						 int flags)
>>>>>>>> +{
>>>>>>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>>>>>>> +	struct virtio_vsock_pkt *pkt;
>>>>>>>> +	int dequeued_len = 0;
>>>>>>>> +	size_t user_buf_len = msg_data_left(msg);
>>>>>>>> +	bool copy_failed = false;
>>>>>>>> +	bool msg_ready = false;
>>>>>>>> +
>>>>>>>> +	spin_lock_bh(&vvs->rx_lock);
>>>>>>>> +
>>>>>>>> +	if (vvs->msg_count == 0) {
>>>>>>>> +		spin_unlock_bh(&vvs->rx_lock);
>>>>>>>> +		return 0;
>>>>>>>> +	}
>>>>>>>> +
>>>>>>>> +	while (!msg_ready) {
>>>>>>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>>>>>> +
>>>>>>>> +		if (!copy_failed) {
>>>>>>>> +			size_t pkt_len;
>>>>>>>> +			size_t bytes_to_copy;
>>>>>>>> +
>>>>>>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>>>>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>>>>>>> +
>>>>>>>> +			if (bytes_to_copy) {
>>>>>>>> +				int err;
>>>>>>>> +
>>>>>>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>>>>>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>>>>>> +				 */
>>>>>>>> +				spin_unlock_bh(&vvs->rx_lock);
>>>>>>>> +
>>>>>>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>>>>>> +				if (err) {
>>>>>>>> +					/* Copy of message failed, set flag to skip
>>>>>>>> +					 * copy path for rest of fragments. Rest of
>>>>>>>> +					 * fragments will be freed without copy.
>>>>>>>> +					 */
>>>>>>>> +					copy_failed = true;
>>>>>>>> +					dequeued_len = err;
>>>>>>> If we fail to copy the message we will discard the entire packet.
>>>>>>> Is it acceptable for the user point of view, or we should leave the
>>>>>>> packet in the queue and the user can retry, maybe with a different
>>>>>>> buffer?
>>>>>>>
>>>>>>> Then we can remove the packets only when we successfully copied all the
>>>>>>> fragments.
>>>>>>>
>>>>>>> I'm not sure make sense, maybe better to check also other
>>>>>>> implementations :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Stefano
>>>>>> Understand, i'll check it on weekend, anyway I think it is
>>>>>> not critical for implementation.
>>>>> Yep, I agree.
>>>>>
>>>>>> I have another question: may be it is useful to research for
>>>>>> approach where packets are not queued until whole message
>>>>>> is received, but copied to user's buffer thus freeing memory.
>>>>>> (like previous implementation, of course with solution of problem
>>>>>> where part of message still in queue, while reader was woken
>>>>>> by timeout or signal).
>>>>>>
>>>>>> I think it is better, because  in current version, sender may set
>>>>>> 'peer_alloc_buf' to  for example 1MB, so at receiver we get
>>>>>> 1MB of 'kmalloc()' memory allocated, while having user's buffer
>>>>>> to copy data there or drop it(if user's buffer is full). This way
>>>>>> won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>>>>>>
>>>>>> What do You think?
>>>>> Yep, I see your point and it would be great, but I think the main issues
>>>>> to fix is how to handle a signal while we are waiting other fragments
>>>>> since the other peer can take unspecified time to send them.
>>>> What about transport callback, something like 'seqpacket_drain()' or
>>>>
>>>> 'seqpacket_drop_curr()' - when we got signal or timeout, notify transport
>>>>
>>>> to drop current message. In virtio case this will set special flag in transport,
>>>>
>>>> so on next dequeue, this flag is checked and if it is set - we drop all packets
>>>>
>>>> until EOR found. Then we can copy untouched new record.
>>>>
>>> But in this way, we will lose the entire message.
>>>
>>> Is it acceptable for seqpacket?
>>>
>>> Stefano
>> Hm, i'll check it. At least for unix domain sockets - it supports SEQPACKET
>
>Hello, i've checked AF_UNIX and AF_AX25 SEQPACKET implementations,

Great! Thanks for checking!

>
>in both cases:
>
>1) Datagram is dequeued first, then copied to user's buffer.
>
>2) Datagram is also freed when copying to user's buffer fail
>
>(it is not reinserted back).
>
>
>But, in case of virtio vsock, i've got the following concern in

>this approach: in cases of AF_UNIX or AF_AX25 there is maximum
>
>datagram size, strictly limited by spec, so no 'setsockopt()' call allows
>
>to exceed this. Also these limits are significantly smaller that current
>
>amounts of RAM. But, in our case, there is no such limit: peer could
>
>say 'i want to use 100MB datagram', and receiver just answer 'ok',

The receiver sets the limit of its receive buffer and tells the 
transmitter that it should not exceed it. The default should be 256 KB, 
so IIUC this scenario can happen only if the receiver do a 
'setsockopt()' increasing the limit to 100MB. Right?

Maybe we should limit it.

>
> as there is just variable assignment to setup new limit. Now, consider
>
>that there will be 10 peers, 100MB each(no one limit such request,
>
>because each socket doesn't know about each other). I think we get
>
>out-of-service in this case - all kmalloc() memory will be wasted for
>
>pending record.
>
>
>I still think, that approach when we copy data from packet to user's
>
>buffer without waiting EOR is better.

Okay, in this way we can remove the receive buffer limit and maybe if we 
receive a signal, we can set MSG_TRUNC, return the partially received 
packet to the user, but we must free any next fragments.

So, as you proposed, we need a `seqpacket_drop()` to tell to the 
transport that if we were copying an uncompleted message, then it should 
delete the queued fragments and any others until the next EOR.

>
>
>Also i'll rebase QEMU patch today or tomorrow.

Great, please CC me, this is something high priority to test 
SOCK_SEQPACKET with a guest.

>
>
>What do You Think?

I'm fine with both, but I slightly prefer the approach we implemented 
because it's easier to handle.

Thanks,
Stefano
Arseny Krasnov June 21, 2021, 12:27 p.m. UTC | #11
On 21.06.2021 13:23, Stefano Garzarella wrote:
> On Mon, Jun 21, 2021 at 09:55:13AM +0300, Arseny Krasnov wrote:
>> On 18.06.2021 19:26, Arseny Krasnov wrote:
>>> On 18.06.2021 19:25, Stefano Garzarella wrote:
>>>> On Fri, Jun 18, 2021 at 07:08:30PM +0300, Arseny Krasnov wrote:
>>>>> On 18.06.2021 18:55, Stefano Garzarella wrote:
>>>>>> On Fri, Jun 18, 2021 at 06:04:37PM +0300, Arseny Krasnov wrote:
>>>>>>> On 18.06.2021 16:44, Stefano Garzarella wrote:
>>>>>>>> Hi Arseny,
>>>>>>>> the series looks great, I have just a question below about
>>>>>>>> seqpacket_dequeue.
>>>>>>>>
>>>>>>>> I also sent a couple a simple fixes, it would be great if you can review
>>>>>>>> them:
>>>>>>>> https://lore.kernel.org/netdev/20210618133526.300347-1-sgarzare@redhat.com/
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 11, 2021 at 02:12:38PM +0300, Arseny Krasnov wrote:
>>>>>>>>> Callback fetches RW packets from rx queue of socket until whole record
>>>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done
>>>>>>>>> to not stall sender, because if we wake up user and it leaves syscall,
>>>>>>>>> nobody will send credit update for rest of record, and sender will wait
>>>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is
>>>>>>>>> full, we just send credit update and drop data.
>>>>>>>>>
>>>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com>
>>>>>>>>> ---
>>>>>>>>> v10 -> v11:
>>>>>>>>> 1) 'msg_count' field added to count current number of EORs.
>>>>>>>>> 2) 'msg_ready' argument removed from callback.
>>>>>>>>> 3) If 'memcpy_to_msg()' failed during copy loop, there will be
>>>>>>>>>    no next attempts to copy data, rest of record will be freed.
>>>>>>>>>
>>>>>>>>> include/linux/virtio_vsock.h            |  5 ++
>>>>>>>>> net/vmw_vsock/virtio_transport_common.c | 84 +++++++++++++++++++++++++
>>>>>>>>> 2 files changed, 89 insertions(+)
>>>>>>>>>
>>>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
>>>>>>>>> index dc636b727179..1d9a302cb91d 100644
>>>>>>>>> --- a/include/linux/virtio_vsock.h
>>>>>>>>> +++ b/include/linux/virtio_vsock.h
>>>>>>>>> @@ -36,6 +36,7 @@ struct virtio_vsock_sock {
>>>>>>>>> 	u32 rx_bytes;
>>>>>>>>> 	u32 buf_alloc;
>>>>>>>>> 	struct list_head rx_queue;
>>>>>>>>> +	u32 msg_count;
>>>>>>>>> };
>>>>>>>>>
>>>>>>>>> struct virtio_vsock_pkt {
>>>>>>>>> @@ -80,6 +81,10 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
>>>>>>>>> 			       struct msghdr *msg,
>>>>>>>>> 			       size_t len, int flags);
>>>>>>>>>
>>>>>>>>> +ssize_t
>>>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
>>>>>>>>> +				   struct msghdr *msg,
>>>>>>>>> +				   int flags);
>>>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
>>>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
>>>>>>>>>
>>>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
>>>>>>>>> index ad0d34d41444..1e1df19ec164 100644
>>>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c
>>>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c
>>>>>>>>> @@ -393,6 +393,78 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
>>>>>>>>> 	return err;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
>>>>>>>>> +						 struct msghdr *msg,
>>>>>>>>> +						 int flags)
>>>>>>>>> +{
>>>>>>>>> +	struct virtio_vsock_sock *vvs = vsk->trans;
>>>>>>>>> +	struct virtio_vsock_pkt *pkt;
>>>>>>>>> +	int dequeued_len = 0;
>>>>>>>>> +	size_t user_buf_len = msg_data_left(msg);
>>>>>>>>> +	bool copy_failed = false;
>>>>>>>>> +	bool msg_ready = false;
>>>>>>>>> +
>>>>>>>>> +	spin_lock_bh(&vvs->rx_lock);
>>>>>>>>> +
>>>>>>>>> +	if (vvs->msg_count == 0) {
>>>>>>>>> +		spin_unlock_bh(&vvs->rx_lock);
>>>>>>>>> +		return 0;
>>>>>>>>> +	}
>>>>>>>>> +
>>>>>>>>> +	while (!msg_ready) {
>>>>>>>>> +		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
>>>>>>>>> +
>>>>>>>>> +		if (!copy_failed) {
>>>>>>>>> +			size_t pkt_len;
>>>>>>>>> +			size_t bytes_to_copy;
>>>>>>>>> +
>>>>>>>>> +			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
>>>>>>>>> +			bytes_to_copy = min(user_buf_len, pkt_len);
>>>>>>>>> +
>>>>>>>>> +			if (bytes_to_copy) {
>>>>>>>>> +				int err;
>>>>>>>>> +
>>>>>>>>> +				/* sk_lock is held by caller so no one else can dequeue.
>>>>>>>>> +				 * Unlock rx_lock since memcpy_to_msg() may sleep.
>>>>>>>>> +				 */
>>>>>>>>> +				spin_unlock_bh(&vvs->rx_lock);
>>>>>>>>> +
>>>>>>>>> +				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
>>>>>>>>> +				if (err) {
>>>>>>>>> +					/* Copy of message failed, set flag to skip
>>>>>>>>> +					 * copy path for rest of fragments. Rest of
>>>>>>>>> +					 * fragments will be freed without copy.
>>>>>>>>> +					 */
>>>>>>>>> +					copy_failed = true;
>>>>>>>>> +					dequeued_len = err;
>>>>>>>> If we fail to copy the message we will discard the entire packet.
>>>>>>>> Is it acceptable for the user point of view, or we should leave the
>>>>>>>> packet in the queue and the user can retry, maybe with a different
>>>>>>>> buffer?
>>>>>>>>
>>>>>>>> Then we can remove the packets only when we successfully copied all the
>>>>>>>> fragments.
>>>>>>>>
>>>>>>>> I'm not sure make sense, maybe better to check also other
>>>>>>>> implementations :-)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Stefano
>>>>>>> Understand, i'll check it on weekend, anyway I think it is
>>>>>>> not critical for implementation.
>>>>>> Yep, I agree.
>>>>>>
>>>>>>> I have another question: may be it is useful to research for
>>>>>>> approach where packets are not queued until whole message
>>>>>>> is received, but copied to user's buffer thus freeing memory.
>>>>>>> (like previous implementation, of course with solution of problem
>>>>>>> where part of message still in queue, while reader was woken
>>>>>>> by timeout or signal).
>>>>>>>
>>>>>>> I think it is better, because  in current version, sender may set
>>>>>>> 'peer_alloc_buf' to  for example 1MB, so at receiver we get
>>>>>>> 1MB of 'kmalloc()' memory allocated, while having user's buffer
>>>>>>> to copy data there or drop it(if user's buffer is full). This way
>>>>>>> won't change spec(e.g. no message id or SEQ_BEGIN will be added).
>>>>>>>
>>>>>>> What do You think?
>>>>>> Yep, I see your point and it would be great, but I think the main issues
>>>>>> to fix is how to handle a signal while we are waiting other fragments
>>>>>> since the other peer can take unspecified time to send them.
>>>>> What about transport callback, something like 'seqpacket_drain()' or
>>>>>
>>>>> 'seqpacket_drop_curr()' - when we got signal or timeout, notify transport
>>>>>
>>>>> to drop current message. In virtio case this will set special flag in transport,
>>>>>
>>>>> so on next dequeue, this flag is checked and if it is set - we drop all packets
>>>>>
>>>>> until EOR found. Then we can copy untouched new record.
>>>>>
>>>> But in this way, we will lose the entire message.
>>>>
>>>> Is it acceptable for seqpacket?
>>>>
>>>> Stefano
>>> Hm, i'll check it. At least for unix domain sockets - it supports SEQPACKET
>> Hello, i've checked AF_UNIX and AF_AX25 SEQPACKET implementations,
> Great! Thanks for checking!
>
>> in both cases:
>>
>> 1) Datagram is dequeued first, then copied to user's buffer.
>>
>> 2) Datagram is also freed when copying to user's buffer fail
>>
>> (it is not reinserted back).
>>
>>
>> But, in case of virtio vsock, i've got the following concern in
>> this approach: in cases of AF_UNIX or AF_AX25 there is maximum
>>
>> datagram size, strictly limited by spec, so no 'setsockopt()' call allows
>>
>> to exceed this. Also these limits are significantly smaller that current
>>
>> amounts of RAM. But, in our case, there is no such limit: peer could
>>
>> say 'i want to use 100MB datagram', and receiver just answer 'ok',
> The receiver sets the limit of its receive buffer and tells the 
> transmitter that it should not exceed it. The default should be 256 KB, 
> so IIUC this scenario can happen only if the receiver do a 
> 'setsockopt()' increasing the limit to 100MB. Right?
>
> Maybe we should limit it.

Yes, sorry, i meant this. Two peers want's to transmit 100mb message.

Receiver calls 'setsockopt()' and got 100mb of kmalloc() memory.

May be, from point of view of these two peers its ok. But for whole system

- i'm not sure. And limit - it is interesting question, what value to use as limit?

>
>>  as there is just variable assignment to setup new limit. Now, consider
>>
>> that there will be 10 peers, 100MB each(no one limit such request,
>>
>> because each socket doesn't know about each other). I think we get
>>
>> out-of-service in this case - all kmalloc() memory will be wasted for
>>
>> pending record.
>>
>>
>> I still think, that approach when we copy data from packet to user's
>>
>> buffer without waiting EOR is better.
> Okay, in this way we can remove the receive buffer limit and maybe if we 
> receive a signal, we can set MSG_TRUNC, return the partially received 
> packet to the user, but we must free any next fragments.
>
> So, as you proposed, we need a `seqpacket_drop()` to tell to the 
> transport that if we were copying an uncompleted message, then it should 
> delete the queued fragments and any others until the next EOR.

Ok, i'll prepare RFC patch for this approach, i think it will be

significantly smaller than merged patchset.

>
>>
>> Also i'll rebase QEMU patch today or tomorrow.
> Great, please CC me, this is something high priority to test 
> SOCK_SEQPACKET with a guest.
Ack
>
>>
>> What do You Think?
> I'm fine with both, but I slightly prefer the approach we implemented 
> because it's easier to handle.
>
> Thanks,
> Stefano
>
>
diff mbox series

Patch

diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h
index dc636b727179..1d9a302cb91d 100644
--- a/include/linux/virtio_vsock.h
+++ b/include/linux/virtio_vsock.h
@@ -36,6 +36,7 @@  struct virtio_vsock_sock {
 	u32 rx_bytes;
 	u32 buf_alloc;
 	struct list_head rx_queue;
+	u32 msg_count;
 };
 
 struct virtio_vsock_pkt {
@@ -80,6 +81,10 @@  virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,
 			       size_t len, int flags);
 
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   int flags);
 s64 virtio_transport_stream_has_data(struct vsock_sock *vsk);
 s64 virtio_transport_stream_has_space(struct vsock_sock *vsk);
 
diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c
index ad0d34d41444..1e1df19ec164 100644
--- a/net/vmw_vsock/virtio_transport_common.c
+++ b/net/vmw_vsock/virtio_transport_common.c
@@ -393,6 +393,78 @@  virtio_transport_stream_do_dequeue(struct vsock_sock *vsk,
 	return err;
 }
 
+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk,
+						 struct msghdr *msg,
+						 int flags)
+{
+	struct virtio_vsock_sock *vvs = vsk->trans;
+	struct virtio_vsock_pkt *pkt;
+	int dequeued_len = 0;
+	size_t user_buf_len = msg_data_left(msg);
+	bool copy_failed = false;
+	bool msg_ready = false;
+
+	spin_lock_bh(&vvs->rx_lock);
+
+	if (vvs->msg_count == 0) {
+		spin_unlock_bh(&vvs->rx_lock);
+		return 0;
+	}
+
+	while (!msg_ready) {
+		pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list);
+
+		if (!copy_failed) {
+			size_t pkt_len;
+			size_t bytes_to_copy;
+
+			pkt_len = (size_t)le32_to_cpu(pkt->hdr.len);
+			bytes_to_copy = min(user_buf_len, pkt_len);
+
+			if (bytes_to_copy) {
+				int err;
+
+				/* sk_lock is held by caller so no one else can dequeue.
+				 * Unlock rx_lock since memcpy_to_msg() may sleep.
+				 */
+				spin_unlock_bh(&vvs->rx_lock);
+
+				err = memcpy_to_msg(msg, pkt->buf, bytes_to_copy);
+				if (err) {
+					/* Copy of message failed, set flag to skip
+					 * copy path for rest of fragments. Rest of
+					 * fragments will be freed without copy.
+					 */
+					copy_failed = true;
+					dequeued_len = err;
+				} else {
+					user_buf_len -= bytes_to_copy;
+				}
+
+				spin_lock_bh(&vvs->rx_lock);
+			}
+
+			if (dequeued_len >= 0)
+				dequeued_len += pkt_len;
+		}
+
+		if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) {
+			msg_ready = true;
+			vvs->msg_count--;
+		}
+
+		virtio_transport_dec_rx_pkt(vvs, pkt);
+		list_del(&pkt->list);
+		virtio_transport_free_pkt(pkt);
+	}
+
+	spin_unlock_bh(&vvs->rx_lock);
+
+	virtio_transport_send_credit_update(vsk);
+
+	return dequeued_len;
+}
+
 ssize_t
 virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 				struct msghdr *msg,
@@ -405,6 +477,18 @@  virtio_transport_stream_dequeue(struct vsock_sock *vsk,
 }
 EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue);
 
+ssize_t
+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk,
+				   struct msghdr *msg,
+				   int flags)
+{
+	if (flags & MSG_PEEK)
+		return -EOPNOTSUPP;
+
+	return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags);
+}
+EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue);
+
 int
 virtio_transport_dgram_dequeue(struct vsock_sock *vsk,
 			       struct msghdr *msg,