diff mbox series

[RFC,v2,6/7] virtio: in order support for virtio_ring

Message ID 20220817135718.2553-7-qtxuning1999@sjtu.edu.cn (mailing list archive)
State New, archived
Headers show
Series In order support for virtio_ring, vhost and vsock. | expand

Commit Message

Guo Zhi Aug. 17, 2022, 1:57 p.m. UTC
If in order feature negotiated, we can skip the used ring to get
buffer's desc id sequentially.

Signed-off-by: Guo Zhi <qtxuning1999@sjtu.edu.cn>
---
 drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
 1 file changed, 45 insertions(+), 8 deletions(-)

Comments

Xuan Zhuo Aug. 18, 2022, 3:11 a.m. UTC | #1
On Wed, 17 Aug 2022 21:57:17 +0800, Guo Zhi <qtxuning1999@sjtu.edu.cn> wrote:
> If in order feature negotiated, we can skip the used ring to get
> buffer's desc id sequentially.
>
> Signed-off-by: Guo Zhi <qtxuning1999@sjtu.edu.cn>
> ---
>  drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>  1 file changed, 45 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
> index 1c1b3fa376a2..143184ebb5a1 100644
> --- a/drivers/virtio/virtio_ring.c
> +++ b/drivers/virtio/virtio_ring.c
> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>  			/* DMA address and size information */
>  			dma_addr_t queue_dma_addr;
>  			size_t queue_size_in_bytes;
> +
> +			/* In order feature batch begin here */
> +			u16 next_desc_begin;
>  		} split;
>
>  		/* Available for packed ring */
> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
>  	}
>
>  	vring_unmap_one_split(vq, i);
> -	vq->split.desc_extra[i].next = vq->free_head;
> -	vq->free_head = head;
> +	/* In order feature use desc in order,
> +	 * that means, the next desc will always be free
> +	 */
> +	if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {

Call virtio_has_feature() here is not good.

Thanks.

> +		vq->split.desc_extra[i].next = vq->free_head;
> +		vq->free_head = head;
> +	}
>
>  	/* Plus final descriptor */
>  	vq->vq.num_free++;
> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
>  {
>  	struct vring_virtqueue *vq = to_vvq(_vq);
>  	void *ret;
> -	unsigned int i;
> +	unsigned int i, j;
>  	u16 last_used;
>
>  	START_USE(vq);
> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
>  	/* Only get used array entries after they have been exposed by host. */
>  	virtio_rmb(vq->weak_barriers);
>
> -	last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> -	i = virtio32_to_cpu(_vq->vdev,
> -			vq->split.vring.used->ring[last_used].id);
> -	*len = virtio32_to_cpu(_vq->vdev,
> -			vq->split.vring.used->ring[last_used].len);
> +	if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
> +		/* Skip used ring and get used desc in order*/
> +		i = vq->split.next_desc_begin;
> +		j = i;
> +		/* Indirect only takes one descriptor in descriptor table */
> +		while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
> +			j = (j + 1) % vq->split.vring.num;
> +		/* move to next */
> +		j = (j + 1) % vq->split.vring.num;
> +		/* Next buffer will use this descriptor in order */
> +		vq->split.next_desc_begin = j;
> +		if (!vq->indirect) {
> +			*len = vq->split.desc_extra[i].len;
> +		} else {
> +			struct vring_desc *indir_desc =
> +				vq->split.desc_state[i].indir_desc;
> +			u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
> +
> +			if (indir_desc) {
> +				for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
> +					buffer_len += indir_desc[j].len;
> +			}
> +
> +			*len = buffer_len;
> +		}
> +	} else {
> +		last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> +		i = virtio32_to_cpu(_vq->vdev,
> +				    vq->split.vring.used->ring[last_used].id);
> +		*len = virtio32_to_cpu(_vq->vdev,
> +				       vq->split.vring.used->ring[last_used].len);
> +	}
>
>  	if (unlikely(i >= vq->split.vring.num)) {
>  		BAD_RING(vq, "id %u out of range\n", i);
> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int index,
>  	vq->split.avail_flags_shadow = 0;
>  	vq->split.avail_idx_shadow = 0;
>
> +	vq->split.next_desc_begin = 0;
> +
>  	/* No callback?  Tell other side not to bother us. */
>  	if (!callback) {
>  		vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
> --
> 2.17.1
>
Guo Zhi Aug. 18, 2022, 3:16 a.m. UTC | #2
----- Original Message -----
> From: "Xuan Zhuo" <xuanzhuo@linux.alibaba.com>
> To: "Guo Zhi" <qtxuning1999@sjtu.edu.cn>
> Cc: "netdev" <netdev@vger.kernel.org>, "linux-kernel" <linux-kernel@vger.kernel.org>, "kvm list" <kvm@vger.kernel.org>,
> "virtualization" <virtualization@lists.linux-foundation.org>, "Guo Zhi" <qtxuning1999@sjtu.edu.cn>, "eperezma"
> <eperezma@redhat.com>, "jasowang" <jasowang@redhat.com>, "sgarzare" <sgarzare@redhat.com>, "Michael Tsirkin"
> <mst@redhat.com>
> Sent: Thursday, August 18, 2022 11:11:58 AM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring

> On Wed, 17 Aug 2022 21:57:17 +0800, Guo Zhi <qtxuning1999@sjtu.edu.cn> wrote:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <qtxuning1999@sjtu.edu.cn>
>> ---
>>  drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>>  1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>>  			/* DMA address and size information */
>>  			dma_addr_t queue_dma_addr;
>>  			size_t queue_size_in_bytes;
>> +
>> +			/* In order feature batch begin here */
>> +			u16 next_desc_begin;
>>  		} split;
>>
>>  		/* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>>  	}
>>
>>  	vring_unmap_one_split(vq, i);
>> -	vq->split.desc_extra[i].next = vq->free_head;
>> -	vq->free_head = head;
>> +	/* In order feature use desc in order,
>> +	 * that means, the next desc will always be free
>> +	 */
>> +	if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
> 
> Call virtio_has_feature() here is not good.
> 
> Thanks.
> 

Maybe I can use a variable like vq->indiret?
Thanks.

>> +		vq->split.desc_extra[i].next = vq->free_head;
>> +		vq->free_head = head;
>> +	}
>>
>>  	/* Plus final descriptor */
>>  	vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>>  {
>>  	struct vring_virtqueue *vq = to_vvq(_vq);
>>  	void *ret;
>> -	unsigned int i;
>> +	unsigned int i, j;
>>  	u16 last_used;
>>
>>  	START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>>  	/* Only get used array entries after they have been exposed by host. */
>>  	virtio_rmb(vq->weak_barriers);
>>
>> -	last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> -	i = virtio32_to_cpu(_vq->vdev,
>> -			vq->split.vring.used->ring[last_used].id);
>> -	*len = virtio32_to_cpu(_vq->vdev,
>> -			vq->split.vring.used->ring[last_used].len);
>> +	if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> +		/* Skip used ring and get used desc in order*/
>> +		i = vq->split.next_desc_begin;
>> +		j = i;
>> +		/* Indirect only takes one descriptor in descriptor table */
>> +		while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> +			j = (j + 1) % vq->split.vring.num;
>> +		/* move to next */
>> +		j = (j + 1) % vq->split.vring.num;
>> +		/* Next buffer will use this descriptor in order */
>> +		vq->split.next_desc_begin = j;
>> +		if (!vq->indirect) {
>> +			*len = vq->split.desc_extra[i].len;
>> +		} else {
>> +			struct vring_desc *indir_desc =
>> +				vq->split.desc_state[i].indir_desc;
>> +			u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> +			if (indir_desc) {
>> +				for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> +					buffer_len += indir_desc[j].len;
>> +			}
>> +
>> +			*len = buffer_len;
>> +		}
>> +	} else {
>> +		last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> +		i = virtio32_to_cpu(_vq->vdev,
>> +				    vq->split.vring.used->ring[last_used].id);
>> +		*len = virtio32_to_cpu(_vq->vdev,
>> +				       vq->split.vring.used->ring[last_used].len);
>> +	}
>>
>>  	if (unlikely(i >= vq->split.vring.num)) {
>>  		BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>>  	vq->split.avail_flags_shadow = 0;
>>  	vq->split.avail_idx_shadow = 0;
>>
>> +	vq->split.next_desc_begin = 0;
>> +
>>  	/* No callback?  Tell other side not to bother us. */
>>  	if (!callback) {
>>  		vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
>> --
>> 2.17.1
>>
Jason Wang Aug. 25, 2022, 7:44 a.m. UTC | #3
在 2022/8/17 21:57, Guo Zhi 写道:
> If in order feature negotiated, we can skip the used ring to get
> buffer's desc id sequentially.
>
> Signed-off-by: Guo Zhi <qtxuning1999@sjtu.edu.cn>
> ---
>   drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>   1 file changed, 45 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
> index 1c1b3fa376a2..143184ebb5a1 100644
> --- a/drivers/virtio/virtio_ring.c
> +++ b/drivers/virtio/virtio_ring.c
> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>   			/* DMA address and size information */
>   			dma_addr_t queue_dma_addr;
>   			size_t queue_size_in_bytes;
> +
> +			/* In order feature batch begin here */


We need tweak the comment, it's not easy for me to understand the 
meaning here.


> +			u16 next_desc_begin;
>   		} split;
>   
>   		/* Available for packed ring */
> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
>   	}
>   
>   	vring_unmap_one_split(vq, i);
> -	vq->split.desc_extra[i].next = vq->free_head;
> -	vq->free_head = head;
> +	/* In order feature use desc in order,
> +	 * that means, the next desc will always be free
> +	 */


