[net-next,v3,4/8] virtio-net: xsk zero copy xmit implement wakeup and xmit

Message ID 20210331071139.15473-5-xuanzhuo@linux.alibaba.com (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Series virtio-net support xdp socket zero copy xmit

Checks

Context Check Description
netdev/cover_letter success Link
netdev/fixes_present success Link
netdev/patch_count success Link
netdev/tree_selection success Clearly marked for net-next
netdev/subject_prefix success Link
netdev/cc_maintainers success CCed 16 of 16 maintainers
netdev/source_inline success Was 0 now: 0
netdev/verify_signedoff success Link
netdev/module_param fail Was 0 now: 1
netdev/build_32bit success Errors and warnings before: 0 this patch: 0
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/verify_fixes success Link
netdev/checkpatch success total: 0 errors, 0 warnings, 0 checks, 222 lines checked
netdev/build_allmodconfig_warn success Errors and warnings before: 0 this patch: 0
netdev/header_inline success Link

Commit Message

Xuan Zhuo March 31, 2021, 7:11 a.m. UTC
When the user calls sendto to consume the data in the xsk tx queue,
virtnet_xsk_wakeup will be called.

In wakeup, it tries to send part of the data directly; the quantity is
controlled by the module parameter xsk_budget. This design has two
purposes:

1. Send part of the data immediately to reduce the transmission delay
   of the first packet
2. Trigger a tx interrupt so that napi starts consuming the xsk tx data

All sent xsk packets share the virtio-net header xsk_hdr. If xsk needs
to support csum and other features later, consider assigning a separate
header for each sent packet.
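
For illustration, the shared header amounts to the following
zero-initialized definition (a sketch of what the plain static
declaration in this patch means, not extra code):

	static struct virtio_net_hdr_mrg_rxbuf xsk_hdr = {
		.hdr.flags    = 0,			/* no VIRTIO_NET_HDR_F_NEEDS_CSUM */
		.hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE,	/* no GSO */
	};

so every xsk packet is currently sent without csum or GSO offload.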

Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
---
 drivers/net/virtio_net.c | 183 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 183 insertions(+)

Comments

Jason Wang April 6, 2021, 6:19 a.m. UTC | #1
On 2021/3/31 3:11 PM, Xuan Zhuo wrote:
> When the user calls sendto to consume the data in the xsk tx queue,
> virtnet_xsk_wakeup will be called.
>
> In wakeup, it tries to send part of the data directly; the quantity
> is controlled by the module parameter xsk_budget.


Any reason that we can't use NAPI budget?


>   This design has two
> purposes:
>
> 1. Send part of the data immediately to reduce the transmission delay
>     of the first packet
> 2. Trigger a tx interrupt so that napi starts consuming the xsk tx data
>
> All sent xsk packets share the virtio-net header xsk_hdr. If xsk needs
> to support csum and other features later, consider assigning a separate
> header for each sent packet.
>
> Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
> Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
> ---
>   drivers/net/virtio_net.c | 183 +++++++++++++++++++++++++++++++++++++++
>   1 file changed, 183 insertions(+)
>
> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
> index 4e25408a2b37..c8a317a93ef7 100644
> --- a/drivers/net/virtio_net.c
> +++ b/drivers/net/virtio_net.c
> @@ -28,9 +28,11 @@ static int napi_weight = NAPI_POLL_WEIGHT;
>   module_param(napi_weight, int, 0444);
>   
>   static bool csum = true, gso = true, napi_tx = true;
> +static int xsk_budget = 32;
>   module_param(csum, bool, 0444);
>   module_param(gso, bool, 0444);
>   module_param(napi_tx, bool, 0644);
> +module_param(xsk_budget, int, 0644);
>   
>   /* FIXME: MTU in config. */
>   #define GOOD_PACKET_LEN (ETH_HLEN + VLAN_HLEN + ETH_DATA_LEN)
> @@ -47,6 +49,8 @@ module_param(napi_tx, bool, 0644);
>   
>   #define VIRTIO_XDP_FLAG	BIT(0)
>   
> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
> +
>   /* RX packet size EWMA. The average packet size is used to determine the packet
>    * buffer size when refilling RX rings. As the entire RX ring may be refilled
>    * at once, the weight is chosen so that the EWMA will be insensitive to short-
> @@ -138,6 +142,9 @@ struct send_queue {
>   	struct {
>   		/* xsk pool */
>   		struct xsk_buff_pool __rcu *pool;
> +
> +		/* save the desc for the next xmit, in case xmit fails. */
> +		struct xdp_desc last_desc;
>   	} xsk;
>   };
>   
> @@ -2532,6 +2539,179 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
>   	return err;
>   }
>   
> +static void virtnet_xsk_check_space(struct send_queue *sq)
> +{


The name is confusing: the function does more than just check the
space; it may stop the queue as well.


> +	struct virtnet_info *vi = sq->vq->vdev->priv;
> +	struct net_device *dev = vi->dev;
> +	int qnum = sq - vi->sq;
> +
> +	/* If this sq is not the exclusive queue of the current cpu,
> +	 * then it may also be used by start_xmit, so check whether it
> +	 * is running out of space.
> +	 */


So the code can explain itself; we need a better comment to explain why
the raw buffer queue case must be treated differently.


> +	if (is_xdp_raw_buffer_queue(vi, qnum))
> +		return;
> +
> +	/* Stop the queue to avoid getting packets that we are
> +	 * then unable to transmit, and wait for the tx interrupt.
> +	 */
> +	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
> +		netif_stop_subqueue(dev, qnum);
> +}
> +
> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
> +			    struct xdp_desc *desc)
> +{
> +	struct virtnet_info *vi;
> +	struct page *page;
> +	void *data;
> +	u32 offset;
> +	u64 addr;
> +	int err;
> +
> +	vi = sq->vq->vdev->priv;
> +	addr = desc->addr;
> +	data = xsk_buff_raw_get_data(pool, addr);
> +	offset = offset_in_page(data);
> +
> +	sg_init_table(sq->sg, 2);
> +	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
> +	page = xsk_buff_xdp_get_page(pool, addr);
> +	sg_set_page(sq->sg + 1, page, desc->len, offset);
> +
> +	err = virtqueue_add_outbuf(sq->vq, sq->sg, 2, NULL, GFP_ATOMIC);
> +	if (unlikely(err))
> +		sq->xsk.last_desc = *desc;


So I think it's better to make sure we have at least 2 slots up front,
to avoid handling errors like this? (Especially considering the queue
size is not necessarily a power of 2 when a packed virtqueue is used.)
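
A rough sketch of that reordering (an assumption about one way to do
it, not code from the patch; desc and budget are as in
virtnet_xsk_xmit_batch()):

	while (budget-- > 0) {
		if (sq->vq->num_free < 2)
			return -EBUSY;	/* ring full, retry after the tx interrupt */

		if (!xsk_tx_peek_desc(pool, &desc))
			return 0;	/* tx queue drained */

		/* cannot fail for lack of slots: two were verified above */
		virtnet_xsk_xmit(sq, pool, &desc);
	}

With the space verified before xsk_tx_peek_desc(), the last_desc
save/replay logic would no longer be needed.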


> +
> +	return err;
> +}
> +
> +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
> +				  struct xsk_buff_pool *pool,
> +				  unsigned int budget,
> +				  bool in_napi, int *done)
> +{
> +	struct xdp_desc desc;
> +	int err, packet = 0;
> +	int ret = -EAGAIN;
> +
> +	if (sq->xsk.last_desc.addr) {


Any reason that num_free is not checked here?


> +		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
> +		if (unlikely(err))
> +			return -EBUSY;
> +
> +		++packet;
> +		--budget;
> +		sq->xsk.last_desc.addr = 0;
> +	}
> +
> +	while (budget-- > 0) {
> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
> +			ret = -EBUSY;
> +			break;
> +		}
> +
> +		if (!xsk_tx_peek_desc(pool, &desc)) {
> +			/* done */
> +			ret = 0;
> +			break;
> +		}
> +
> +		err = virtnet_xsk_xmit(sq, pool, &desc);
> +		if (unlikely(err)) {
> +			ret = -EBUSY;
> +			break;
> +		}
> +
> +		++packet;
> +	}
> +
> +	if (packet) {
> +		if (virtqueue_kick_prepare(sq->vq) &&
> +		    virtqueue_notify(sq->vq)) {
> +			u64_stats_update_begin(&sq->stats.syncp);
> +			sq->stats.kicks += 1;
> +			u64_stats_update_end(&sq->stats.syncp);
> +		}
> +
> +		*done = packet;
> +
> +		xsk_tx_release(pool);
> +	}
> +
> +	return ret;
> +}
> +
> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
> +			   int budget, bool in_napi)
> +{
> +	int done = 0;
> +	int err;
> +
> +	free_old_xmit_skbs(sq, in_napi);
> +
> +	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done);
> +	/* -EAGAIN: done == budget
> +	 * -EBUSY: done < budget
> +	 *  0    : done < budget
> +	 */


Please move them to the comment above virtnet_xsk_xmit_batch().

And it looks to me that nothing handles -EAGAIN; any reason for keeping
a dedicated variable like that?


> +	if (err == -EBUSY) {
> +		free_old_xmit_skbs(sq, in_napi);
> +
> +		/* If the space is enough, let napi run again. */
> +		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
> +			done = budget;


So I don't see how this can work in the event index case, where the
notification needs to be enabled explicitly.
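
A sketch of the kind of re-arming that seems to be missing (an
assumption about where it would go, after the second
free_old_xmit_skbs() call):

	unsigned int opaque;

	opaque = virtqueue_enable_cb_prepare(sq->vq);
	if (unlikely(virtqueue_poll(sq->vq, opaque))) {
		/* more used buffers arrived while re-arming */
		virtqueue_disable_cb(sq->vq);
		virtqueue_napi_schedule(&sq->napi, sq->vq);
	}

With VIRTIO_RING_F_EVENT_IDX, no further tx interrupt will arrive
unless the callback is explicitly re-enabled like this.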


> +	}
> +
> +	virtnet_xsk_check_space(sq);
> +
> +	return done;
> +}
> +
> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
> +{
> +	struct virtnet_info *vi = netdev_priv(dev);
> +	struct xsk_buff_pool *pool;
> +	struct netdev_queue *txq;
> +	struct send_queue *sq;
> +
> +	if (!netif_running(dev))
> +		return -ENETDOWN;
> +
> +	if (qid >= vi->curr_queue_pairs)
> +		return -EINVAL;
> +
> +	sq = &vi->sq[qid];
> +
> +	rcu_read_lock();
> +
> +	pool = rcu_dereference(sq->xsk.pool);
> +	if (!pool)
> +		goto end;
> +
> +	if (napi_if_scheduled_mark_missed(&sq->napi))
> +		goto end;
> +
> +	txq = netdev_get_tx_queue(dev, qid);
> +
> +	__netif_tx_lock_bh(txq);
> +
> +	/* Send part of the packets directly to reduce the sending delay,
> +	 * which also actively triggers the tx interrupts.
> +	 *
> +	 * If no packet is sent out, the device ring is full. In this
> +	 * case, we will still get a tx interrupt and deal with the
> +	 * remaining packets from there.
> +	 */
> +	virtnet_xsk_run(sq, pool, xsk_budget, false);


So the return value is ignored, which means there's no way to report
that we exhausted the budget. Is this intended?
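
One hypothetical shape for the wakeup path that would preserve that
information (not code from the patch):

	/* if the budget was exhausted, there is probably more tx work;
	 * hand the remainder to napi instead of dropping the signal.
	 */
	if (virtnet_xsk_run(sq, pool, xsk_budget, false) >= xsk_budget)
		virtqueue_napi_schedule(&sq->napi, sq->vq);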

Thanks


> +
> +	__netif_tx_unlock_bh(txq);
> +
> +end:
> +	rcu_read_unlock();
> +	return 0;
> +}
> +
>   static int virtnet_xsk_pool_enable(struct net_device *dev,
>   				   struct xsk_buff_pool *pool,
>   				   u16 qid)
> @@ -2553,6 +2733,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
>   	if (rcu_dereference(sq->xsk.pool))
>   		goto end;
>   
> +	memset(&sq->xsk, 0, sizeof(sq->xsk));
> +
>   	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
>   	 * safe.
>   	 */
> @@ -2656,6 +2838,7 @@ static const struct net_device_ops virtnet_netdev = {
>   	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
>   	.ndo_bpf		= virtnet_xdp,
>   	.ndo_xdp_xmit		= virtnet_xdp_xmit,
> +	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
>   	.ndo_features_check	= passthru_features_check,
>   	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
>   	.ndo_set_features	= virtnet_set_features,
Magnus Karlsson April 7, 2021, 9:56 a.m. UTC | #2
On Tue, Apr 6, 2021 at 3:33 PM Jason Wang <jasowang@redhat.com> wrote:
>
>
> On 2021/3/31 3:11 PM, Xuan Zhuo wrote:
> > When the user calls sendto to consume the data in the xsk tx queue,
> > virtnet_xsk_wakeup will be called.
> >
> > In wakeup, it tries to send part of the data directly; the quantity
> > is controlled by the module parameter xsk_budget.
>
>
> Any reason that we can't use NAPI budget?

I think we should use the NAPI budget here and skip this module
parameter. If a user would like to control the number of packets
processed, the preferred busy-poll mode [1] can be used instead. In
that patch series, a new setsockopt option called SO_BUSY_POLL_BUDGET
was introduced. With it, you can set the napi budget value without the
need for a module parameter.

[1]: https://lore.kernel.org/bpf/20201119083024.119566-1-bjorn.topel@gmail.com/T/
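
For reference, a minimal userspace sketch of those knobs (values are
illustrative; xsk_fd stands for the AF_XDP socket fd, and the two newer
options need uapi headers from a kernel carrying the series above):

	#include <sys/socket.h>

	int busy_poll_us = 20;	/* SO_BUSY_POLL: spin up to 20us per poll */
	int prefer = 1;		/* SO_PREFER_BUSY_POLL */
	int budget = 32;	/* SO_BUSY_POLL_BUDGET: packets per napi poll */

	setsockopt(xsk_fd, SOL_SOCKET, SO_BUSY_POLL, &busy_poll_us,
		   sizeof(busy_poll_us));
	setsockopt(xsk_fd, SOL_SOCKET, SO_PREFER_BUSY_POLL, &prefer,
		   sizeof(prefer));
	setsockopt(xsk_fd, SOL_SOCKET, SO_BUSY_POLL_BUDGET, &budget,
		   sizeof(budget));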

>
> >   This design has two
> > purposes:
> >
> > 1. Send part of the data immediately to reduce the transmission delay
> >     of the first packet
> > 2. Trigger a tx interrupt so that napi starts consuming the xsk tx data
> >
> > All sent xsk packets share the virtio-net header xsk_hdr. If xsk needs
> > to support csum and other features later, consider assigning a separate
> > header for each sent packet.
> >
> > Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
> > Reviewed-by: Dust Li <dust.li@linux.alibaba.com>
> > ---
> >   drivers/net/virtio_net.c | 183 +++++++++++++++++++++++++++++++++++++++
> >   1 file changed, 183 insertions(+)
> >
> > diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
> > index 4e25408a2b37..c8a317a93ef7 100644
> > --- a/drivers/net/virtio_net.c
> > +++ b/drivers/net/virtio_net.c
> > @@ -28,9 +28,11 @@ static int napi_weight = NAPI_POLL_WEIGHT;
> >   module_param(napi_weight, int, 0444);
> >
> >   static bool csum = true, gso = true, napi_tx = true;
> > +static int xsk_budget = 32;
> >   module_param(csum, bool, 0444);
> >   module_param(gso, bool, 0444);
> >   module_param(napi_tx, bool, 0644);
> > +module_param(xsk_budget, int, 0644);
> >
> >   /* FIXME: MTU in config. */
> >   #define GOOD_PACKET_LEN (ETH_HLEN + VLAN_HLEN + ETH_DATA_LEN)
> > @@ -47,6 +49,8 @@ module_param(napi_tx, bool, 0644);
> >
> >   #define VIRTIO_XDP_FLAG     BIT(0)
> >
> > +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
> > +
> >   /* RX packet size EWMA. The average packet size is used to determine the packet
> >    * buffer size when refilling RX rings. As the entire RX ring may be refilled
> >    * at once, the weight is chosen so that the EWMA will be insensitive to short-
> > @@ -138,6 +142,9 @@ struct send_queue {
> >       struct {
> >               /* xsk pool */
> >               struct xsk_buff_pool __rcu *pool;
> > +
> > +             /* save the desc for the next xmit, in case xmit fails. */
> > +             struct xdp_desc last_desc;
> >       } xsk;
> >   };
> >
> > @@ -2532,6 +2539,179 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
> >       return err;
> >   }
> >
> > +static void virtnet_xsk_check_space(struct send_queue *sq)
> > +{
>
>
> The name is confusing: the function does more than just check the
> space; it may stop the queue as well.
>
>
> > +     struct virtnet_info *vi = sq->vq->vdev->priv;
> > +     struct net_device *dev = vi->dev;
> > +     int qnum = sq - vi->sq;
> > +
> > +     /* If this sq is not the exclusive queue of the current cpu,
> > +      * then it may also be used by start_xmit, so check whether it
> > +      * is running out of space.
> > +      */
>
>
> So the code can explain itself; we need a better comment to explain why
> the raw buffer queue case must be treated differently.
>
>
> > +     if (is_xdp_raw_buffer_queue(vi, qnum))
> > +             return;
> > +
> > +     /* Stop the queue to avoid getting packets that we are
> > +      * then unable to transmit, and wait for the tx interrupt.
> > +      */
> > +     if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
> > +             netif_stop_subqueue(dev, qnum);
> > +}
> > +
> > +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
> > +                         struct xdp_desc *desc)
> > +{
> > +     struct virtnet_info *vi;
> > +     struct page *page;
> > +     void *data;
> > +     u32 offset;
> > +     u64 addr;
> > +     int err;
> > +
> > +     vi = sq->vq->vdev->priv;
> > +     addr = desc->addr;
> > +     data = xsk_buff_raw_get_data(pool, addr);
> > +     offset = offset_in_page(data);
> > +
> > +     sg_init_table(sq->sg, 2);
> > +     sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
> > +     page = xsk_buff_xdp_get_page(pool, addr);
> > +     sg_set_page(sq->sg + 1, page, desc->len, offset);
> > +
> > +     err = virtqueue_add_outbuf(sq->vq, sq->sg, 2, NULL, GFP_ATOMIC);
> > +     if (unlikely(err))
> > +             sq->xsk.last_desc = *desc;
>
>
> So I think it's better to make sure we have at least 2 slots up front,
> to avoid handling errors like this? (Especially considering the queue
> size is not necessarily a power of 2 when a packed virtqueue is used.)
>
>
> > +
> > +     return err;
> > +}
> > +
> > +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
> > +                               struct xsk_buff_pool *pool,
> > +                               unsigned int budget,
> > +                               bool in_napi, int *done)
> > +{
> > +     struct xdp_desc desc;
> > +     int err, packet = 0;
> > +     int ret = -EAGAIN;
> > +
> > +     if (sq->xsk.last_desc.addr) {
>
>
> Any reason that num_free is not checked here?
>
>
> > +             err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
> > +             if (unlikely(err))
> > +                     return -EBUSY;
> > +
> > +             ++packet;
> > +             --budget;
> > +             sq->xsk.last_desc.addr = 0;
> > +     }
> > +
> > +     while (budget-- > 0) {
> > +             if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
> > +                     ret = -EBUSY;
> > +                     break;
> > +             }
> > +
> > +             if (!xsk_tx_peek_desc(pool, &desc)) {
> > +                     /* done */
> > +                     ret = 0;
> > +                     break;
> > +             }
> > +
> > +             err = virtnet_xsk_xmit(sq, pool, &desc);
> > +             if (unlikely(err)) {
> > +                     ret = -EBUSY;
> > +                     break;
> > +             }
> > +
> > +             ++packet;
> > +     }
> > +
> > +     if (packet) {
> > +             if (virtqueue_kick_prepare(sq->vq) &&
> > +                 virtqueue_notify(sq->vq)) {
> > +                     u64_stats_update_begin(&sq->stats.syncp);
> > +                     sq->stats.kicks += 1;
> > +                     u64_stats_update_end(&sq->stats.syncp);
> > +             }
> > +
> > +             *done = packet;
> > +
> > +             xsk_tx_release(pool);
> > +     }
> > +
> > +     return ret;
> > +}
> > +
> > +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
> > +                        int budget, bool in_napi)
> > +{
> > +     int done = 0;
> > +     int err;
> > +
> > +     free_old_xmit_skbs(sq, in_napi);
> > +
> > +     err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done);
> > +     /* -EAGAIN: done == budget
> > +      * -EBUSY: done < budget
> > +      *  0    : done < budget
> > +      */
>
>
> Please move them to the comment above virtnet_xsk_xmit_batch().
>
> And it looks to me that nothing handles -EAGAIN; any reason for keeping
> a dedicated variable like that?
>
>
> > +     if (err == -EBUSY) {
> > +             free_old_xmit_skbs(sq, in_napi);
> > +
> > +             /* If the space is enough, let napi run again. */
> > +             if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
> > +                     done = budget;
>
>
> So I don't see how this can work in the event index case, where the
> notification needs to be enabled explicitly.
>
>
> > +     }
> > +
> > +     virtnet_xsk_check_space(sq);
> > +
> > +     return done;
> > +}
> > +
> > +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
> > +{
> > +     struct virtnet_info *vi = netdev_priv(dev);
> > +     struct xsk_buff_pool *pool;
> > +     struct netdev_queue *txq;
> > +     struct send_queue *sq;
> > +
> > +     if (!netif_running(dev))
> > +             return -ENETDOWN;
> > +
> > +     if (qid >= vi->curr_queue_pairs)
> > +             return -EINVAL;
> > +
> > +     sq = &vi->sq[qid];
> > +
> > +     rcu_read_lock();
> > +
> > +     pool = rcu_dereference(sq->xsk.pool);
> > +     if (!pool)
> > +             goto end;
> > +
> > +     if (napi_if_scheduled_mark_missed(&sq->napi))
> > +             goto end;
> > +
> > +     txq = netdev_get_tx_queue(dev, qid);
> > +
> > +     __netif_tx_lock_bh(txq);
> > +
> > +     /* Send part of the packets directly to reduce the sending delay,
> > +      * which also actively triggers the tx interrupts.
> > +      *
> > +      * If no packet is sent out, the device ring is full. In this
> > +      * case, we will still get a tx interrupt and deal with the
> > +      * remaining packets from there.
> > +      */
> > +     virtnet_xsk_run(sq, pool, xsk_budget, false);
>
>
> So the return value is ignored, which means there's no way to report
> that we exhausted the budget. Is this intended?
>
> Thanks
>
>
> > +
> > +     __netif_tx_unlock_bh(txq);
> > +
> > +end:
> > +     rcu_read_unlock();
> > +     return 0;
> > +}
> > +
> >   static int virtnet_xsk_pool_enable(struct net_device *dev,
> >                                  struct xsk_buff_pool *pool,
> >                                  u16 qid)
> > @@ -2553,6 +2733,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
> >       if (rcu_dereference(sq->xsk.pool))
> >               goto end;
> >
> > +     memset(&sq->xsk, 0, sizeof(sq->xsk));
> > +
> >       /* Here is already protected by rtnl_lock, so rcu_assign_pointer is
> >        * safe.
> >        */
> > @@ -2656,6 +2838,7 @@ static const struct net_device_ops virtnet_netdev = {
> >       .ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
> >       .ndo_bpf                = virtnet_xdp,
> >       .ndo_xdp_xmit           = virtnet_xdp_xmit,
> > +     .ndo_xsk_wakeup         = virtnet_xsk_wakeup,
> >       .ndo_features_check     = passthru_features_check,
> >       .ndo_get_phys_port_name = virtnet_get_phys_port_name,
> >       .ndo_set_features       = virtnet_set_features,
>
Patch

diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
index 4e25408a2b37..c8a317a93ef7 100644
--- a/drivers/net/virtio_net.c
+++ b/drivers/net/virtio_net.c
@@ -28,9 +28,11 @@  static int napi_weight = NAPI_POLL_WEIGHT;
 module_param(napi_weight, int, 0444);
 
 static bool csum = true, gso = true, napi_tx = true;
+static int xsk_budget = 32;
 module_param(csum, bool, 0444);
 module_param(gso, bool, 0444);
 module_param(napi_tx, bool, 0644);
+module_param(xsk_budget, int, 0644);
 
 /* FIXME: MTU in config. */
 #define GOOD_PACKET_LEN (ETH_HLEN + VLAN_HLEN + ETH_DATA_LEN)
@@ -47,6 +49,8 @@  module_param(napi_tx, bool, 0644);
 
 #define VIRTIO_XDP_FLAG	BIT(0)
 
+static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
+
 /* RX packet size EWMA. The average packet size is used to determine the packet
  * buffer size when refilling RX rings. As the entire RX ring may be refilled
  * at once, the weight is chosen so that the EWMA will be insensitive to short-
@@ -138,6 +142,9 @@  struct send_queue {
 	struct {
 		/* xsk pool */
 		struct xsk_buff_pool __rcu *pool;
+
+		/* save the desc for the next xmit, in case xmit fails. */
+		struct xdp_desc last_desc;
 	} xsk;
 };
 
@@ -2532,6 +2539,179 @@  static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
 	return err;
 }
 
+static void virtnet_xsk_check_space(struct send_queue *sq)
+{
+	struct virtnet_info *vi = sq->vq->vdev->priv;
+	struct net_device *dev = vi->dev;
+	int qnum = sq - vi->sq;
+
+	/* If this sq is not the exclusive queue of the current cpu,
+	 * then it may also be used by start_xmit, so check whether it
+	 * is running out of space.
+	 */
+	if (is_xdp_raw_buffer_queue(vi, qnum))
+		return;
+
+	/* Stop the queue to avoid getting packets that we are
+	 * then unable to transmit, and wait for the tx interrupt.
+	 */
+	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
+		netif_stop_subqueue(dev, qnum);
+}
+
+static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
+			    struct xdp_desc *desc)
+{
+	struct virtnet_info *vi;
+	struct page *page;
+	void *data;
+	u32 offset;
+	u64 addr;
+	int err;
+
+	vi = sq->vq->vdev->priv;
+	addr = desc->addr;
+	data = xsk_buff_raw_get_data(pool, addr);
+	offset = offset_in_page(data);
+
+	sg_init_table(sq->sg, 2);
+	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
+	page = xsk_buff_xdp_get_page(pool, addr);
+	sg_set_page(sq->sg + 1, page, desc->len, offset);
+
+	err = virtqueue_add_outbuf(sq->vq, sq->sg, 2, NULL, GFP_ATOMIC);
+	if (unlikely(err))
+		sq->xsk.last_desc = *desc;
+
+	return err;
+}
+
+static int virtnet_xsk_xmit_batch(struct send_queue *sq,
+				  struct xsk_buff_pool *pool,
+				  unsigned int budget,
+				  bool in_napi, int *done)
+{
+	struct xdp_desc desc;
+	int err, packet = 0;
+	int ret = -EAGAIN;
+
+	if (sq->xsk.last_desc.addr) {
+		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
+		if (unlikely(err))
+			return -EBUSY;
+
+		++packet;
+		--budget;
+		sq->xsk.last_desc.addr = 0;
+	}
+
+	while (budget-- > 0) {
+		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
+			ret = -EBUSY;
+			break;
+		}
+
+		if (!xsk_tx_peek_desc(pool, &desc)) {
+			/* done */
+			ret = 0;
+			break;
+		}
+
+		err = virtnet_xsk_xmit(sq, pool, &desc);
+		if (unlikely(err)) {
+			ret = -EBUSY;
+			break;
+		}
+
+		++packet;
+	}
+
+	if (packet) {
+		if (virtqueue_kick_prepare(sq->vq) &&
+		    virtqueue_notify(sq->vq)) {
+			u64_stats_update_begin(&sq->stats.syncp);
+			sq->stats.kicks += 1;
+			u64_stats_update_end(&sq->stats.syncp);
+		}
+
+		*done = packet;
+
+		xsk_tx_release(pool);
+	}
+
+	return ret;
+}
+
+static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
+			   int budget, bool in_napi)
+{
+	int done = 0;
+	int err;
+
+	free_old_xmit_skbs(sq, in_napi);
+
+	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done);
+	/* -EAGAIN: done == budget
+	 * -EBUSY: done < budget
+	 *  0    : done < budget
+	 */
+	if (err == -EBUSY) {
+		free_old_xmit_skbs(sq, in_napi);
+
+		/* If the space is enough, let napi run again. */
+		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
+			done = budget;
+	}
+
+	virtnet_xsk_check_space(sq);
+
+	return done;
+}
+
+static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
+{
+	struct virtnet_info *vi = netdev_priv(dev);
+	struct xsk_buff_pool *pool;
+	struct netdev_queue *txq;
+	struct send_queue *sq;
+
+	if (!netif_running(dev))
+		return -ENETDOWN;
+
+	if (qid >= vi->curr_queue_pairs)
+		return -EINVAL;
+
+	sq = &vi->sq[qid];
+
+	rcu_read_lock();
+
+	pool = rcu_dereference(sq->xsk.pool);
+	if (!pool)
+		goto end;
+
+	if (napi_if_scheduled_mark_missed(&sq->napi))
+		goto end;
+
+	txq = netdev_get_tx_queue(dev, qid);
+
+	__netif_tx_lock_bh(txq);
+
+	/* Send part of the packets directly to reduce the sending delay,
+	 * which also actively triggers the tx interrupts.
+	 *
+	 * If no packet is sent out, the device ring is full. In this
+	 * case, we will still get a tx interrupt and deal with the
+	 * remaining packets from there.
+	 */
+	virtnet_xsk_run(sq, pool, xsk_budget, false);
+
+	__netif_tx_unlock_bh(txq);
+
+end:
+	rcu_read_unlock();
+	return 0;
+}
+
 static int virtnet_xsk_pool_enable(struct net_device *dev,
 				   struct xsk_buff_pool *pool,
 				   u16 qid)
@@ -2553,6 +2733,8 @@  static int virtnet_xsk_pool_enable(struct net_device *dev,
 	if (rcu_dereference(sq->xsk.pool))
 		goto end;
 
+	memset(&sq->xsk, 0, sizeof(sq->xsk));
+
 	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
 	 * safe.
 	 */
@@ -2656,6 +2838,7 @@  static const struct net_device_ops virtnet_netdev = {
 	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
 	.ndo_bpf		= virtnet_xdp,
 	.ndo_xdp_xmit		= virtnet_xdp_xmit,
+	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
 	.ndo_features_check	= passthru_features_check,
 	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
 	.ndo_set_features	= virtnet_set_features,