Message ID | 20210520191801.1272027-1-arseny.krasnov@kaspersky.com (mailing list archive) |
---|---|
State | Not Applicable |
Delegated to: | Netdev Maintainers |
Headers | show |
Series | virtio/vsock: introduce SOCK_SEQPACKET support | expand |
Context | Check | Description |
---|---|---|
netdev/cover_letter | success | Link |
netdev/fixes_present | success | Link |
netdev/patch_count | fail | Series longer than 15 patches |
netdev/tree_selection | success | Guessed tree name to be net-next |
netdev/subject_prefix | success | Link |
netdev/cc_maintainers | success | CCed 9 of 9 maintainers |
netdev/source_inline | success | Was 0 now: 0 |
netdev/verify_signedoff | success | Link |
netdev/module_param | success | Was 0 now: 0 |
netdev/build_32bit | success | Errors and warnings before: 0 this patch: 0 |
netdev/kdoc | success | Errors and warnings before: 0 this patch: 0 |
netdev/verify_fixes | success | Link |
netdev/checkpatch | warning | WARNING: line length of 81 exceeds 80 columns WARNING: line length of 86 exceeds 80 columns |
netdev/build_allmodconfig_warn | success | Errors and warnings before: 0 this patch: 0 |
netdev/header_inline | success | Link |
On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >Callback fetches RW packets from rx queue of socket until whole record >is copied(if user's buffer is full, user is not woken up). This is done >to not stall sender, because if we wake up user and it leaves syscall, >nobody will send credit update for rest of record, and sender will wait >for next enter of read syscall at receiver's side. So if user buffer is >full, we just send credit update and drop data. > >Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >--- > v9 -> v10: > 1) Number of dequeued bytes incremented even in case when > user's buffer is full. > 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. > 3) Rename variable 'err' to 'dequeued_len', in case of error > it has negative value. > > include/linux/virtio_vsock.h | 5 ++ > net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ > 2 files changed, 70 insertions(+) > >diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >index dc636b727179..02acf6e9ae04 100644 >--- a/include/linux/virtio_vsock.h >+++ b/include/linux/virtio_vsock.h >@@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, > struct msghdr *msg, > size_t len, int flags); > >+ssize_t >+virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >+ struct msghdr *msg, >+ int flags, >+ bool *msg_ready); > s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); > s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); > >diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >index ad0d34d41444..61349b2ea7fe 100644 >--- a/net/vmw_vsock/virtio_transport_common.c >+++ b/net/vmw_vsock/virtio_transport_common.c >@@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, > return err; > } > >+static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >+ struct msghdr *msg, >+ int flags, >+ bool *msg_ready) >+{ >+ struct virtio_vsock_sock *vvs = vsk->trans; >+ struct virtio_vsock_pkt *pkt; >+ int dequeued_len = 0; >+ size_t user_buf_len = msg_data_left(msg); >+ >+ *msg_ready = false; >+ spin_lock_bh(&vvs->rx_lock); >+ >+ while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { I' >+ size_t bytes_to_copy; >+ size_t pkt_len; >+ >+ pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >+ pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >+ bytes_to_copy = min(user_buf_len, pkt_len); >+ >+ if (bytes_to_copy) { >+ /* sk_lock is held by caller so no one else can dequeue. >+ * Unlock rx_lock since memcpy_to_msg() may sleep. >+ */ >+ spin_unlock_bh(&vvs->rx_lock); >+ >+ if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >+ dequeued_len = -EINVAL; I think here is better to return the error returned by memcpy_to_msg(), as we do in the other place where we use memcpy_to_msg(). I mean something like this: err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); if (err) dequeued_len = err; >+ else >+ user_buf_len -= bytes_to_copy; >+ >+ spin_lock_bh(&vvs->rx_lock); >+ } >+ Maybe here we can simply break the cycle if we have an error: if (dequeued_len < 0) break; Or we can refactor a bit, simplifying the while() condition and also the code in this way (not tested): while (!*msg_ready && !list_empty(&vvs->rx_queue)) { ... if (bytes_to_copy) { int err; /* ... */ spin_unlock_bh(&vvs->rx_lock); err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); if (err) { dequeued_len = err; goto out; } spin_lock_bh(&vvs->rx_lock); user_buf_len -= bytes_to_copy; } dequeued_len += pkt_len; if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) *msg_ready = true; virtio_transport_dec_rx_pkt(vvs, pkt); list_del(&pkt->list); virtio_transport_free_pkt(pkt); } out: spin_unlock_bh(&vvs->rx_lock); virtio_transport_send_credit_update(vsk); return dequeued_len; }
On 03.06.2021 17:45, Stefano Garzarella wrote: > On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >> Callback fetches RW packets from rx queue of socket until whole record >> is copied(if user's buffer is full, user is not woken up). This is done >> to not stall sender, because if we wake up user and it leaves syscall, >> nobody will send credit update for rest of record, and sender will wait >> for next enter of read syscall at receiver's side. So if user buffer is >> full, we just send credit update and drop data. >> >> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >> --- >> v9 -> v10: >> 1) Number of dequeued bytes incremented even in case when >> user's buffer is full. >> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >> 3) Rename variable 'err' to 'dequeued_len', in case of error >> it has negative value. >> >> include/linux/virtio_vsock.h | 5 ++ >> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >> 2 files changed, 70 insertions(+) >> >> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >> index dc636b727179..02acf6e9ae04 100644 >> --- a/include/linux/virtio_vsock.h >> +++ b/include/linux/virtio_vsock.h >> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >> struct msghdr *msg, >> size_t len, int flags); >> >> +ssize_t >> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >> + struct msghdr *msg, >> + int flags, >> + bool *msg_ready); >> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >> >> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >> index ad0d34d41444..61349b2ea7fe 100644 >> --- a/net/vmw_vsock/virtio_transport_common.c >> +++ b/net/vmw_vsock/virtio_transport_common.c >> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >> return err; >> } >> >> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >> + struct msghdr *msg, >> + int flags, >> + bool *msg_ready) >> +{ >> + struct virtio_vsock_sock *vvs = vsk->trans; >> + struct virtio_vsock_pkt *pkt; >> + int dequeued_len = 0; >> + size_t user_buf_len = msg_data_left(msg); >> + >> + *msg_ready = false; >> + spin_lock_bh(&vvs->rx_lock); >> + >> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { > I' > >> + size_t bytes_to_copy; >> + size_t pkt_len; >> + >> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >> + bytes_to_copy = min(user_buf_len, pkt_len); >> + >> + if (bytes_to_copy) { >> + /* sk_lock is held by caller so no one else can dequeue. >> + * Unlock rx_lock since memcpy_to_msg() may sleep. >> + */ >> + spin_unlock_bh(&vvs->rx_lock); >> + >> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >> + dequeued_len = -EINVAL; > I think here is better to return the error returned by memcpy_to_msg(), > as we do in the other place where we use memcpy_to_msg(). > > I mean something like this: > err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); > if (err) > dequeued_len = err; Ack >> + else >> + user_buf_len -= bytes_to_copy; >> + >> + spin_lock_bh(&vvs->rx_lock); >> + } >> + > Maybe here we can simply break the cycle if we have an error: > if (dequeued_len < 0) > break; > > Or we can refactor a bit, simplifying the while() condition and also the > code in this way (not tested): > > while (!*msg_ready && !list_empty(&vvs->rx_queue)) { > ... > > if (bytes_to_copy) { > int err; > > /* ... > */ > spin_unlock_bh(&vvs->rx_lock); > err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); > if (err) { > dequeued_len = err; > goto out; > } > spin_lock_bh(&vvs->rx_lock); > > user_buf_len -= bytes_to_copy; > } > > dequeued_len += pkt_len; > > if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) > *msg_ready = true; > > virtio_transport_dec_rx_pkt(vvs, pkt); > list_del(&pkt->list); > virtio_transport_free_pkt(pkt); > } > > out: > spin_unlock_bh(&vvs->rx_lock); > > virtio_transport_send_credit_update(vsk); > > return dequeued_len; > } I think we can't do 'goto out' or break, because in case of error, we still need to free packet. It is possible to do something like this: virtio_transport_dec_rx_pkt(vvs, pkt); list_del(&pkt->list); virtio_transport_free_pkt(pkt); if (dequeued_len < 0) break; > >
On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: > >On 03.06.2021 17:45, Stefano Garzarella wrote: >> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>> Callback fetches RW packets from rx queue of socket until whole record >>> is copied(if user's buffer is full, user is not woken up). This is done >>> to not stall sender, because if we wake up user and it leaves syscall, >>> nobody will send credit update for rest of record, and sender will wait >>> for next enter of read syscall at receiver's side. So if user buffer is >>> full, we just send credit update and drop data. >>> >>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>> --- >>> v9 -> v10: >>> 1) Number of dequeued bytes incremented even in case when >>> user's buffer is full. >>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>> it has negative value. >>> >>> include/linux/virtio_vsock.h | 5 ++ >>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>> 2 files changed, 70 insertions(+) >>> >>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>> index dc636b727179..02acf6e9ae04 100644 >>> --- a/include/linux/virtio_vsock.h >>> +++ b/include/linux/virtio_vsock.h >>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>> struct msghdr *msg, >>> size_t len, int flags); >>> >>> +ssize_t >>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>> + struct msghdr *msg, >>> + int flags, >>> + bool *msg_ready); >>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>> >>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>> index ad0d34d41444..61349b2ea7fe 100644 >>> --- a/net/vmw_vsock/virtio_transport_common.c >>> +++ b/net/vmw_vsock/virtio_transport_common.c >>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>> return err; >>> } >>> >>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>> + struct msghdr *msg, >>> + int flags, >>> + bool *msg_ready) >>> +{ >>> + struct virtio_vsock_sock *vvs = vsk->trans; >>> + struct virtio_vsock_pkt *pkt; >>> + int dequeued_len = 0; >>> + size_t user_buf_len = msg_data_left(msg); >>> + >>> + *msg_ready = false; >>> + spin_lock_bh(&vvs->rx_lock); >>> + >>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >> I' >> >>> + size_t bytes_to_copy; >>> + size_t pkt_len; >>> + >>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>> + bytes_to_copy = min(user_buf_len, pkt_len); >>> + >>> + if (bytes_to_copy) { >>> + /* sk_lock is held by caller so no one else can dequeue. >>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>> + */ >>> + spin_unlock_bh(&vvs->rx_lock); >>> + >>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>> + dequeued_len = -EINVAL; >> I think here is better to return the error returned by memcpy_to_msg(), >> as we do in the other place where we use memcpy_to_msg(). >> >> I mean something like this: >> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >> if (err) >> dequeued_len = err; >Ack >>> + else >>> + user_buf_len -= bytes_to_copy; >>> + >>> + spin_lock_bh(&vvs->rx_lock); >>> + } >>> + >> Maybe here we can simply break the cycle if we have an error: >> if (dequeued_len < 0) >> break; >> >> Or we can refactor a bit, simplifying the while() condition and also the >> code in this way (not tested): >> >> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >> ... >> >> if (bytes_to_copy) { >> int err; >> >> /* ... >> */ >> spin_unlock_bh(&vvs->rx_lock); >> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >> if (err) { >> dequeued_len = err; >> goto out; >> } >> spin_lock_bh(&vvs->rx_lock); >> >> user_buf_len -= bytes_to_copy; >> } >> >> dequeued_len += pkt_len; >> >> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >> *msg_ready = true; >> >> virtio_transport_dec_rx_pkt(vvs, pkt); >> list_del(&pkt->list); >> virtio_transport_free_pkt(pkt); >> } >> >> out: >> spin_unlock_bh(&vvs->rx_lock); >> >> virtio_transport_send_credit_update(vsk); >> >> return dequeued_len; >> } > >I think we can't do 'goto out' or break, because in case of error, we still need >to free packet. Didn't we have code that remove packets from a previous message? I don't see it anymore. For example if we have 10 packets queued for a message (the 10th packet has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with you proposal we are freeing only the first 2 packets, the rest is there and should be freed when reading the next message, but I don't see that code. The same can happen if the recvmsg syscall is interrupted. In that case we report that nothing was copied, but we freed the first N packets, so they are lost but the other packets are still in the queue. Please check also the patch where we implemented __vsock_seqpacket_recvmsg(). I thinks we should free packets only when we are sure we copied them to the user space. > It is possible to do something like this: > > virtio_transport_dec_rx_pkt(vvs, pkt); > list_del(&pkt->list); > virtio_transport_free_pkt(pkt); > > if (dequeued_len < 0) > break; > >> >> >
On 04.06.2021 18:03, Stefano Garzarella wrote: > On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >> On 03.06.2021 17:45, Stefano Garzarella wrote: >>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>> Callback fetches RW packets from rx queue of socket until whole record >>>> is copied(if user's buffer is full, user is not woken up). This is done >>>> to not stall sender, because if we wake up user and it leaves syscall, >>>> nobody will send credit update for rest of record, and sender will wait >>>> for next enter of read syscall at receiver's side. So if user buffer is >>>> full, we just send credit update and drop data. >>>> >>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>> --- >>>> v9 -> v10: >>>> 1) Number of dequeued bytes incremented even in case when >>>> user's buffer is full. >>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>> it has negative value. >>>> >>>> include/linux/virtio_vsock.h | 5 ++ >>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>> 2 files changed, 70 insertions(+) >>>> >>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>> index dc636b727179..02acf6e9ae04 100644 >>>> --- a/include/linux/virtio_vsock.h >>>> +++ b/include/linux/virtio_vsock.h >>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>> struct msghdr *msg, >>>> size_t len, int flags); >>>> >>>> +ssize_t >>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>> + struct msghdr *msg, >>>> + int flags, >>>> + bool *msg_ready); >>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>> >>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>> index ad0d34d41444..61349b2ea7fe 100644 >>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>> return err; >>>> } >>>> >>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>> + struct msghdr *msg, >>>> + int flags, >>>> + bool *msg_ready) >>>> +{ >>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>> + struct virtio_vsock_pkt *pkt; >>>> + int dequeued_len = 0; >>>> + size_t user_buf_len = msg_data_left(msg); >>>> + >>>> + *msg_ready = false; >>>> + spin_lock_bh(&vvs->rx_lock); >>>> + >>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>> I' >>> >>>> + size_t bytes_to_copy; >>>> + size_t pkt_len; >>>> + >>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>> + >>>> + if (bytes_to_copy) { >>>> + /* sk_lock is held by caller so no one else can dequeue. >>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>> + */ >>>> + spin_unlock_bh(&vvs->rx_lock); >>>> + >>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>> + dequeued_len = -EINVAL; >>> I think here is better to return the error returned by memcpy_to_msg(), >>> as we do in the other place where we use memcpy_to_msg(). >>> >>> I mean something like this: >>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>> if (err) >>> dequeued_len = err; >> Ack >>>> + else >>>> + user_buf_len -= bytes_to_copy; >>>> + >>>> + spin_lock_bh(&vvs->rx_lock); >>>> + } >>>> + >>> Maybe here we can simply break the cycle if we have an error: >>> if (dequeued_len < 0) >>> break; >>> >>> Or we can refactor a bit, simplifying the while() condition and also the >>> code in this way (not tested): >>> >>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>> ... >>> >>> if (bytes_to_copy) { >>> int err; >>> >>> /* ... >>> */ >>> spin_unlock_bh(&vvs->rx_lock); >>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>> if (err) { >>> dequeued_len = err; >>> goto out; >>> } >>> spin_lock_bh(&vvs->rx_lock); >>> >>> user_buf_len -= bytes_to_copy; >>> } >>> >>> dequeued_len += pkt_len; >>> >>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>> *msg_ready = true; >>> >>> virtio_transport_dec_rx_pkt(vvs, pkt); >>> list_del(&pkt->list); >>> virtio_transport_free_pkt(pkt); >>> } >>> >>> out: >>> spin_unlock_bh(&vvs->rx_lock); >>> >>> virtio_transport_send_credit_update(vsk); >>> >>> return dequeued_len; >>> } >> I think we can't do 'goto out' or break, because in case of error, we still need >> to free packet. > Didn't we have code that remove packets from a previous message? > I don't see it anymore. > > For example if we have 10 packets queued for a message (the 10th packet > has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with > you proposal we are freeing only the first 2 packets, the rest is there > and should be freed when reading the next message, but I don't see that > code. > > The same can happen if the recvmsg syscall is interrupted. In that case > we report that nothing was copied, but we freed the first N packets, so > they are lost but the other packets are still in the queue. > > Please check also the patch where we implemented > __vsock_seqpacket_recvmsg(). > > I thinks we should free packets only when we are sure we copied them to > the user space. Hm, yes, this is problem. To solve it i can restore previous approach with seqbegin/seqend. In that case i can detect unfinished record and drop it's packets. Seems seqbegin will be a bit like VIRTIO_VSOCK_SEQ_EOR in flags field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are unneeded, as channel considedered lossless. What do You think? Thank You > >> It is possible to do something like this: >> >> virtio_transport_dec_rx_pkt(vvs, pkt); >> list_del(&pkt->list); >> virtio_transport_free_pkt(pkt); >> >> if (dequeued_len < 0) >> break; >> >>> >
On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: > >On 04.06.2021 18:03, Stefano Garzarella wrote: >> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>> nobody will send credit update for rest of record, and sender will wait >>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>> full, we just send credit update and drop data. >>>>> >>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>> --- >>>>> v9 -> v10: >>>>> 1) Number of dequeued bytes incremented even in case when >>>>> user's buffer is full. >>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>> it has negative value. >>>>> >>>>> include/linux/virtio_vsock.h | 5 ++ >>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>> 2 files changed, 70 insertions(+) >>>>> >>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>> index dc636b727179..02acf6e9ae04 100644 >>>>> --- a/include/linux/virtio_vsock.h >>>>> +++ b/include/linux/virtio_vsock.h >>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>> struct msghdr *msg, >>>>> size_t len, int flags); >>>>> >>>>> +ssize_t >>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>> + struct msghdr *msg, >>>>> + int flags, >>>>> + bool *msg_ready); >>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>> >>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>> return err; >>>>> } >>>>> >>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>> + struct msghdr *msg, >>>>> + int flags, >>>>> + bool *msg_ready) >>>>> +{ >>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>> + struct virtio_vsock_pkt *pkt; >>>>> + int dequeued_len = 0; >>>>> + size_t user_buf_len = msg_data_left(msg); >>>>> + >>>>> + *msg_ready = false; >>>>> + spin_lock_bh(&vvs->rx_lock); >>>>> + >>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>> I' >>>> >>>>> + size_t bytes_to_copy; >>>>> + size_t pkt_len; >>>>> + >>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>> + >>>>> + if (bytes_to_copy) { >>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>> + */ >>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>> + >>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>> + dequeued_len = -EINVAL; >>>> I think here is better to return the error returned by memcpy_to_msg(), >>>> as we do in the other place where we use memcpy_to_msg(). >>>> >>>> I mean something like this: >>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>> if (err) >>>> dequeued_len = err; >>> Ack >>>>> + else >>>>> + user_buf_len -= bytes_to_copy; >>>>> + >>>>> + spin_lock_bh(&vvs->rx_lock); >>>>> + } >>>>> + >>>> Maybe here we can simply break the cycle if we have an error: >>>> if (dequeued_len < 0) >>>> break; >>>> >>>> Or we can refactor a bit, simplifying the while() condition and also the >>>> code in this way (not tested): >>>> >>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>> ... >>>> >>>> if (bytes_to_copy) { >>>> int err; >>>> >>>> /* ... >>>> */ >>>> spin_unlock_bh(&vvs->rx_lock); >>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>> if (err) { >>>> dequeued_len = err; >>>> goto out; >>>> } >>>> spin_lock_bh(&vvs->rx_lock); >>>> >>>> user_buf_len -= bytes_to_copy; >>>> } >>>> >>>> dequeued_len += pkt_len; >>>> >>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>> *msg_ready = true; >>>> >>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>> list_del(&pkt->list); >>>> virtio_transport_free_pkt(pkt); >>>> } >>>> >>>> out: >>>> spin_unlock_bh(&vvs->rx_lock); >>>> >>>> virtio_transport_send_credit_update(vsk); >>>> >>>> return dequeued_len; >>>> } >>> I think we can't do 'goto out' or break, because in case of error, >>> we still need >>> to free packet. >> Didn't we have code that remove packets from a previous message? >> I don't see it anymore. >> >> For example if we have 10 packets queued for a message (the 10th >> packet >> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >> you proposal we are freeing only the first 2 packets, the rest is there >> and should be freed when reading the next message, but I don't see that >> code. >> >> The same can happen if the recvmsg syscall is interrupted. In that case >> we report that nothing was copied, but we freed the first N packets, so >> they are lost but the other packets are still in the queue. >> >> Please check also the patch where we implemented >> __vsock_seqpacket_recvmsg(). >> >> I thinks we should free packets only when we are sure we copied them to >> the user space. > >Hm, yes, this is problem. To solve it i can restore previous approach >with seqbegin/seqend. In that case i can detect unfinished record and >drop it's packets. Seems seqbegin will be a bit like >VIRTIO_VSOCK_SEQ_EOR in flags >field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >unneeded, >as channel considedered lossless. What do You think? > I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be fine. When we receive EOR we know that this is the last packet on this message and the next packet will be the first of a new message. What we should do is check that we have all the fragments of a packet and return them all together, otherwise we have to say we have nothing. For example as we process packets from the vitqueue and queue them in the rx_queue we could use a counter of how many EORs are in the rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() when we copied all the fragments. If the counter is 0, we don't remove anything from the queue and virtio_transport_seqpacket_do_dequeue() returns 0. So .seqpacket_dequeue should return 0 if there is not at least one complete message, or return the entire message. A partial message should never return. What do you think? Maybe we should start using skbuffs for seqpackets as well, but that might take some time, so that might be okay for now. Thanks, Stefano
On 07.06.2021 14:04, Stefano Garzarella wrote: > On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: >> On 04.06.2021 18:03, Stefano Garzarella wrote: >>> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>>> nobody will send credit update for rest of record, and sender will wait >>>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>>> full, we just send credit update and drop data. >>>>>> >>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>>> --- >>>>>> v9 -> v10: >>>>>> 1) Number of dequeued bytes incremented even in case when >>>>>> user's buffer is full. >>>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>>> it has negative value. >>>>>> >>>>>> include/linux/virtio_vsock.h | 5 ++ >>>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>>> 2 files changed, 70 insertions(+) >>>>>> >>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>>> index dc636b727179..02acf6e9ae04 100644 >>>>>> --- a/include/linux/virtio_vsock.h >>>>>> +++ b/include/linux/virtio_vsock.h >>>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>>> struct msghdr *msg, >>>>>> size_t len, int flags); >>>>>> >>>>>> +ssize_t >>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>>> + struct msghdr *msg, >>>>>> + int flags, >>>>>> + bool *msg_ready); >>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>>> >>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>>> return err; >>>>>> } >>>>>> >>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>>> + struct msghdr *msg, >>>>>> + int flags, >>>>>> + bool *msg_ready) >>>>>> +{ >>>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>>> + struct virtio_vsock_pkt *pkt; >>>>>> + int dequeued_len = 0; >>>>>> + size_t user_buf_len = msg_data_left(msg); >>>>>> + >>>>>> + *msg_ready = false; >>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>> + >>>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>>> I' >>>>> >>>>>> + size_t bytes_to_copy; >>>>>> + size_t pkt_len; >>>>>> + >>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>>> + >>>>>> + if (bytes_to_copy) { >>>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>>> + */ >>>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>>> + >>>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>>> + dequeued_len = -EINVAL; >>>>> I think here is better to return the error returned by memcpy_to_msg(), >>>>> as we do in the other place where we use memcpy_to_msg(). >>>>> >>>>> I mean something like this: >>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>> if (err) >>>>> dequeued_len = err; >>>> Ack >>>>>> + else >>>>>> + user_buf_len -= bytes_to_copy; >>>>>> + >>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>> + } >>>>>> + >>>>> Maybe here we can simply break the cycle if we have an error: >>>>> if (dequeued_len < 0) >>>>> break; >>>>> >>>>> Or we can refactor a bit, simplifying the while() condition and also the >>>>> code in this way (not tested): >>>>> >>>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>>> ... >>>>> >>>>> if (bytes_to_copy) { >>>>> int err; >>>>> >>>>> /* ... >>>>> */ >>>>> spin_unlock_bh(&vvs->rx_lock); >>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>> if (err) { >>>>> dequeued_len = err; >>>>> goto out; >>>>> } >>>>> spin_lock_bh(&vvs->rx_lock); >>>>> >>>>> user_buf_len -= bytes_to_copy; >>>>> } >>>>> >>>>> dequeued_len += pkt_len; >>>>> >>>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>>> *msg_ready = true; >>>>> >>>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>>> list_del(&pkt->list); >>>>> virtio_transport_free_pkt(pkt); >>>>> } >>>>> >>>>> out: >>>>> spin_unlock_bh(&vvs->rx_lock); >>>>> >>>>> virtio_transport_send_credit_update(vsk); >>>>> >>>>> return dequeued_len; >>>>> } >>>> I think we can't do 'goto out' or break, because in case of error, >>>> we still need >>>> to free packet. >>> Didn't we have code that remove packets from a previous message? >>> I don't see it anymore. >>> >>> For example if we have 10 packets queued for a message (the 10th >>> packet >>> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >>> you proposal we are freeing only the first 2 packets, the rest is there >>> and should be freed when reading the next message, but I don't see that >>> code. >>> >>> The same can happen if the recvmsg syscall is interrupted. In that case >>> we report that nothing was copied, but we freed the first N packets, so >>> they are lost but the other packets are still in the queue. >>> >>> Please check also the patch where we implemented >>> __vsock_seqpacket_recvmsg(). >>> >>> I thinks we should free packets only when we are sure we copied them to >>> the user space. >> Hm, yes, this is problem. To solve it i can restore previous approach >> with seqbegin/seqend. In that case i can detect unfinished record and >> drop it's packets. Seems seqbegin will be a bit like >> VIRTIO_VSOCK_SEQ_EOR in flags >> field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >> unneeded, >> as channel considedered lossless. What do You think? >> > I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be > fine. > > When we receive EOR we know that this is the last packet on this message > and the next packet will be the first of a new message. > > What we should do is check that we have all the fragments of a packet > and return them all together, otherwise we have to say we have nothing. > > For example as we process packets from the vitqueue and queue them in > the rx_queue we could use a counter of how many EORs are in the > rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() > when we copied all the fragments. > > If the counter is 0, we don't remove anything from the queue and > virtio_transport_seqpacket_do_dequeue() returns 0. > > So .seqpacket_dequeue should return 0 if there is not at least one > complete message, or return the entire message. A partial message should > never return. > > What do you think? I like it, i've implemented this approach in some early pre v1 versions. But in this case, credit update logic will be changed - in current implementation (both seqpacket and stream) credit update reply is sent when data is copied to user's buffer(e.g. we copy data somewhere, free packet and ready to process new packet). But if we don't touch user's buffer and keeping incoming packet in rx queue until whole record is ready, when to send credit update? Thank You > > > Maybe we should start using skbuffs for seqpackets as well, but that > might take some time, so that might be okay for now. > > Thanks, > Stefano > >
On Mon, Jun 07, 2021 at 04:18:38PM +0300, Arseny Krasnov wrote: > >On 07.06.2021 14:04, Stefano Garzarella wrote: >> On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: >>> On 04.06.2021 18:03, Stefano Garzarella wrote: >>>> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>>>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>>>> nobody will send credit update for rest of record, and sender will wait >>>>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>>>> full, we just send credit update and drop data. >>>>>>> >>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>>>> --- >>>>>>> v9 -> v10: >>>>>>> 1) Number of dequeued bytes incremented even in case when >>>>>>> user's buffer is full. >>>>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>>>> it has negative value. >>>>>>> >>>>>>> include/linux/virtio_vsock.h | 5 ++ >>>>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>>>> 2 files changed, 70 insertions(+) >>>>>>> >>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>>>> index dc636b727179..02acf6e9ae04 100644 >>>>>>> --- a/include/linux/virtio_vsock.h >>>>>>> +++ b/include/linux/virtio_vsock.h >>>>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>>>> struct msghdr *msg, >>>>>>> size_t len, int flags); >>>>>>> >>>>>>> +ssize_t >>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>>>> + struct msghdr *msg, >>>>>>> + int flags, >>>>>>> + bool *msg_ready); >>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>>>> >>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>>>> return err; >>>>>>> } >>>>>>> >>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>>>> + struct msghdr *msg, >>>>>>> + int flags, >>>>>>> + bool *msg_ready) >>>>>>> +{ >>>>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>>>> + struct virtio_vsock_pkt *pkt; >>>>>>> + int dequeued_len = 0; >>>>>>> + size_t user_buf_len = msg_data_left(msg); >>>>>>> + >>>>>>> + *msg_ready = false; >>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>> + >>>>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>>>> I' >>>>>> >>>>>>> + size_t bytes_to_copy; >>>>>>> + size_t pkt_len; >>>>>>> + >>>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>>>> + >>>>>>> + if (bytes_to_copy) { >>>>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>>>> + */ >>>>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>>>> + >>>>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>>>> + dequeued_len = -EINVAL; >>>>>> I think here is better to return the error returned by memcpy_to_msg(), >>>>>> as we do in the other place where we use memcpy_to_msg(). >>>>>> >>>>>> I mean something like this: >>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>> if (err) >>>>>> dequeued_len = err; >>>>> Ack >>>>>>> + else >>>>>>> + user_buf_len -= bytes_to_copy; >>>>>>> + >>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>> + } >>>>>>> + >>>>>> Maybe here we can simply break the cycle if we have an error: >>>>>> if (dequeued_len < 0) >>>>>> break; >>>>>> >>>>>> Or we can refactor a bit, simplifying the while() condition and also the >>>>>> code in this way (not tested): >>>>>> >>>>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>>>> ... >>>>>> >>>>>> if (bytes_to_copy) { >>>>>> int err; >>>>>> >>>>>> /* ... >>>>>> */ >>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>> if (err) { >>>>>> dequeued_len = err; >>>>>> goto out; >>>>>> } >>>>>> spin_lock_bh(&vvs->rx_lock); >>>>>> >>>>>> user_buf_len -= bytes_to_copy; >>>>>> } >>>>>> >>>>>> dequeued_len += pkt_len; >>>>>> >>>>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>>>> *msg_ready = true; >>>>>> >>>>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>>>> list_del(&pkt->list); >>>>>> virtio_transport_free_pkt(pkt); >>>>>> } >>>>>> >>>>>> out: >>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>> >>>>>> virtio_transport_send_credit_update(vsk); >>>>>> >>>>>> return dequeued_len; >>>>>> } >>>>> I think we can't do 'goto out' or break, because in case of error, >>>>> we still need >>>>> to free packet. >>>> Didn't we have code that remove packets from a previous message? >>>> I don't see it anymore. >>>> >>>> For example if we have 10 packets queued for a message (the 10th >>>> packet >>>> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >>>> you proposal we are freeing only the first 2 packets, the rest is there >>>> and should be freed when reading the next message, but I don't see that >>>> code. >>>> >>>> The same can happen if the recvmsg syscall is interrupted. In that case >>>> we report that nothing was copied, but we freed the first N packets, so >>>> they are lost but the other packets are still in the queue. >>>> >>>> Please check also the patch where we implemented >>>> __vsock_seqpacket_recvmsg(). >>>> >>>> I thinks we should free packets only when we are sure we copied them to >>>> the user space. >>> Hm, yes, this is problem. To solve it i can restore previous approach >>> with seqbegin/seqend. In that case i can detect unfinished record and >>> drop it's packets. Seems seqbegin will be a bit like >>> VIRTIO_VSOCK_SEQ_EOR in flags >>> field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >>> unneeded, >>> as channel considedered lossless. What do You think? >>> >> I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be >> fine. >> >> When we receive EOR we know that this is the last packet on this message >> and the next packet will be the first of a new message. >> >> What we should do is check that we have all the fragments of a packet >> and return them all together, otherwise we have to say we have nothing. >> >> For example as we process packets from the vitqueue and queue them in >> the rx_queue we could use a counter of how many EORs are in the >> rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() >> when we copied all the fragments. >> >> If the counter is 0, we don't remove anything from the queue and >> virtio_transport_seqpacket_do_dequeue() returns 0. >> >> So .seqpacket_dequeue should return 0 if there is not at least one >> complete message, or return the entire message. A partial message should >> never return. >> >> What do you think? > >I like it, i've implemented this approach in some early pre v1 versions. > >But in this case, credit update logic will be changed - in current implementation > >(both seqpacket and stream) credit update reply is sent when data is copied > >to user's buffer(e.g. we copy data somewhere, free packet and ready to process > >new packet). But if we don't touch user's buffer and keeping incoming packet in rx queue > >until whole record is ready, when to send credit update? I think the best approach could be to send credit updates when we remove them from the rx_queue. Stefano
On 08.06.2021 11:23, Stefano Garzarella wrote: > On Mon, Jun 07, 2021 at 04:18:38PM +0300, Arseny Krasnov wrote: >> On 07.06.2021 14:04, Stefano Garzarella wrote: >>> On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: >>>> On 04.06.2021 18:03, Stefano Garzarella wrote: >>>>> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>>>>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>>>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>>>>> nobody will send credit update for rest of record, and sender will wait >>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>>>>> full, we just send credit update and drop data. >>>>>>>> >>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>>>>> --- >>>>>>>> v9 -> v10: >>>>>>>> 1) Number of dequeued bytes incremented even in case when >>>>>>>> user's buffer is full. >>>>>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>>>>> it has negative value. >>>>>>>> >>>>>>>> include/linux/virtio_vsock.h | 5 ++ >>>>>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>>>>> 2 files changed, 70 insertions(+) >>>>>>>> >>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>>>>> index dc636b727179..02acf6e9ae04 100644 >>>>>>>> --- a/include/linux/virtio_vsock.h >>>>>>>> +++ b/include/linux/virtio_vsock.h >>>>>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>>>>> struct msghdr *msg, >>>>>>>> size_t len, int flags); >>>>>>>> >>>>>>>> +ssize_t >>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>>>>> + struct msghdr *msg, >>>>>>>> + int flags, >>>>>>>> + bool *msg_ready); >>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>>>>> >>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>>>>> return err; >>>>>>>> } >>>>>>>> >>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>>>>> + struct msghdr *msg, >>>>>>>> + int flags, >>>>>>>> + bool *msg_ready) >>>>>>>> +{ >>>>>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>>>>> + struct virtio_vsock_pkt *pkt; >>>>>>>> + int dequeued_len = 0; >>>>>>>> + size_t user_buf_len = msg_data_left(msg); >>>>>>>> + >>>>>>>> + *msg_ready = false; >>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>> + >>>>>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>>>>> I' >>>>>>> >>>>>>>> + size_t bytes_to_copy; >>>>>>>> + size_t pkt_len; >>>>>>>> + >>>>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>>>>> + >>>>>>>> + if (bytes_to_copy) { >>>>>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>>>>> + */ >>>>>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>>>>> + >>>>>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>>>>> + dequeued_len = -EINVAL; >>>>>>> I think here is better to return the error returned by memcpy_to_msg(), >>>>>>> as we do in the other place where we use memcpy_to_msg(). >>>>>>> >>>>>>> I mean something like this: >>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>> if (err) >>>>>>> dequeued_len = err; >>>>>> Ack >>>>>>>> + else >>>>>>>> + user_buf_len -= bytes_to_copy; >>>>>>>> + >>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>> + } >>>>>>>> + >>>>>>> Maybe here we can simply break the cycle if we have an error: >>>>>>> if (dequeued_len < 0) >>>>>>> break; >>>>>>> >>>>>>> Or we can refactor a bit, simplifying the while() condition and also the >>>>>>> code in this way (not tested): >>>>>>> >>>>>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>>>>> ... >>>>>>> >>>>>>> if (bytes_to_copy) { >>>>>>> int err; >>>>>>> >>>>>>> /* ... >>>>>>> */ >>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>> if (err) { >>>>>>> dequeued_len = err; >>>>>>> goto out; >>>>>>> } >>>>>>> spin_lock_bh(&vvs->rx_lock); >>>>>>> >>>>>>> user_buf_len -= bytes_to_copy; >>>>>>> } >>>>>>> >>>>>>> dequeued_len += pkt_len; >>>>>>> >>>>>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>>>>> *msg_ready = true; >>>>>>> >>>>>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>>>>> list_del(&pkt->list); >>>>>>> virtio_transport_free_pkt(pkt); >>>>>>> } >>>>>>> >>>>>>> out: >>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>> >>>>>>> virtio_transport_send_credit_update(vsk); >>>>>>> >>>>>>> return dequeued_len; >>>>>>> } >>>>>> I think we can't do 'goto out' or break, because in case of error, >>>>>> we still need >>>>>> to free packet. >>>>> Didn't we have code that remove packets from a previous message? >>>>> I don't see it anymore. >>>>> >>>>> For example if we have 10 packets queued for a message (the 10th >>>>> packet >>>>> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >>>>> you proposal we are freeing only the first 2 packets, the rest is there >>>>> and should be freed when reading the next message, but I don't see that >>>>> code. >>>>> >>>>> The same can happen if the recvmsg syscall is interrupted. In that case >>>>> we report that nothing was copied, but we freed the first N packets, so >>>>> they are lost but the other packets are still in the queue. >>>>> >>>>> Please check also the patch where we implemented >>>>> __vsock_seqpacket_recvmsg(). >>>>> >>>>> I thinks we should free packets only when we are sure we copied them to >>>>> the user space. >>>> Hm, yes, this is problem. To solve it i can restore previous approach >>>> with seqbegin/seqend. In that case i can detect unfinished record and >>>> drop it's packets. Seems seqbegin will be a bit like >>>> VIRTIO_VSOCK_SEQ_EOR in flags >>>> field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >>>> unneeded, >>>> as channel considedered lossless. What do You think? >>>> >>> I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be >>> fine. >>> >>> When we receive EOR we know that this is the last packet on this message >>> and the next packet will be the first of a new message. >>> >>> What we should do is check that we have all the fragments of a packet >>> and return them all together, otherwise we have to say we have nothing. >>> >>> For example as we process packets from the vitqueue and queue them in >>> the rx_queue we could use a counter of how many EORs are in the >>> rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() >>> when we copied all the fragments. >>> >>> If the counter is 0, we don't remove anything from the queue and >>> virtio_transport_seqpacket_do_dequeue() returns 0. >>> >>> So .seqpacket_dequeue should return 0 if there is not at least one >>> complete message, or return the entire message. A partial message should >>> never return. >>> >>> What do you think? >> I like it, i've implemented this approach in some early pre v1 versions. >> >> But in this case, credit update logic will be changed - in current implementation >> >> (both seqpacket and stream) credit update reply is sent when data is copied >> >> to user's buffer(e.g. we copy data somewhere, free packet and ready to process >> >> new packet). But if we don't touch user's buffer and keeping incoming packet in rx queue >> >> until whole record is ready, when to send credit update? > I think the best approach could be to send credit updates when we remove > them from the rx_queue. In that case, it will be impossible to send message bigger than size of rx buffer (e.g. credit allowed size), because packet will be queued without credit update reply until credit allowed reach 0. Thank You > > Stefano > >
On Tue, Jun 08, 2021 at 12:40:39PM +0300, Arseny Krasnov wrote: > >On 08.06.2021 11:23, Stefano Garzarella wrote: >> On Mon, Jun 07, 2021 at 04:18:38PM +0300, Arseny Krasnov wrote: >>> On 07.06.2021 14:04, Stefano Garzarella wrote: >>>> On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: >>>>> On 04.06.2021 18:03, Stefano Garzarella wrote: >>>>>> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>>>>>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>>>>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>>>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>>>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>>>>>> nobody will send credit update for rest of record, and sender will wait >>>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>>>>>> full, we just send credit update and drop data. >>>>>>>>> >>>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>>>>>> --- >>>>>>>>> v9 -> v10: >>>>>>>>> 1) Number of dequeued bytes incremented even in case when >>>>>>>>> user's buffer is full. >>>>>>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>>>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>>>>>> it has negative value. >>>>>>>>> >>>>>>>>> include/linux/virtio_vsock.h | 5 ++ >>>>>>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>>>>>> 2 files changed, 70 insertions(+) >>>>>>>>> >>>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>>>>>> index dc636b727179..02acf6e9ae04 100644 >>>>>>>>> --- a/include/linux/virtio_vsock.h >>>>>>>>> +++ b/include/linux/virtio_vsock.h >>>>>>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>>>>>> struct msghdr *msg, >>>>>>>>> size_t len, int flags); >>>>>>>>> >>>>>>>>> +ssize_t >>>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>>>>>> + struct msghdr *msg, >>>>>>>>> + int flags, >>>>>>>>> + bool *msg_ready); >>>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>>>>>> >>>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>>>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>>>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>>>>>> return err; >>>>>>>>> } >>>>>>>>> >>>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>>>>>> + struct msghdr *msg, >>>>>>>>> + int flags, >>>>>>>>> + bool *msg_ready) >>>>>>>>> +{ >>>>>>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>>>>>> + struct virtio_vsock_pkt *pkt; >>>>>>>>> + int dequeued_len = 0; >>>>>>>>> + size_t user_buf_len = msg_data_left(msg); >>>>>>>>> + >>>>>>>>> + *msg_ready = false; >>>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>>> + >>>>>>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>>>>>> I' >>>>>>>> >>>>>>>>> + size_t bytes_to_copy; >>>>>>>>> + size_t pkt_len; >>>>>>>>> + >>>>>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>>>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>>>>>> + >>>>>>>>> + if (bytes_to_copy) { >>>>>>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>>>>>> + */ >>>>>>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>>>>>> + >>>>>>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>>>>>> + dequeued_len = -EINVAL; >>>>>>>> I think here is better to return the error returned by memcpy_to_msg(), >>>>>>>> as we do in the other place where we use memcpy_to_msg(). >>>>>>>> >>>>>>>> I mean something like this: >>>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>>> if (err) >>>>>>>> dequeued_len = err; >>>>>>> Ack >>>>>>>>> + else >>>>>>>>> + user_buf_len -= bytes_to_copy; >>>>>>>>> + >>>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>>> + } >>>>>>>>> + >>>>>>>> Maybe here we can simply break the cycle if we have an error: >>>>>>>> if (dequeued_len < 0) >>>>>>>> break; >>>>>>>> >>>>>>>> Or we can refactor a bit, simplifying the while() condition and also the >>>>>>>> code in this way (not tested): >>>>>>>> >>>>>>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>>>>>> ... >>>>>>>> >>>>>>>> if (bytes_to_copy) { >>>>>>>> int err; >>>>>>>> >>>>>>>> /* ... >>>>>>>> */ >>>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>>> if (err) { >>>>>>>> dequeued_len = err; >>>>>>>> goto out; >>>>>>>> } >>>>>>>> spin_lock_bh(&vvs->rx_lock); >>>>>>>> >>>>>>>> user_buf_len -= bytes_to_copy; >>>>>>>> } >>>>>>>> >>>>>>>> dequeued_len += pkt_len; >>>>>>>> >>>>>>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>>>>>> *msg_ready = true; >>>>>>>> >>>>>>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>>>>>> list_del(&pkt->list); >>>>>>>> virtio_transport_free_pkt(pkt); >>>>>>>> } >>>>>>>> >>>>>>>> out: >>>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>>> >>>>>>>> virtio_transport_send_credit_update(vsk); >>>>>>>> >>>>>>>> return dequeued_len; >>>>>>>> } >>>>>>> I think we can't do 'goto out' or break, because in case of error, >>>>>>> we still need >>>>>>> to free packet. >>>>>> Didn't we have code that remove packets from a previous message? >>>>>> I don't see it anymore. >>>>>> >>>>>> For example if we have 10 packets queued for a message (the 10th >>>>>> packet >>>>>> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >>>>>> you proposal we are freeing only the first 2 packets, the rest is there >>>>>> and should be freed when reading the next message, but I don't see that >>>>>> code. >>>>>> >>>>>> The same can happen if the recvmsg syscall is interrupted. In that case >>>>>> we report that nothing was copied, but we freed the first N packets, so >>>>>> they are lost but the other packets are still in the queue. >>>>>> >>>>>> Please check also the patch where we implemented >>>>>> __vsock_seqpacket_recvmsg(). >>>>>> >>>>>> I thinks we should free packets only when we are sure we copied them to >>>>>> the user space. >>>>> Hm, yes, this is problem. To solve it i can restore previous approach >>>>> with seqbegin/seqend. In that case i can detect unfinished record and >>>>> drop it's packets. Seems seqbegin will be a bit like >>>>> VIRTIO_VSOCK_SEQ_EOR in flags >>>>> field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >>>>> unneeded, >>>>> as channel considedered lossless. What do You think? >>>>> >>>> I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be >>>> fine. >>>> >>>> When we receive EOR we know that this is the last packet on this message >>>> and the next packet will be the first of a new message. >>>> >>>> What we should do is check that we have all the fragments of a packet >>>> and return them all together, otherwise we have to say we have nothing. >>>> >>>> For example as we process packets from the vitqueue and queue them in >>>> the rx_queue we could use a counter of how many EORs are in the >>>> rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() >>>> when we copied all the fragments. >>>> >>>> If the counter is 0, we don't remove anything from the queue and >>>> virtio_transport_seqpacket_do_dequeue() returns 0. >>>> >>>> So .seqpacket_dequeue should return 0 if there is not at least one >>>> complete message, or return the entire message. A partial message should >>>> never return. >>>> >>>> What do you think? >>> I like it, i've implemented this approach in some early pre v1 versions. >>> >>> But in this case, credit update logic will be changed - in current implementation >>> >>> (both seqpacket and stream) credit update reply is sent when data is copied >>> >>> to user's buffer(e.g. we copy data somewhere, free packet and ready to process >>> >>> new packet). But if we don't touch user's buffer and keeping incoming packet in rx queue >>> >>> until whole record is ready, when to send credit update? >> I think the best approach could be to send credit updates when we remove >> them from the rx_queue. > >In that case, it will be impossible to send message bigger than size of rx buffer > >(e.g. credit allowed size), because packet will be queued without credit update > >reply until credit allowed reach 0. > Yep, but I think it is a reasonable limit for a datagram socket. Maybe we can add a check on the TX side, since we know this value and return an error to the user. Thanks, Stefano
On 08.06.2021 13:19, Stefano Garzarella wrote: > On Tue, Jun 08, 2021 at 12:40:39PM +0300, Arseny Krasnov wrote: >> On 08.06.2021 11:23, Stefano Garzarella wrote: >>> On Mon, Jun 07, 2021 at 04:18:38PM +0300, Arseny Krasnov wrote: >>>> On 07.06.2021 14:04, Stefano Garzarella wrote: >>>>> On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: >>>>>> On 04.06.2021 18:03, Stefano Garzarella wrote: >>>>>>> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>>>>>>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>>>>>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>>>>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>>>>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>>>>>>> nobody will send credit update for rest of record, and sender will wait >>>>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>>>>>>> full, we just send credit update and drop data. >>>>>>>>>> >>>>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>>>>>>> --- >>>>>>>>>> v9 -> v10: >>>>>>>>>> 1) Number of dequeued bytes incremented even in case when >>>>>>>>>> user's buffer is full. >>>>>>>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>>>>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>>>>>>> it has negative value. >>>>>>>>>> >>>>>>>>>> include/linux/virtio_vsock.h | 5 ++ >>>>>>>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>>>>>>> 2 files changed, 70 insertions(+) >>>>>>>>>> >>>>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>>>>>>> index dc636b727179..02acf6e9ae04 100644 >>>>>>>>>> --- a/include/linux/virtio_vsock.h >>>>>>>>>> +++ b/include/linux/virtio_vsock.h >>>>>>>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>>>>>>> struct msghdr *msg, >>>>>>>>>> size_t len, int flags); >>>>>>>>>> >>>>>>>>>> +ssize_t >>>>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>>>>>>> + struct msghdr *msg, >>>>>>>>>> + int flags, >>>>>>>>>> + bool *msg_ready); >>>>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>>>>>>> >>>>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>>>>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>>>>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>>>>>>> return err; >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>>>>>>> + struct msghdr *msg, >>>>>>>>>> + int flags, >>>>>>>>>> + bool *msg_ready) >>>>>>>>>> +{ >>>>>>>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>>>>>>> + struct virtio_vsock_pkt *pkt; >>>>>>>>>> + int dequeued_len = 0; >>>>>>>>>> + size_t user_buf_len = msg_data_left(msg); >>>>>>>>>> + >>>>>>>>>> + *msg_ready = false; >>>>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>>>> + >>>>>>>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>>>>>>> I' >>>>>>>>> >>>>>>>>>> + size_t bytes_to_copy; >>>>>>>>>> + size_t pkt_len; >>>>>>>>>> + >>>>>>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>>>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>>>>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>>>>>>> + >>>>>>>>>> + if (bytes_to_copy) { >>>>>>>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>>>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>>>>>>> + */ >>>>>>>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>>>>>>> + >>>>>>>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>>>>>>> + dequeued_len = -EINVAL; >>>>>>>>> I think here is better to return the error returned by memcpy_to_msg(), >>>>>>>>> as we do in the other place where we use memcpy_to_msg(). >>>>>>>>> >>>>>>>>> I mean something like this: >>>>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>>>> if (err) >>>>>>>>> dequeued_len = err; >>>>>>>> Ack >>>>>>>>>> + else >>>>>>>>>> + user_buf_len -= bytes_to_copy; >>>>>>>>>> + >>>>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>>>> + } >>>>>>>>>> + >>>>>>>>> Maybe here we can simply break the cycle if we have an error: >>>>>>>>> if (dequeued_len < 0) >>>>>>>>> break; >>>>>>>>> >>>>>>>>> Or we can refactor a bit, simplifying the while() condition and also the >>>>>>>>> code in this way (not tested): >>>>>>>>> >>>>>>>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>>>>>>> ... >>>>>>>>> >>>>>>>>> if (bytes_to_copy) { >>>>>>>>> int err; >>>>>>>>> >>>>>>>>> /* ... >>>>>>>>> */ >>>>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>>>> if (err) { >>>>>>>>> dequeued_len = err; >>>>>>>>> goto out; >>>>>>>>> } >>>>>>>>> spin_lock_bh(&vvs->rx_lock); >>>>>>>>> >>>>>>>>> user_buf_len -= bytes_to_copy; >>>>>>>>> } >>>>>>>>> >>>>>>>>> dequeued_len += pkt_len; >>>>>>>>> >>>>>>>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>>>>>>> *msg_ready = true; >>>>>>>>> >>>>>>>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>>>>>>> list_del(&pkt->list); >>>>>>>>> virtio_transport_free_pkt(pkt); >>>>>>>>> } >>>>>>>>> >>>>>>>>> out: >>>>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>>>> >>>>>>>>> virtio_transport_send_credit_update(vsk); >>>>>>>>> >>>>>>>>> return dequeued_len; >>>>>>>>> } >>>>>>>> I think we can't do 'goto out' or break, because in case of error, >>>>>>>> we still need >>>>>>>> to free packet. >>>>>>> Didn't we have code that remove packets from a previous message? >>>>>>> I don't see it anymore. >>>>>>> >>>>>>> For example if we have 10 packets queued for a message (the 10th >>>>>>> packet >>>>>>> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >>>>>>> you proposal we are freeing only the first 2 packets, the rest is there >>>>>>> and should be freed when reading the next message, but I don't see that >>>>>>> code. >>>>>>> >>>>>>> The same can happen if the recvmsg syscall is interrupted. In that case >>>>>>> we report that nothing was copied, but we freed the first N packets, so >>>>>>> they are lost but the other packets are still in the queue. >>>>>>> >>>>>>> Please check also the patch where we implemented >>>>>>> __vsock_seqpacket_recvmsg(). >>>>>>> >>>>>>> I thinks we should free packets only when we are sure we copied them to >>>>>>> the user space. >>>>>> Hm, yes, this is problem. To solve it i can restore previous approach >>>>>> with seqbegin/seqend. In that case i can detect unfinished record and >>>>>> drop it's packets. Seems seqbegin will be a bit like >>>>>> VIRTIO_VSOCK_SEQ_EOR in flags >>>>>> field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >>>>>> unneeded, >>>>>> as channel considedered lossless. What do You think? >>>>>> >>>>> I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be >>>>> fine. >>>>> >>>>> When we receive EOR we know that this is the last packet on this message >>>>> and the next packet will be the first of a new message. >>>>> >>>>> What we should do is check that we have all the fragments of a packet >>>>> and return them all together, otherwise we have to say we have nothing. >>>>> >>>>> For example as we process packets from the vitqueue and queue them in >>>>> the rx_queue we could use a counter of how many EORs are in the >>>>> rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() >>>>> when we copied all the fragments. >>>>> >>>>> If the counter is 0, we don't remove anything from the queue and >>>>> virtio_transport_seqpacket_do_dequeue() returns 0. >>>>> >>>>> So .seqpacket_dequeue should return 0 if there is not at least one >>>>> complete message, or return the entire message. A partial message should >>>>> never return. >>>>> >>>>> What do you think? >>>> I like it, i've implemented this approach in some early pre v1 versions. >>>> >>>> But in this case, credit update logic will be changed - in current implementation >>>> >>>> (both seqpacket and stream) credit update reply is sent when data is copied >>>> >>>> to user's buffer(e.g. we copy data somewhere, free packet and ready to process >>>> >>>> new packet). But if we don't touch user's buffer and keeping incoming packet in rx queue >>>> >>>> until whole record is ready, when to send credit update? >>> I think the best approach could be to send credit updates when we remove >>> them from the rx_queue. >> In that case, it will be impossible to send message bigger than size of rx buffer >> >> (e.g. credit allowed size), because packet will be queued without credit update >> >> reply until credit allowed reach 0. >> > Yep, but I think it is a reasonable limit for a datagram socket. > > Maybe we can add a check on the TX side, since we know this value and > return an error to the user. E.g., to before sending message using SEQPACKET socket, i need to call setsockopt with SO_VM_SOCKETS_BUFFER_MAX_SIZE/ SO_VM_SOCKETS_BUFFER_SIZE params to setup maximum message size, if user tries to send message bigger than it, return -EMSGSIZE ? Thank You > > Thanks, > Stefano > >
On Tue, Jun 08, 2021 at 01:24:58PM +0300, Arseny Krasnov wrote: > >On 08.06.2021 13:19, Stefano Garzarella wrote: >> On Tue, Jun 08, 2021 at 12:40:39PM +0300, Arseny Krasnov wrote: >>> On 08.06.2021 11:23, Stefano Garzarella wrote: >>>> On Mon, Jun 07, 2021 at 04:18:38PM +0300, Arseny Krasnov wrote: >>>>> On 07.06.2021 14:04, Stefano Garzarella wrote: >>>>>> On Fri, Jun 04, 2021 at 09:03:26PM +0300, Arseny Krasnov wrote: >>>>>>> On 04.06.2021 18:03, Stefano Garzarella wrote: >>>>>>>> On Fri, Jun 04, 2021 at 04:12:23PM +0300, Arseny Krasnov wrote: >>>>>>>>> On 03.06.2021 17:45, Stefano Garzarella wrote: >>>>>>>>>> On Thu, May 20, 2021 at 10:17:58PM +0300, Arseny Krasnov wrote: >>>>>>>>>>> Callback fetches RW packets from rx queue of socket until whole record >>>>>>>>>>> is copied(if user's buffer is full, user is not woken up). This is done >>>>>>>>>>> to not stall sender, because if we wake up user and it leaves syscall, >>>>>>>>>>> nobody will send credit update for rest of record, and sender will wait >>>>>>>>>>> for next enter of read syscall at receiver's side. So if user buffer is >>>>>>>>>>> full, we just send credit update and drop data. >>>>>>>>>>> >>>>>>>>>>> Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> >>>>>>>>>>> --- >>>>>>>>>>> v9 -> v10: >>>>>>>>>>> 1) Number of dequeued bytes incremented even in case when >>>>>>>>>>> user's buffer is full. >>>>>>>>>>> 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. >>>>>>>>>>> 3) Rename variable 'err' to 'dequeued_len', in case of error >>>>>>>>>>> it has negative value. >>>>>>>>>>> >>>>>>>>>>> include/linux/virtio_vsock.h | 5 ++ >>>>>>>>>>> net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ >>>>>>>>>>> 2 files changed, 70 insertions(+) >>>>>>>>>>> >>>>>>>>>>> diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h >>>>>>>>>>> index dc636b727179..02acf6e9ae04 100644 >>>>>>>>>>> --- a/include/linux/virtio_vsock.h >>>>>>>>>>> +++ b/include/linux/virtio_vsock.h >>>>>>>>>>> @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, >>>>>>>>>>> struct msghdr *msg, >>>>>>>>>>> size_t len, int flags); >>>>>>>>>>> >>>>>>>>>>> +ssize_t >>>>>>>>>>> +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, >>>>>>>>>>> + struct msghdr *msg, >>>>>>>>>>> + int flags, >>>>>>>>>>> + bool *msg_ready); >>>>>>>>>>> s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); >>>>>>>>>>> s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); >>>>>>>>>>> >>>>>>>>>>> diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c >>>>>>>>>>> index ad0d34d41444..61349b2ea7fe 100644 >>>>>>>>>>> --- a/net/vmw_vsock/virtio_transport_common.c >>>>>>>>>>> +++ b/net/vmw_vsock/virtio_transport_common.c >>>>>>>>>>> @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, >>>>>>>>>>> return err; >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, >>>>>>>>>>> + struct msghdr *msg, >>>>>>>>>>> + int flags, >>>>>>>>>>> + bool *msg_ready) >>>>>>>>>>> +{ >>>>>>>>>>> + struct virtio_vsock_sock *vvs = vsk->trans; >>>>>>>>>>> + struct virtio_vsock_pkt *pkt; >>>>>>>>>>> + int dequeued_len = 0; >>>>>>>>>>> + size_t user_buf_len = msg_data_left(msg); >>>>>>>>>>> + >>>>>>>>>>> + *msg_ready = false; >>>>>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>>>>> + >>>>>>>>>>> + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { >>>>>>>>>> I' >>>>>>>>>> >>>>>>>>>>> + size_t bytes_to_copy; >>>>>>>>>>> + size_t pkt_len; >>>>>>>>>>> + >>>>>>>>>>> + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); >>>>>>>>>>> + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); >>>>>>>>>>> + bytes_to_copy = min(user_buf_len, pkt_len); >>>>>>>>>>> + >>>>>>>>>>> + if (bytes_to_copy) { >>>>>>>>>>> + /* sk_lock is held by caller so no one else can dequeue. >>>>>>>>>>> + * Unlock rx_lock since memcpy_to_msg() may sleep. >>>>>>>>>>> + */ >>>>>>>>>>> + spin_unlock_bh(&vvs->rx_lock); >>>>>>>>>>> + >>>>>>>>>>> + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) >>>>>>>>>>> + dequeued_len = -EINVAL; >>>>>>>>>> I think here is better to return the error returned by memcpy_to_msg(), >>>>>>>>>> as we do in the other place where we use memcpy_to_msg(). >>>>>>>>>> >>>>>>>>>> I mean something like this: >>>>>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>>>>> if (err) >>>>>>>>>> dequeued_len = err; >>>>>>>>> Ack >>>>>>>>>>> + else >>>>>>>>>>> + user_buf_len -= bytes_to_copy; >>>>>>>>>>> + >>>>>>>>>>> + spin_lock_bh(&vvs->rx_lock); >>>>>>>>>>> + } >>>>>>>>>>> + >>>>>>>>>> Maybe here we can simply break the cycle if we have an error: >>>>>>>>>> if (dequeued_len < 0) >>>>>>>>>> break; >>>>>>>>>> >>>>>>>>>> Or we can refactor a bit, simplifying the while() condition and also the >>>>>>>>>> code in this way (not tested): >>>>>>>>>> >>>>>>>>>> while (!*msg_ready && !list_empty(&vvs->rx_queue)) { >>>>>>>>>> ... >>>>>>>>>> >>>>>>>>>> if (bytes_to_copy) { >>>>>>>>>> int err; >>>>>>>>>> >>>>>>>>>> /* ... >>>>>>>>>> */ >>>>>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>>>>> err = memcpy_to_msgmsg, pkt->buf, bytes_to_copy); >>>>>>>>>> if (err) { >>>>>>>>>> dequeued_len = err; >>>>>>>>>> goto out; >>>>>>>>>> } >>>>>>>>>> spin_lock_bh(&vvs->rx_lock); >>>>>>>>>> >>>>>>>>>> user_buf_len -= bytes_to_copy; >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> dequeued_len += pkt_len; >>>>>>>>>> >>>>>>>>>> if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) >>>>>>>>>> *msg_ready = true; >>>>>>>>>> >>>>>>>>>> virtio_transport_dec_rx_pkt(vvs, pkt); >>>>>>>>>> list_del(&pkt->list); >>>>>>>>>> virtio_transport_free_pkt(pkt); >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> out: >>>>>>>>>> spin_unlock_bh(&vvs->rx_lock); >>>>>>>>>> >>>>>>>>>> virtio_transport_send_credit_update(vsk); >>>>>>>>>> >>>>>>>>>> return dequeued_len; >>>>>>>>>> } >>>>>>>>> I think we can't do 'goto out' or break, because in case of error, >>>>>>>>> we still need >>>>>>>>> to free packet. >>>>>>>> Didn't we have code that remove packets from a previous message? >>>>>>>> I don't see it anymore. >>>>>>>> >>>>>>>> For example if we have 10 packets queued for a message (the 10th >>>>>>>> packet >>>>>>>> has the EOR flag) and the memcpy_to_msg() fails on the 2nd packet, with >>>>>>>> you proposal we are freeing only the first 2 packets, the rest is there >>>>>>>> and should be freed when reading the next message, but I don't see that >>>>>>>> code. >>>>>>>> >>>>>>>> The same can happen if the recvmsg syscall is interrupted. In that case >>>>>>>> we report that nothing was copied, but we freed the first N packets, so >>>>>>>> they are lost but the other packets are still in the queue. >>>>>>>> >>>>>>>> Please check also the patch where we implemented >>>>>>>> __vsock_seqpacket_recvmsg(). >>>>>>>> >>>>>>>> I thinks we should free packets only when we are sure we copied them to >>>>>>>> the user space. >>>>>>> Hm, yes, this is problem. To solve it i can restore previous approach >>>>>>> with seqbegin/seqend. In that case i can detect unfinished record and >>>>>>> drop it's packets. Seems seqbegin will be a bit like >>>>>>> VIRTIO_VSOCK_SEQ_EOR in flags >>>>>>> field of header(e.g. VIRTIO_VSOCK_SEQ_BEGIN). Message id and length are >>>>>>> unneeded, >>>>>>> as channel considedered lossless. What do You think? >>>>>>> >>>>>> I think VIRTIO_VSOCK_SEQ_BEGIN is redundant, using only EOR should be >>>>>> fine. >>>>>> >>>>>> When we receive EOR we know that this is the last packet on this message >>>>>> and the next packet will be the first of a new message. >>>>>> >>>>>> What we should do is check that we have all the fragments of a packet >>>>>> and return them all together, otherwise we have to say we have nothing. >>>>>> >>>>>> For example as we process packets from the vitqueue and queue them in >>>>>> the rx_queue we could use a counter of how many EORs are in the >>>>>> rx_queue, which we decrease in virtio_transport_seqpacket_do_dequeue() >>>>>> when we copied all the fragments. >>>>>> >>>>>> If the counter is 0, we don't remove anything from the queue and >>>>>> virtio_transport_seqpacket_do_dequeue() returns 0. >>>>>> >>>>>> So .seqpacket_dequeue should return 0 if there is not at least one >>>>>> complete message, or return the entire message. A partial message should >>>>>> never return. >>>>>> >>>>>> What do you think? >>>>> I like it, i've implemented this approach in some early pre v1 versions. >>>>> >>>>> But in this case, credit update logic will be changed - in current implementation >>>>> >>>>> (both seqpacket and stream) credit update reply is sent when data is copied >>>>> >>>>> to user's buffer(e.g. we copy data somewhere, free packet and ready to process >>>>> >>>>> new packet). But if we don't touch user's buffer and keeping incoming packet in rx queue >>>>> >>>>> until whole record is ready, when to send credit update? >>>> I think the best approach could be to send credit updates when we remove >>>> them from the rx_queue. >>> In that case, it will be impossible to send message bigger than size of rx buffer >>> >>> (e.g. credit allowed size), because packet will be queued without credit update >>> >>> reply until credit allowed reach 0. >>> >> Yep, but I think it is a reasonable limit for a datagram socket. >> >> Maybe we can add a check on the TX side, since we know this value and >> return an error to the user. > >E.g., to before sending message using SEQPACKET socket, > >i need to call setsockopt with SO_VM_SOCKETS_BUFFER_MAX_SIZE/ > >SO_VM_SOCKETS_BUFFER_SIZE params to setup maximum message size, > >if user tries to send message bigger than it, return -EMSGSIZE ? > Yep, I mean the receiver side must set it (IIRC default is 256K). In the transmitter side we can check it using `vvs->peer_buf_alloc` and return the error. Stefano
diff --git a/include/linux/virtio_vsock.h b/include/linux/virtio_vsock.h index dc636b727179..02acf6e9ae04 100644 --- a/include/linux/virtio_vsock.h +++ b/include/linux/virtio_vsock.h @@ -80,6 +80,11 @@ virtio_transport_dgram_dequeue(struct vsock_sock *vsk, struct msghdr *msg, size_t len, int flags); +ssize_t +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, + struct msghdr *msg, + int flags, + bool *msg_ready); s64 virtio_transport_stream_has_data(struct vsock_sock *vsk); s64 virtio_transport_stream_has_space(struct vsock_sock *vsk); diff --git a/net/vmw_vsock/virtio_transport_common.c b/net/vmw_vsock/virtio_transport_common.c index ad0d34d41444..61349b2ea7fe 100644 --- a/net/vmw_vsock/virtio_transport_common.c +++ b/net/vmw_vsock/virtio_transport_common.c @@ -393,6 +393,59 @@ virtio_transport_stream_do_dequeue(struct vsock_sock *vsk, return err; } +static int virtio_transport_seqpacket_do_dequeue(struct vsock_sock *vsk, + struct msghdr *msg, + int flags, + bool *msg_ready) +{ + struct virtio_vsock_sock *vvs = vsk->trans; + struct virtio_vsock_pkt *pkt; + int dequeued_len = 0; + size_t user_buf_len = msg_data_left(msg); + + *msg_ready = false; + spin_lock_bh(&vvs->rx_lock); + + while (!*msg_ready && !list_empty(&vvs->rx_queue) && dequeued_len >= 0) { + size_t bytes_to_copy; + size_t pkt_len; + + pkt = list_first_entry(&vvs->rx_queue, struct virtio_vsock_pkt, list); + pkt_len = (size_t)le32_to_cpu(pkt->hdr.len); + bytes_to_copy = min(user_buf_len, pkt_len); + + if (bytes_to_copy) { + /* sk_lock is held by caller so no one else can dequeue. + * Unlock rx_lock since memcpy_to_msg() may sleep. + */ + spin_unlock_bh(&vvs->rx_lock); + + if (memcpy_to_msg(msg, pkt->buf, bytes_to_copy)) + dequeued_len = -EINVAL; + else + user_buf_len -= bytes_to_copy; + + spin_lock_bh(&vvs->rx_lock); + } + + if (dequeued_len >= 0) + dequeued_len += pkt_len; + + if (le32_to_cpu(pkt->hdr.flags) & VIRTIO_VSOCK_SEQ_EOR) + *msg_ready = true; + + virtio_transport_dec_rx_pkt(vvs, pkt); + list_del(&pkt->list); + virtio_transport_free_pkt(pkt); + } + + spin_unlock_bh(&vvs->rx_lock); + + virtio_transport_send_credit_update(vsk); + + return dequeued_len; +} + ssize_t virtio_transport_stream_dequeue(struct vsock_sock *vsk, struct msghdr *msg, @@ -405,6 +458,18 @@ virtio_transport_stream_dequeue(struct vsock_sock *vsk, } EXPORT_SYMBOL_GPL(virtio_transport_stream_dequeue); +ssize_t +virtio_transport_seqpacket_dequeue(struct vsock_sock *vsk, + struct msghdr *msg, + int flags, bool *msg_ready) +{ + if (flags & MSG_PEEK) + return -EOPNOTSUPP; + + return virtio_transport_seqpacket_do_dequeue(vsk, msg, flags, msg_ready); +} +EXPORT_SYMBOL_GPL(virtio_transport_seqpacket_dequeue); + int virtio_transport_dgram_dequeue(struct vsock_sock *vsk, struct msghdr *msg,
Callback fetches RW packets from rx queue of socket until whole record is copied(if user's buffer is full, user is not woken up). This is done to not stall sender, because if we wake up user and it leaves syscall, nobody will send credit update for rest of record, and sender will wait for next enter of read syscall at receiver's side. So if user buffer is full, we just send credit update and drop data. Signed-off-by: Arseny Krasnov <arseny.krasnov@kaspersky.com> --- v9 -> v10: 1) Number of dequeued bytes incremented even in case when user's buffer is full. 2) Use 'msg_data_left()' instead of direct access to 'msg_hdr'. 3) Rename variable 'err' to 'dequeued_len', in case of error it has negative value. include/linux/virtio_vsock.h | 5 ++ net/vmw_vsock/virtio_transport_common.c | 65 +++++++++++++++++++++++++ 2 files changed, 70 insertions(+)