Maybe we should add something like "The descriptors are prepared in order".


> +	if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
> +		vq->split.desc_extra[i].next = vq->free_head;
> +		vq->free_head = head;
> +	}
>   
>   	/* Plus final descriptor */
>   	vq->vq.num_free++;
> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
>   {
>   	struct vring_virtqueue *vq = to_vvq(_vq);
>   	void *ret;
> -	unsigned int i;
> +	unsigned int i, j;
>   	u16 last_used;
>   
>   	START_USE(vq);
> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
>   	/* Only get used array entries after they have been exposed by host. */
>   	virtio_rmb(vq->weak_barriers);
>   
> -	last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> -	i = virtio32_to_cpu(_vq->vdev,
> -			vq->split.vring.used->ring[last_used].id);
> -	*len = virtio32_to_cpu(_vq->vdev,
> -			vq->split.vring.used->ring[last_used].len);
> +	if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
> +		/* Skip used ring and get used desc in order*/
> +		i = vq->split.next_desc_begin;
> +		j = i;
> +		/* Indirect only takes one descriptor in descriptor table */
> +		while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
> +			j = (j + 1) % vq->split.vring.num;


Let's move the expensive mod outside the loop. Or it's split so we can 
use and here actually since the size is guaranteed to be power of the 
two? Another question, is it better to store the next_desc in e.g 
desc_extra?

And this seems very expensive if the device doesn't do the batching 
(which is not mandatory).


> +		/* move to next */
> +		j = (j + 1) % vq->split.vring.num;
> +		/* Next buffer will use this descriptor in order */
> +		vq->split.next_desc_begin = j;
> +		if (!vq->indirect) {
> +			*len = vq->split.desc_extra[i].len;
> +		} else {
> +			struct vring_desc *indir_desc =
> +				vq->split.desc_state[i].indir_desc;
> +			u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
> +
> +			if (indir_desc) {
> +				for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
> +					buffer_len += indir_desc[j].len;


So I think we need to finalize this, then we can have much more stress 
on the cache:

https://lkml.org/lkml/2021/10/26/1300

It was reverted since it's too aggressive, we should instead:

1) do the validation only for morden device

2) fail only when we enable the validation via (e.g a module parameter).

Thanks


> +			}
> +
> +			*len = buffer_len;
> +		}
> +	} else {
> +		last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
> +		i = virtio32_to_cpu(_vq->vdev,
> +				    vq->split.vring.used->ring[last_used].id);
> +		*len = virtio32_to_cpu(_vq->vdev,
> +				       vq->split.vring.used->ring[last_used].len);
> +	}
>   
>   	if (unlikely(i >= vq->split.vring.num)) {
>   		BAD_RING(vq, "id %u out of range\n", i);
> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int index,
>   	vq->split.avail_flags_shadow = 0;
>   	vq->split.avail_idx_shadow = 0;
>   
> +	vq->split.next_desc_begin = 0;
> +
>   	/* No callback?  Tell other side not to bother us. */
>   	if (!callback) {
>   		vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
Guo Zhi Aug. 26, 2022, 3:14 a.m. UTC | #4
----- Original Message -----
> From: "jasowang" <jasowang@redhat.com>
> To: "Guo Zhi" <qtxuning1999@sjtu.edu.cn>, "eperezma" <eperezma@redhat.com>, "sgarzare" <sgarzare@redhat.com>, "Michael
> Tsirkin" <mst@redhat.com>
> Cc: "netdev" <netdev@vger.kernel.org>, "linux-kernel" <linux-kernel@vger.kernel.org>, "kvm list" <kvm@vger.kernel.org>,
> "virtualization" <virtualization@lists.linux-foundation.org>
> Sent: Thursday, August 25, 2022 3:44:41 PM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring

> 在 2022/8/17 21:57, Guo Zhi 写道:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <qtxuning1999@sjtu.edu.cn>
>> ---
>>   drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>>   1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>>   			/* DMA address and size information */
>>   			dma_addr_t queue_dma_addr;
>>   			size_t queue_size_in_bytes;
>> +
>> +			/* In order feature batch begin here */
> 
> 
> We need tweak the comment, it's not easy for me to understand the
> meaning here.
> 
How about this: if in_order feature is negotiated, is the next head to consume.
> 
>> +			u16 next_desc_begin;
>>   		} split;
>>   
>>   		/* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>>   	}
>>   
>>   	vring_unmap_one_split(vq, i);
>> -	vq->split.desc_extra[i].next = vq->free_head;
>> -	vq->free_head = head;
>> +	/* In order feature use desc in order,
>> +	 * that means, the next desc will always be free
>> +	 */
> 
> 
> Maybe we should add something like "The descriptors are prepared in order".
> 
I will change it to this: The descriptors are made available in order if the in order feature is used. Since the free_head is already a circular list, it must consume it sequentially.

> 
>> +	if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
>> +		vq->split.desc_extra[i].next = vq->free_head;
>> +		vq->free_head = head;
>> +	}
>>   
>>   	/* Plus final descriptor */
>>   	vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>>   {
>>   	struct vring_virtqueue *vq = to_vvq(_vq);
>>   	void *ret;
>> -	unsigned int i;
>> +	unsigned int i, j;
>>   	u16 last_used;
>>   
>>   	START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>>   	/* Only get used array entries after they have been exposed by host. */
>>   	virtio_rmb(vq->weak_barriers);
>>   
>> -	last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> -	i = virtio32_to_cpu(_vq->vdev,
>> -			vq->split.vring.used->ring[last_used].id);
>> -	*len = virtio32_to_cpu(_vq->vdev,
>> -			vq->split.vring.used->ring[last_used].len);
>> +	if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> +		/* Skip used ring and get used desc in order*/
>> +		i = vq->split.next_desc_begin;
>> +		j = i;
>> +		/* Indirect only takes one descriptor in descriptor table */
>> +		while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> +			j = (j + 1) % vq->split.vring.num;
> 
> 
> Let's move the expensive mod outside the loop. Or it's split so we can
> use and here actually since the size is guaranteed to be power of the
> two? Another question, is it better to store the next_desc in e.g
> desc_extra?

Thanks, I will use bit operation instead of mod.

We only use one next_desc_begin at the same time, there is no need to store it in desc_extra.

> 
> And this seems very expensive if the device doesn't do the batching
> (which is not mandatory).
> 
> 
We will judge whether the device batched the buffer or not, we will only use this way for the batched buffer.
And Not in order is more expensive, because is following a linked list.

>> +		/* move to next */
>> +		j = (j + 1) % vq->split.vring.num;
>> +		/* Next buffer will use this descriptor in order */
>> +		vq->split.next_desc_begin = j;
>> +		if (!vq->indirect) {
>> +			*len = vq->split.desc_extra[i].len;
>> +		} else {
>> +			struct vring_desc *indir_desc =
>> +				vq->split.desc_state[i].indir_desc;
>> +			u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> +			if (indir_desc) {
>> +				for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> +					buffer_len += indir_desc[j].len;
> 
> 
> So I think we need to finalize this, then we can have much more stress
> on the cache:
> 
> https://lkml.org/lkml/2021/10/26/1300
> 
> It was reverted since it's too aggressive, we should instead:
> 
> 1) do the validation only for morden device
> 
> 2) fail only when we enable the validation via (e.g a module parameter).
> 
> Thanks
> 
> 
>> +			}
>> +
>> +			*len = buffer_len;
>> +		}
>> +	} else {
>> +		last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> +		i = virtio32_to_cpu(_vq->vdev,
>> +				    vq->split.vring.used->ring[last_used].id);
>> +		*len = virtio32_to_cpu(_vq->vdev,
>> +				       vq->split.vring.used->ring[last_used].len);
>> +	}
>>   
>>   	if (unlikely(i >= vq->split.vring.num)) {
>>   		BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>>   	vq->split.avail_flags_shadow = 0;
>>   	vq->split.avail_idx_shadow = 0;
>>   
>> +	vq->split.next_desc_begin = 0;
>> +
>>   	/* No callback?  Tell other side not to bother us. */
>>   	if (!callback) {
>>   		vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
Guo Zhi Aug. 26, 2022, 3:18 a.m. UTC | #5
----- Original Message -----
> From: "jasowang" <jasowang@redhat.com>
> To: "Guo Zhi" <qtxuning1999@sjtu.edu.cn>, "eperezma" <eperezma@redhat.com>, "sgarzare" <sgarzare@redhat.com>, "Michael
> Tsirkin" <mst@redhat.com>
> Cc: "netdev" <netdev@vger.kernel.org>, "linux-kernel" <linux-kernel@vger.kernel.org>, "kvm list" <kvm@vger.kernel.org>,
> "virtualization" <virtualization@lists.linux-foundation.org>
> Sent: Thursday, August 25, 2022 3:44:41 PM
> Subject: Re: [RFC v2 6/7] virtio: in order support for virtio_ring

> 在 2022/8/17 21:57, Guo Zhi 写道:
>> If in order feature negotiated, we can skip the used ring to get
>> buffer's desc id sequentially.
>>
>> Signed-off-by: Guo Zhi <qtxuning1999@sjtu.edu.cn>
>> ---
>>   drivers/virtio/virtio_ring.c | 53 ++++++++++++++++++++++++++++++------
>>   1 file changed, 45 insertions(+), 8 deletions(-)
>>
>> diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
>> index 1c1b3fa376a2..143184ebb5a1 100644
>> --- a/drivers/virtio/virtio_ring.c
>> +++ b/drivers/virtio/virtio_ring.c
>> @@ -144,6 +144,9 @@ struct vring_virtqueue {
>>   			/* DMA address and size information */
>>   			dma_addr_t queue_dma_addr;
>>   			size_t queue_size_in_bytes;
>> +
>> +			/* In order feature batch begin here */
> 
> 
> We need tweak the comment, it's not easy for me to understand the
> meaning here.
> 
> 
>> +			u16 next_desc_begin;
>>   		} split;
>>   
>>   		/* Available for packed ring */
>> @@ -702,8 +705,13 @@ static void detach_buf_split(struct vring_virtqueue *vq,
>> unsigned int head,
>>   	}
>>   
>>   	vring_unmap_one_split(vq, i);
>> -	vq->split.desc_extra[i].next = vq->free_head;
>> -	vq->free_head = head;
>> +	/* In order feature use desc in order,
>> +	 * that means, the next desc will always be free
>> +	 */
> 
> 
> Maybe we should add something like "The descriptors are prepared in order".
> 
> 
>> +	if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
>> +		vq->split.desc_extra[i].next = vq->free_head;
>> +		vq->free_head = head;
>> +	}
>>   
>>   	/* Plus final descriptor */
>>   	vq->vq.num_free++;
>> @@ -745,7 +753,7 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>>   {
>>   	struct vring_virtqueue *vq = to_vvq(_vq);
>>   	void *ret;
>> -	unsigned int i;
>> +	unsigned int i, j;
>>   	u16 last_used;
>>   
>>   	START_USE(vq);
>> @@ -764,11 +772,38 @@ static void *virtqueue_get_buf_ctx_split(struct virtqueue
>> *_vq,
>>   	/* Only get used array entries after they have been exposed by host. */
>>   	virtio_rmb(vq->weak_barriers);
>>   
>> -	last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> -	i = virtio32_to_cpu(_vq->vdev,
>> -			vq->split.vring.used->ring[last_used].id);
>> -	*len = virtio32_to_cpu(_vq->vdev,
>> -			vq->split.vring.used->ring[last_used].len);
>> +	if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
>> +		/* Skip used ring and get used desc in order*/
>> +		i = vq->split.next_desc_begin;
>> +		j = i;
>> +		/* Indirect only takes one descriptor in descriptor table */
>> +		while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
>> +			j = (j + 1) % vq->split.vring.num;
> 
> 
> Let's move the expensive mod outside the loop. Or it's split so we can
> use and here actually since the size is guaranteed to be power of the
> two? Another question, is it better to store the next_desc in e.g
> desc_extra?
> 
> And this seems very expensive if the device doesn't do the batching
> (which is not mandatory).
> 
> 
>> +		/* move to next */
>> +		j = (j + 1) % vq->split.vring.num;
>> +		/* Next buffer will use this descriptor in order */
>> +		vq->split.next_desc_begin = j;
>> +		if (!vq->indirect) {
>> +			*len = vq->split.desc_extra[i].len;
>> +		} else {
>> +			struct vring_desc *indir_desc =
>> +				vq->split.desc_state[i].indir_desc;
>> +			u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
>> +
>> +			if (indir_desc) {
>> +				for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
>> +					buffer_len += indir_desc[j].len;
> 
> 
> So I think we need to finalize this, then we can have much more stress
> on the cache:
> 
> https://lkml.org/lkml/2021/10/26/1300
> 
> It was reverted since it's too aggressive, we should instead:
> 
> 1) do the validation only for morden device
> 
> 2) fail only when we enable the validation via (e.g a module parameter).
> 
> Thanks
> 

Sorry for this obsolete implementation, we will not get buffer'len like this(in a loop).
Actually, for not skipped buffers, we can get length from used ring directly, for skipped buffers
I think we don’t have to get the length, because the driver is not interested in the skipped buffers(tx)’ length.

> 
>> +			}
>> +
>> +			*len = buffer_len;
>> +		}
>> +	} else {
>> +		last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
>> +		i = virtio32_to_cpu(_vq->vdev,
>> +				    vq->split.vring.used->ring[last_used].id);
>> +		*len = virtio32_to_cpu(_vq->vdev,
>> +				       vq->split.vring.used->ring[last_used].len);
>> +	}
>>   
>>   	if (unlikely(i >= vq->split.vring.num)) {
>>   		BAD_RING(vq, "id %u out of range\n", i);
>> @@ -2236,6 +2271,8 @@ struct virtqueue *__vring_new_virtqueue(unsigned int
>> index,
>>   	vq->split.avail_flags_shadow = 0;
>>   	vq->split.avail_idx_shadow = 0;
>>   
>> +	vq->split.next_desc_begin = 0;
>> +
>>   	/* No callback?  Tell other side not to bother us. */
>>   	if (!callback) {
>>   		vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;
diff mbox series

Patch

diff --git a/drivers/virtio/virtio_ring.c b/drivers/virtio/virtio_ring.c
index 1c1b3fa376a2..143184ebb5a1 100644
--- a/drivers/virtio/virtio_ring.c
+++ b/drivers/virtio/virtio_ring.c
@@ -144,6 +144,9 @@  struct vring_virtqueue {
 			/* DMA address and size information */
 			dma_addr_t queue_dma_addr;
 			size_t queue_size_in_bytes;
+
+			/* In order feature batch begin here */
+			u16 next_desc_begin;
 		} split;
 
 		/* Available for packed ring */
@@ -702,8 +705,13 @@  static void detach_buf_split(struct vring_virtqueue *vq, unsigned int head,
 	}
 
 	vring_unmap_one_split(vq, i);
-	vq->split.desc_extra[i].next = vq->free_head;
-	vq->free_head = head;
+	/* In order feature use desc in order,
+	 * that means, the next desc will always be free
+	 */
+	if (!virtio_has_feature(vq->vq.vdev, VIRTIO_F_IN_ORDER)) {
+		vq->split.desc_extra[i].next = vq->free_head;
+		vq->free_head = head;
+	}
 
 	/* Plus final descriptor */
 	vq->vq.num_free++;
@@ -745,7 +753,7 @@  static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
 {
 	struct vring_virtqueue *vq = to_vvq(_vq);
 	void *ret;
-	unsigned int i;
+	unsigned int i, j;
 	u16 last_used;
 
 	START_USE(vq);
@@ -764,11 +772,38 @@  static void *virtqueue_get_buf_ctx_split(struct virtqueue *_vq,
 	/* Only get used array entries after they have been exposed by host. */
 	virtio_rmb(vq->weak_barriers);
 
-	last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
-	i = virtio32_to_cpu(_vq->vdev,
-			vq->split.vring.used->ring[last_used].id);
-	*len = virtio32_to_cpu(_vq->vdev,
-			vq->split.vring.used->ring[last_used].len);
+	if (virtio_has_feature(_vq->vdev, VIRTIO_F_IN_ORDER)) {
+		/* Skip used ring and get used desc in order*/
+		i = vq->split.next_desc_begin;
+		j = i;
+		/* Indirect only takes one descriptor in descriptor table */
+		while (!vq->indirect && (vq->split.desc_extra[j].flags & VRING_DESC_F_NEXT))
+			j = (j + 1) % vq->split.vring.num;
+		/* move to next */
+		j = (j + 1) % vq->split.vring.num;
+		/* Next buffer will use this descriptor in order */
+		vq->split.next_desc_begin = j;
+		if (!vq->indirect) {
+			*len = vq->split.desc_extra[i].len;
+		} else {
+			struct vring_desc *indir_desc =
+				vq->split.desc_state[i].indir_desc;
+			u32 indir_num = vq->split.desc_extra[i].len, buffer_len = 0;
+
+			if (indir_desc) {
+				for (j = 0; j < indir_num / sizeof(struct vring_desc); j++)
+					buffer_len += indir_desc[j].len;
+			}
+
+			*len = buffer_len;
+		}
+	} else {
+		last_used = (vq->last_used_idx & (vq->split.vring.num - 1));
+		i = virtio32_to_cpu(_vq->vdev,
+				    vq->split.vring.used->ring[last_used].id);
+		*len = virtio32_to_cpu(_vq->vdev,
+				       vq->split.vring.used->ring[last_used].len);
+	}
 
 	if (unlikely(i >= vq->split.vring.num)) {
 		BAD_RING(vq, "id %u out of range\n", i);
@@ -2236,6 +2271,8 @@  struct virtqueue *__vring_new_virtqueue(unsigned int index,
 	vq->split.avail_flags_shadow = 0;
 	vq->split.avail_idx_shadow = 0;
 
+	vq->split.next_desc_begin = 0;
+
 	/* No callback?  Tell other side not to bother us. */
 	if (!callback) {
 		vq->split.avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT;