
[net-next,7/8] vhost_net: try batch dequeuing from skb array

Message ID 1490069087-4783-8-git-send-email-jasowang@redhat.com (mailing list archive)
State New, archived

Commit Message

Jason Wang March 21, 2017, 4:04 a.m. UTC
We used to dequeue one skb at a time during recvmsg() from the skb_array,
which could be inefficient because of the poor cache utilization and the
spinlock taken for each packet. This patch batches the dequeues by calling
the batch dequeuing helpers explicitly on the exported skb array and
passing the skb back through msg_control for the underlying socket to
finish the userspace copying.

Tests were done with XDP1:
- small buffer:
  Before: 1.88Mpps
  After : 2.25Mpps (+19.6%)
- mergeable buffer:
  Before: 1.83Mpps
  After : 2.10Mpps (+14.7%)

Signed-off-by: Jason Wang <jasowang@redhat.com>
---
 drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 60 insertions(+), 4 deletions(-)
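
The core of the change is a small per-queue cache (rxq[]/rt/rh) that is refilled
from the skb array in batches, so the ring lock is touched once per batch instead
of once per packet. The standalone sketch below shows the same head/tail caching
logic in plain userspace C; the ring is a toy stand-in and none of the names are
the kernel API, so treat it as an illustration of the pattern only, not the patch
itself.

#include <stdio.h>

#define VHOST_RX_BATCH 4

/* Toy stand-in for the tun/tap skb_array: a pre-filled pointer queue.
 * The real thing is a locked ptr_ring; only the caching logic below
 * mirrors the patch. */
static char *pkts[] = { "p0", "p1", "p2", "p3", "p4", "p5" };
static int ring_next;

/* Dequeue up to n pointers in one go, like a batched consume helper. */
static int ring_consume_batched(void **buf, int n)
{
        int i = 0;

        while (i < n && ring_next < (int)(sizeof(pkts) / sizeof(pkts[0])))
                buf[i++] = pkts[ring_next++];
        return i;
}

/* Consumer-side cache, mirroring rxq/rt/rh in vhost_net_virtqueue. */
static void *rxq[VHOST_RX_BATCH];
static int rt;  /* number of valid cached entries */
static int rh;  /* next cached entry to hand out */

/* Refill the cache only when it is empty, then peek at the head entry;
 * this is the shape of peek_head_len_batched() in the patch. */
static void *peek_head(void)
{
        if (rh == rt) {
                rh = rt = 0;
                rt = ring_consume_batched(rxq, VHOST_RX_BATCH);
                if (!rt)
                        return NULL;
        }
        return rxq[rh];
}

int main(void)
{
        void *p;

        /* Each VHOST_RX_BATCH entries cost only one bulk ring access. */
        while ((p = peek_head()) != NULL) {
                printf("got %s\n", (char *)p);
                rh++;   /* consume the entry we just peeked at */
        }
        return 0;
}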

Comments

Michael S. Tsirkin March 22, 2017, 2:16 p.m. UTC | #1
On Tue, Mar 21, 2017 at 12:04:46PM +0800, Jason Wang wrote:
> We used to dequeue one skb during recvmsg() from skb_array, this could
> be inefficient because of the bad cache utilization and spinlock
> touching for each packet. This patch tries to batch them by calling
> batch dequeuing helpers explicitly on the exported skb array and pass
> the skb back through msg_control for underlayer socket to finish the
> userspace copying.
> 
> Tests were done by XDP1:
> - small buffer:
>   Before: 1.88Mpps
>   After : 2.25Mpps (+19.6%)
> - mergeable buffer:
>   Before: 1.83Mpps
>   After : 2.10Mpps (+14.7%)
> 
> Signed-off-by: Jason Wang <jasowang@redhat.com>
> ---
>  drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
>  1 file changed, 60 insertions(+), 4 deletions(-)
> 
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index 9b51989..53f09f2 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -28,6 +28,8 @@
>  #include <linux/if_macvlan.h>
>  #include <linux/if_tap.h>
>  #include <linux/if_vlan.h>
> +#include <linux/skb_array.h>
> +#include <linux/skbuff.h>
>  
>  #include <net/sock.h>
>  
> @@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref {
>  	struct vhost_virtqueue *vq;
>  };
>  
> +#define VHOST_RX_BATCH 64
>  struct vhost_net_virtqueue {
>  	struct vhost_virtqueue vq;
>  	size_t vhost_hlen;
> @@ -99,6 +102,10 @@ struct vhost_net_virtqueue {
>  	/* Reference counting for outstanding ubufs.
>  	 * Protected by vq mutex. Writers must also take device mutex. */
>  	struct vhost_net_ubuf_ref *ubufs;
> +	struct skb_array *rx_array;
> +	void *rxq[VHOST_RX_BATCH];
> +	int rt;
> +	int rh;
>  };
>  
>  struct vhost_net {
> @@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n)
>  		n->vqs[i].ubufs = NULL;
>  		n->vqs[i].vhost_hlen = 0;
>  		n->vqs[i].sock_hlen = 0;
> +		n->vqs[i].rt = 0;
> +		n->vqs[i].rh = 0;
>  	}
>  
>  }
> @@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net)
>  	mutex_unlock(&vq->mutex);
>  }
>  
> -static int peek_head_len(struct sock *sk)
> +static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)

Pls rename to say what it actually does: fetch skbs

> +{
> +	if (rvq->rh != rvq->rt)
> +		goto out;
> +
> +	rvq->rh = rvq->rt = 0;
> +	rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
> +						VHOST_RX_BATCH);

A comment explaining why it is -bh would be helpful.

> +	if (!rvq->rt)
> +		return 0;
> +out:
> +	return __skb_array_len_with_tag(rvq->rxq[rvq->rh]);
> +}
> +
> +static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
>  {
>  	struct socket *sock = sk->sk_socket;
>  	struct sk_buff *head;
>  	int len = 0;
>  	unsigned long flags;
>  
> +	if (rvq->rx_array)
> +		return peek_head_len_batched(rvq);
> +
>  	if (sock->ops->peek_len)
>  		return sock->ops->peek_len(sock);
>  
> @@ -535,12 +561,14 @@ static int sk_has_rx_data(struct sock *sk)
>  	return skb_queue_empty(&sk->sk_receive_queue);
>  }
>  
> -static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
> +static int vhost_net_rx_peek_head_len(struct vhost_net *net,
> +				      struct sock *sk)
>  {
> +	struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX];
>  	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
>  	struct vhost_virtqueue *vq = &nvq->vq;
>  	unsigned long uninitialized_var(endtime);
> -	int len = peek_head_len(sk);
> +	int len = peek_head_len(rvq, sk);
>  
>  	if (!len && vq->busyloop_timeout) {
>  		/* Both tx vq and rx socket were polled here */
> @@ -561,7 +589,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  			vhost_poll_queue(&vq->poll);
>  		mutex_unlock(&vq->mutex);
>  
> -		len = peek_head_len(sk);
> +		len = peek_head_len(rvq, sk);
>  	}
>  
>  	return len;
> @@ -699,6 +727,8 @@ static void handle_rx(struct vhost_net *net)
>  		/* On error, stop handling until the next kick. */
>  		if (unlikely(headcount < 0))
>  			goto out;
> +		if (nvq->rx_array)
> +			msg.msg_control = nvq->rxq[nvq->rh++];
>  		/* On overrun, truncate and discard */
>  		if (unlikely(headcount > UIO_MAXIOV)) {
>  			iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1);
> @@ -841,6 +871,8 @@ static int vhost_net_open(struct inode *inode, struct file *f)
>  		n->vqs[i].done_idx = 0;
>  		n->vqs[i].vhost_hlen = 0;
>  		n->vqs[i].sock_hlen = 0;
> +		n->vqs[i].rt = 0;
> +		n->vqs[i].rh = 0;
>  	}
>  	vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
>  
> @@ -856,11 +888,15 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n,
>  					struct vhost_virtqueue *vq)
>  {
>  	struct socket *sock;
> +	struct vhost_net_virtqueue *nvq =
> +		container_of(vq, struct vhost_net_virtqueue, vq);
>  
>  	mutex_lock(&vq->mutex);
>  	sock = vq->private_data;
>  	vhost_net_disable_vq(n, vq);
>  	vq->private_data = NULL;
> +	while (nvq->rh != nvq->rt)
> +		kfree_skb(nvq->rxq[nvq->rh++]);
>  	mutex_unlock(&vq->mutex);
>  	return sock;
>  }
> @@ -953,6 +989,25 @@ static struct socket *get_raw_socket(int fd)
>  	return ERR_PTR(r);
>  }
>  
> +static struct skb_array *get_tap_skb_array(int fd)
> +{
> +	struct skb_array *array;
> +	struct file *file = fget(fd);
> +
> +	if (!file)
> +		return NULL;
> +	array = tun_get_skb_array(file);
> +	if (!IS_ERR(array))
> +		goto out;
> +	array = tap_get_skb_array(file);
> +	if (!IS_ERR(array))
> +		goto out;
> +	array = NULL;
> +out:
> +	fput(file);
> +	return array;
> +}
> +
>  static struct socket *get_tap_socket(int fd)
>  {
>  	struct file *file = fget(fd);
> @@ -1029,6 +1084,7 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
>  
>  		vhost_net_disable_vq(n, vq);
>  		vq->private_data = sock;
> +		nvq->rx_array = get_tap_skb_array(fd);
>  		r = vhost_vq_init_access(vq);
>  		if (r)
>  			goto err_used;
> -- 
> 2.7.4
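
Folding both review comments above into the helper might look roughly like the
sketch below; the name fetch_skbs and the comment wording are placeholders taken
from the review, not what was eventually applied:

static int fetch_skbs(struct vhost_net_virtqueue *rvq)
{
        if (rvq->rh != rvq->rt)
                goto out;

        rvq->rh = rvq->rt = 0;
        /* Batched consume with BH disabled; the exact justification for the
         * _bh variant is what this comment is asked to spell out (and it is
         * questioned later in this thread).
         */
        rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
                                               VHOST_RX_BATCH);
        if (!rvq->rt)
                return 0;
out:
        return __skb_array_len_with_tag(rvq->rxq[rvq->rh]);
}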
Jason Wang March 23, 2017, 5:34 a.m. UTC | #2
On 2017/03/22 22:16, Michael S. Tsirkin wrote:
> On Tue, Mar 21, 2017 at 12:04:46PM +0800, Jason Wang wrote:
>> We used to dequeue one skb during recvmsg() from skb_array, this could
>> be inefficient because of the bad cache utilization and spinlock
>> touching for each packet. This patch tries to batch them by calling
>> batch dequeuing helpers explicitly on the exported skb array and pass
>> the skb back through msg_control for underlayer socket to finish the
>> userspace copying.
>>
>> Tests were done by XDP1:
>> - small buffer:
>>    Before: 1.88Mpps
>>    After : 2.25Mpps (+19.6%)
>> - mergeable buffer:
>>    Before: 1.83Mpps
>>    After : 2.10Mpps (+14.7%)
>>
>> Signed-off-by: Jason Wang <jasowang@redhat.com>
>> ---
>>   drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
>>   1 file changed, 60 insertions(+), 4 deletions(-)
>>
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index 9b51989..53f09f2 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -28,6 +28,8 @@
>>   #include <linux/if_macvlan.h>
>>   #include <linux/if_tap.h>
>>   #include <linux/if_vlan.h>
>> +#include <linux/skb_array.h>
>> +#include <linux/skbuff.h>
>>   
>>   #include <net/sock.h>
>>   
>> @@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref {
>>   	struct vhost_virtqueue *vq;
>>   };
>>   
>> +#define VHOST_RX_BATCH 64
>>   struct vhost_net_virtqueue {
>>   	struct vhost_virtqueue vq;
>>   	size_t vhost_hlen;
>> @@ -99,6 +102,10 @@ struct vhost_net_virtqueue {
>>   	/* Reference counting for outstanding ubufs.
>>   	 * Protected by vq mutex. Writers must also take device mutex. */
>>   	struct vhost_net_ubuf_ref *ubufs;
>> +	struct skb_array *rx_array;
>> +	void *rxq[VHOST_RX_BATCH];
>> +	int rt;
>> +	int rh;
>>   };
>>   
>>   struct vhost_net {
>> @@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n)
>>   		n->vqs[i].ubufs = NULL;
>>   		n->vqs[i].vhost_hlen = 0;
>>   		n->vqs[i].sock_hlen = 0;
>> +		n->vqs[i].rt = 0;
>> +		n->vqs[i].rh = 0;
>>   	}
>>   
>>   }
>> @@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net)
>>   	mutex_unlock(&vq->mutex);
>>   }
>>   
>> -static int peek_head_len(struct sock *sk)
>> +static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)
> Pls rename to say what it actually does: fetch skbs

Ok.

>
>> +{
>> +	if (rvq->rh != rvq->rt)
>> +		goto out;
>> +
>> +	rvq->rh = rvq->rt = 0;
>> +	rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
>> +						VHOST_RX_BATCH);
> A comment explaining why is is -bh would be helpful.

Ok.

Thanks
Jason Wang March 29, 2017, 9:58 a.m. UTC | #3
On 2017/03/23 13:34, Jason Wang wrote:
>
>
>>
>>> +{
>>> +    if (rvq->rh != rvq->rt)
>>> +        goto out;
>>> +
>>> +    rvq->rh = rvq->rt = 0;
>>> +    rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
>>> +                        VHOST_RX_BATCH);
>> A comment explaining why is is -bh would be helpful.
>
> Ok.
>
> Thanks 

Rethinking about this, it looks like -bh is not needed in this case, since 
no consumer runs in bh.

Thanks
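
If no consumer runs in bh context, the fetch would presumably switch to the
plain batched helper from earlier in the series (name assumed here to be
skb_array_consume_batched), roughly:

        rvq->rh = rvq->rt = 0;
        /* Consumer only runs from the vhost worker (process context), so
         * the plain, non-bh batched helper should be sufficient here. */
        rvq->rt = skb_array_consume_batched(rvq->rx_array, rvq->rxq,
                                            VHOST_RX_BATCH);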
Pankaj Gupta March 29, 2017, 10:46 a.m. UTC | #4
Hi Jason,

> 
> On 2017年03月23日 13:34, Jason Wang wrote:
> >
> >
> >>
> >>> +{
> >>> +    if (rvq->rh != rvq->rt)
> >>> +        goto out;
> >>> +
> >>> +    rvq->rh = rvq->rt = 0;
> >>> +    rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
> >>> +                        VHOST_RX_BATCH);
> >> A comment explaining why is is -bh would be helpful.
> >
> > Ok.
> >
> > Thanks
> 
> Rethink about this. It looks like -bh is not needed in this case since
> no consumer run in bh.

In that case, do we need the other variants of the "ptr_ring_consume_batched_*()" functions?
Are we planning to use them in the future?

> 
> Thanks
>
Jason Wang March 29, 2017, 10:53 a.m. UTC | #5
On 2017/03/29 18:46, Pankaj Gupta wrote:
> Hi Jason,
>
>> On 2017年03月23日 13:34, Jason Wang wrote:
>>>
>>>>> +{
>>>>> +    if (rvq->rh != rvq->rt)
>>>>> +        goto out;
>>>>> +
>>>>> +    rvq->rh = rvq->rt = 0;
>>>>> +    rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
>>>>> +                        VHOST_RX_BATCH);
>>>> A comment explaining why is is -bh would be helpful.
>>> Ok.
>>>
>>> Thanks
>> Rethink about this. It looks like -bh is not needed in this case since
>> no consumer run in bh.
> In that case do we need other variants of "ptr_ring_consume_batched_*()" functions.
> Are we planning to use them in future?

I think we'd better keep them, since they serve as helpers. You can see 
that not all the helpers in ptr_ring have real users, but they were 
prepared for future use.

Thanks

>
>> Thanks
>>
Michael S. Tsirkin March 29, 2017, 9:47 p.m. UTC | #6
On Wed, Mar 29, 2017 at 06:53:27PM +0800, Jason Wang wrote:
> 
> 
> On 2017年03月29日 18:46, Pankaj Gupta wrote:
> > Hi Jason,
> > 
> > > On 2017年03月23日 13:34, Jason Wang wrote:
> > > > 
> > > > > > +{
> > > > > > +    if (rvq->rh != rvq->rt)
> > > > > > +        goto out;
> > > > > > +
> > > > > > +    rvq->rh = rvq->rt = 0;
> > > > > > +    rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
> > > > > > +                        VHOST_RX_BATCH);
> > > > > A comment explaining why is is -bh would be helpful.
> > > > Ok.
> > > > 
> > > > Thanks
> > > Rethink about this. It looks like -bh is not needed in this case since
> > > no consumer run in bh.
> > In that case do we need other variants of "ptr_ring_consume_batched_*()" functions.
> > Are we planning to use them in future?
> 
> I think we'd better keep them, since it serves as helpers. You can see that
> not all the helpers in ptr_ring has real users, but they were prepared for
> the future use.
> 
> Thanks

Makes sense for basic building blocks, but I'm not sure we
need to do it for all APIs.


> > 
> > > Thanks
> > >

Patch

diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 9b51989..53f09f2 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -28,6 +28,8 @@ 
 #include <linux/if_macvlan.h>
 #include <linux/if_tap.h>
 #include <linux/if_vlan.h>
+#include <linux/skb_array.h>
+#include <linux/skbuff.h>
 
 #include <net/sock.h>
 
@@ -85,6 +87,7 @@  struct vhost_net_ubuf_ref {
 	struct vhost_virtqueue *vq;
 };
 
+#define VHOST_RX_BATCH 64
 struct vhost_net_virtqueue {
 	struct vhost_virtqueue vq;
 	size_t vhost_hlen;
@@ -99,6 +102,10 @@  struct vhost_net_virtqueue {
 	/* Reference counting for outstanding ubufs.
 	 * Protected by vq mutex. Writers must also take device mutex. */
 	struct vhost_net_ubuf_ref *ubufs;
+	struct skb_array *rx_array;
+	void *rxq[VHOST_RX_BATCH];
+	int rt;
+	int rh;
 };
 
 struct vhost_net {
@@ -201,6 +208,8 @@  static void vhost_net_vq_reset(struct vhost_net *n)
 		n->vqs[i].ubufs = NULL;
 		n->vqs[i].vhost_hlen = 0;
 		n->vqs[i].sock_hlen = 0;
+		n->vqs[i].rt = 0;
+		n->vqs[i].rh = 0;
 	}
 
 }
@@ -503,13 +512,30 @@  static void handle_tx(struct vhost_net *net)
 	mutex_unlock(&vq->mutex);
 }
 
-static int peek_head_len(struct sock *sk)
+static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)
+{
+	if (rvq->rh != rvq->rt)
+		goto out;
+
+	rvq->rh = rvq->rt = 0;
+	rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
+						VHOST_RX_BATCH);
+	if (!rvq->rt)
+		return 0;
+out:
+	return __skb_array_len_with_tag(rvq->rxq[rvq->rh]);
+}
+
+static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
 {
 	struct socket *sock = sk->sk_socket;
 	struct sk_buff *head;
 	int len = 0;
 	unsigned long flags;
 
+	if (rvq->rx_array)
+		return peek_head_len_batched(rvq);
+
 	if (sock->ops->peek_len)
 		return sock->ops->peek_len(sock);
 
@@ -535,12 +561,14 @@  static int sk_has_rx_data(struct sock *sk)
 	return skb_queue_empty(&sk->sk_receive_queue);
 }
 
-static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
+static int vhost_net_rx_peek_head_len(struct vhost_net *net,
+				      struct sock *sk)
 {
+	struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX];
 	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
 	struct vhost_virtqueue *vq = &nvq->vq;
 	unsigned long uninitialized_var(endtime);
-	int len = peek_head_len(sk);
+	int len = peek_head_len(rvq, sk);
 
 	if (!len && vq->busyloop_timeout) {
 		/* Both tx vq and rx socket were polled here */
@@ -561,7 +589,7 @@  static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
 			vhost_poll_queue(&vq->poll);
 		mutex_unlock(&vq->mutex);
 
-		len = peek_head_len(sk);
+		len = peek_head_len(rvq, sk);
 	}
 
 	return len;
@@ -699,6 +727,8 @@  static void handle_rx(struct vhost_net *net)
 		/* On error, stop handling until the next kick. */
 		if (unlikely(headcount < 0))
 			goto out;
+		if (nvq->rx_array)
+			msg.msg_control = nvq->rxq[nvq->rh++];
 		/* On overrun, truncate and discard */
 		if (unlikely(headcount > UIO_MAXIOV)) {
 			iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1);
@@ -841,6 +871,8 @@  static int vhost_net_open(struct inode *inode, struct file *f)
 		n->vqs[i].done_idx = 0;
 		n->vqs[i].vhost_hlen = 0;
 		n->vqs[i].sock_hlen = 0;
+		n->vqs[i].rt = 0;
+		n->vqs[i].rh = 0;
 	}
 	vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
 
@@ -856,11 +888,15 @@  static struct socket *vhost_net_stop_vq(struct vhost_net *n,
 					struct vhost_virtqueue *vq)
 {
 	struct socket *sock;
+	struct vhost_net_virtqueue *nvq =
+		container_of(vq, struct vhost_net_virtqueue, vq);
 
 	mutex_lock(&vq->mutex);
 	sock = vq->private_data;
 	vhost_net_disable_vq(n, vq);
 	vq->private_data = NULL;
+	while (nvq->rh != nvq->rt)
+		kfree_skb(nvq->rxq[nvq->rh++]);
 	mutex_unlock(&vq->mutex);
 	return sock;
 }
@@ -953,6 +989,25 @@  static struct socket *get_raw_socket(int fd)
 	return ERR_PTR(r);
 }
 
+static struct skb_array *get_tap_skb_array(int fd)
+{
+	struct skb_array *array;
+	struct file *file = fget(fd);
+
+	if (!file)
+		return NULL;
+	array = tun_get_skb_array(file);
+	if (!IS_ERR(array))
+		goto out;
+	array = tap_get_skb_array(file);
+	if (!IS_ERR(array))
+		goto out;
+	array = NULL;
+out:
+	fput(file);
+	return array;
+}
+
 static struct socket *get_tap_socket(int fd)
 {
 	struct file *file = fget(fd);
@@ -1029,6 +1084,7 @@  static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
 
 		vhost_net_disable_vq(n, vq);
 		vq->private_data = sock;
+		nvq->rx_array = get_tap_skb_array(fd);
 		r = vhost_vq_init_access(vq);
 		if (r)
 			goto err_used;