
[v3] vsock/virtio: Remove queued_replies pushback logic

Message ID 20250402150646.42855-1-graf@amazon.com (mailing list archive)
State New
Series [v3] vsock/virtio: Remove queued_replies pushback logic

Commit Message

Alexander Graf April 2, 2025, 3:06 p.m. UTC
Ever since its introduction, the virtio vsock driver has included
pushback logic that blocks it from taking any new RX packets until the
TX queue backlog becomes shallower than the virtqueue size.

This logic works fine when you connect a user space application on the
hypervisor with a virtio-vsock target, because the guest will stop
receiving data until the host has pulled all outstanding data from the VM.
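
For context, the pushback check being removed boils down to the
following (a simplified sketch of the helper deleted further down; the
real code also pairs a memory barrier with the atomic updates):

    /* RX processing stops once the number of queued reply packets
     * reaches the RX virtqueue size; only the TX worker restarts it
     * after draining enough of them.
     */
    static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
    {
            return atomic_read(&vsock->queued_replies) <
                   virtqueue_get_vring_size(vsock->vqs[VSOCK_VQ_RX]);
    }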

With Nitro Enclaves however, we connect 2 VMs directly via vsock:

  Parent      Enclave

    RX -------- TX
    TX -------- RX

This means we now have 2 virtio-vsock backends that both have the pushback
logic. If the parent's TX queue runs full at the same time as the
Enclave's, both virtio-vsock drivers fall into the pushback path and
no longer accept RX traffic. However, that RX traffic is TX traffic on
the other side, which blocks that driver from making any forward
progress. We're now in a deadlock.

To resolve this, let's remove that pushback logic altogether and rely on
higher levels (like credits) to ensure we do not consume unbounded
memory.
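
For reference, the per-socket credit mechanism already bounds how much
data a sender may have in flight: the peer advertises its buffer space
and we stop sending once our unacknowledged bytes reach that limit.
Roughly (a simplified sketch; the helper name here is illustrative, the
real accounting lives in virtio_transport_common.c):

    /* Bytes the peer still allows us to send: its advertised buffer
     * space minus what we sent that it has not yet freed up.
     */
    static u32 vsock_tx_credit(const struct virtio_vsock_sock *vvs)
    {
            return vvs->peer_buf_alloc - (vvs->tx_cnt - vvs->peer_fwd_cnt);
    }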

RX and TX queues share the same work queue. To prevent starvation of TX
by an RX flood (and vice versa) now that the pushback logic is gone,
let's deliberately reschedule the RX and TX work after processing a
fixed threshold of packets (256).
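
Concretely, both workers now yield with the same pattern (taken from the
hunks below; the RX worker uses "goto out" instead of "break"):

    if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
            /* Requeue ourselves so the other direction's work on the
             * shared workqueue gets a chance to run.
             */
            queue_work(virtio_vsock_workqueue, work);
            break;
    }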

Fixes: 0ea9e1d3a9e3 ("VSOCK: Introduce virtio_transport.ko")
Signed-off-by: Alexander Graf <graf@amazon.com>

---

v1 -> v2:

  - Rework to use fixed threshold

v2 -> v3:

  - Remove superfluous reply variable
---
 net/vmw_vsock/virtio_transport.c | 73 +++++++++-----------------------
 1 file changed, 19 insertions(+), 54 deletions(-)

Comments

Michael S. Tsirkin April 2, 2025, 3:10 p.m. UTC | #1
On Wed, Apr 02, 2025 at 03:06:46PM +0000, Alexander Graf wrote:
> Ever since its introduction, the virtio vsock driver has included
> pushback logic that blocks it from taking any new RX packets until the
> TX queue backlog becomes shallower than the virtqueue size.
> 
> This logic works fine when you connect a user space application on the
> hypervisor with a virtio-vsock target, because the guest will stop
> receiving data until the host has pulled all outstanding data from the VM.
> 
> With Nitro Enclaves however, we connect 2 VMs directly via vsock:
> 
>   Parent      Enclave
> 
>     RX -------- TX
>     TX -------- RX
> 
> This means we now have 2 virtio-vsock backends that both have the pushback
> logic. If the parent's TX queue runs full at the same time as the
> Enclave's, both virtio-vsock drivers fall into the pushback path and
> no longer accept RX traffic. However, that RX traffic is TX traffic on
> the other side, which blocks that driver from making any forward
> progress. We're now in a deadlock.
> 
> To resolve this, let's remove that pushback logic altogether and rely on
> higher levels (like credits) to ensure we do not consume unbounded
> memory.
> 
> RX and TX queues share the same work queue. To prevent starvation of TX
> by an RX flood (and vice versa) now that the pushback logic is gone,
> let's deliberately reschedule the RX and TX work after processing a
> fixed threshold of packets (256).
> 
> Fixes: 0ea9e1d3a9e3 ("VSOCK: Introduce virtio_transport.ko")
> Signed-off-by: Alexander Graf <graf@amazon.com>


Acked-by: Michael S. Tsirkin <mst@redhat.com>


Patch

diff --git a/net/vmw_vsock/virtio_transport.c b/net/vmw_vsock/virtio_transport.c
index f0e48e6911fc..6ae30bf8c85c 100644
--- a/net/vmw_vsock/virtio_transport.c
+++ b/net/vmw_vsock/virtio_transport.c
@@ -26,6 +26,12 @@  static struct virtio_vsock __rcu *the_virtio_vsock;
 static DEFINE_MUTEX(the_virtio_vsock_mutex); /* protects the_virtio_vsock */
 static struct virtio_transport virtio_transport; /* forward declaration */
 
+/*
+ * Max number of RX packets transferred before requeueing so we do
+ * not starve TX traffic because they share the same work queue.
+ */
+#define VSOCK_MAX_PKTS_PER_WORK 256
+
 struct virtio_vsock {
 	struct virtio_device *vdev;
 	struct virtqueue *vqs[VSOCK_VQ_MAX];
@@ -44,8 +50,6 @@  struct virtio_vsock {
 	struct work_struct send_pkt_work;
 	struct sk_buff_head send_pkt_queue;
 
-	atomic_t queued_replies;
-
 	/* The following fields are protected by rx_lock.  vqs[VSOCK_VQ_RX]
 	 * must be accessed with rx_lock held.
 	 */
@@ -158,7 +162,7 @@  virtio_transport_send_pkt_work(struct work_struct *work)
 		container_of(work, struct virtio_vsock, send_pkt_work);
 	struct virtqueue *vq;
 	bool added = false;
-	bool restart_rx = false;
+	int pkts = 0;
 
 	mutex_lock(&vsock->tx_lock);
 
@@ -169,32 +173,24 @@  virtio_transport_send_pkt_work(struct work_struct *work)
 
 	for (;;) {
 		struct sk_buff *skb;
-		bool reply;
 		int ret;
 
+		if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
+			/* Allow other works on the same queue to run */
+			queue_work(virtio_vsock_workqueue, work);
+			break;
+		}
+
 		skb = virtio_vsock_skb_dequeue(&vsock->send_pkt_queue);
 		if (!skb)
 			break;
 
-		reply = virtio_vsock_skb_reply(skb);
-
 		ret = virtio_transport_send_skb(skb, vq, vsock, GFP_KERNEL);
 		if (ret < 0) {
 			virtio_vsock_skb_queue_head(&vsock->send_pkt_queue, skb);
 			break;
 		}
 
-		if (reply) {
-			struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
-			int val;
-
-			val = atomic_dec_return(&vsock->queued_replies);
-
-			/* Do we now have resources to resume rx processing? */
-			if (val + 1 == virtqueue_get_vring_size(rx_vq))
-				restart_rx = true;
-		}
-
 		added = true;
 	}
 
@@ -203,9 +199,6 @@  virtio_transport_send_pkt_work(struct work_struct *work)
 
 out:
 	mutex_unlock(&vsock->tx_lock);
-
-	if (restart_rx)
-		queue_work(virtio_vsock_workqueue, &vsock->rx_work);
 }
 
 /* Caller need to hold RCU for vsock.
@@ -261,9 +254,6 @@  virtio_transport_send_pkt(struct sk_buff *skb)
 	 */
 	if (!skb_queue_empty_lockless(&vsock->send_pkt_queue) ||
 	    virtio_transport_send_skb_fast_path(vsock, skb)) {
-		if (virtio_vsock_skb_reply(skb))
-			atomic_inc(&vsock->queued_replies);
-
 		virtio_vsock_skb_queue_tail(&vsock->send_pkt_queue, skb);
 		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
 	}
@@ -277,7 +267,7 @@  static int
 virtio_transport_cancel_pkt(struct vsock_sock *vsk)
 {
 	struct virtio_vsock *vsock;
-	int cnt = 0, ret;
+	int ret;
 
 	rcu_read_lock();
 	vsock = rcu_dereference(the_virtio_vsock);
@@ -286,17 +276,7 @@  virtio_transport_cancel_pkt(struct vsock_sock *vsk)
 		goto out_rcu;
 	}
 
-	cnt = virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
-
-	if (cnt) {
-		struct virtqueue *rx_vq = vsock->vqs[VSOCK_VQ_RX];
-		int new_cnt;
-
-		new_cnt = atomic_sub_return(cnt, &vsock->queued_replies);
-		if (new_cnt + cnt >= virtqueue_get_vring_size(rx_vq) &&
-		    new_cnt < virtqueue_get_vring_size(rx_vq))
-			queue_work(virtio_vsock_workqueue, &vsock->rx_work);
-	}
+	virtio_transport_purge_skbs(vsk, &vsock->send_pkt_queue);
 
 	ret = 0;
 
@@ -367,18 +347,6 @@  static void virtio_transport_tx_work(struct work_struct *work)
 		queue_work(virtio_vsock_workqueue, &vsock->send_pkt_work);
 }
 
-/* Is there space left for replies to rx packets? */
-static bool virtio_transport_more_replies(struct virtio_vsock *vsock)
-{
-	struct virtqueue *vq = vsock->vqs[VSOCK_VQ_RX];
-	int val;
-
-	smp_rmb(); /* paired with atomic_inc() and atomic_dec_return() */
-	val = atomic_read(&vsock->queued_replies);
-
-	return val < virtqueue_get_vring_size(vq);
-}
-
 /* event_lock must be held */
 static int virtio_vsock_event_fill_one(struct virtio_vsock *vsock,
 				       struct virtio_vsock_event *event)
@@ -613,6 +581,7 @@  static void virtio_transport_rx_work(struct work_struct *work)
 	struct virtio_vsock *vsock =
 		container_of(work, struct virtio_vsock, rx_work);
 	struct virtqueue *vq;
+	int pkts = 0;
 
 	vq = vsock->vqs[VSOCK_VQ_RX];
 
@@ -627,11 +596,9 @@  static void virtio_transport_rx_work(struct work_struct *work)
 			struct sk_buff *skb;
 			unsigned int len;
 
-			if (!virtio_transport_more_replies(vsock)) {
-				/* Stop rx until the device processes already
-				 * pending replies.  Leave rx virtqueue
-				 * callbacks disabled.
-				 */
+			if (++pkts > VSOCK_MAX_PKTS_PER_WORK) {
+				/* Allow other works on the same queue to run */
+				queue_work(virtio_vsock_workqueue, work);
 				goto out;
 			}
 
@@ -675,8 +642,6 @@  static int virtio_vsock_vqs_init(struct virtio_vsock *vsock)
 	vsock->rx_buf_max_nr = 0;
 	mutex_unlock(&vsock->rx_lock);
 
-	atomic_set(&vsock->queued_replies, 0);
-
 	ret = virtio_find_vqs(vdev, VSOCK_VQ_MAX, vsock->vqs, vqs_info, NULL);
 	if (ret < 0)
 		return ret;