
[v4,bpf-next,06/22] xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path

Message ID 20230615172606.349557-7-maciej.fijalkowski@intel.com (mailing list archive)
State Changes Requested
Delegated to: BPF
Series xsk: multi-buffer support

Checks

Context Check Description
netdev/series_format fail Series longer than 15 patches (and no cover letter)
netdev/tree_selection success Clearly marked for bpf-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 36 this patch: 36
netdev/cc_maintainers warning 7 maintainers not CCed: kuba@kernel.org hawk@kernel.org john.fastabend@gmail.com davem@davemloft.net jonathan.lemon@gmail.com pabeni@redhat.com edumazet@google.com
netdev/build_clang success Errors and warnings before: 8 this patch: 8
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 36 this patch: 36
netdev/checkpatch warning WARNING: line length of 98 exceeds 80 columns WARNING: line length of 99 exceeds 80 columns
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
bpf/vmtest-bpf-next-VM_Test-1 success Logs for ShellCheck
bpf/vmtest-bpf-next-VM_Test-6 success Logs for set-matrix
bpf/vmtest-bpf-next-VM_Test-2 success Logs for build for aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-4 success Logs for build for x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-5 success Logs for build for x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-3 success Logs for build for s390x with gcc
bpf/vmtest-bpf-next-VM_Test-7 success Logs for test_maps on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-19 success Logs for test_progs_no_alu32_parallel on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-25 success Logs for test_verifier on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-9 success Logs for test_maps on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-10 success Logs for test_maps on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-11 success Logs for test_progs on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-13 success Logs for test_progs on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-14 success Logs for test_progs on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-15 success Logs for test_progs_no_alu32 on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-17 success Logs for test_progs_no_alu32 on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-18 success Logs for test_progs_no_alu32 on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-20 success Logs for test_progs_no_alu32_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-21 success Logs for test_progs_no_alu32_parallel on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-22 success Logs for test_progs_parallel on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-23 success Logs for test_progs_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-24 success Logs for test_progs_parallel on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-27 success Logs for test_verifier on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-28 success Logs for test_verifier on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-29 success Logs for veristat
bpf/vmtest-bpf-next-VM_Test-12 success Logs for test_progs on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-16 success Logs for test_progs_no_alu32 on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-26 fail Logs for test_verifier on s390x with gcc
bpf/vmtest-bpf-next-PR fail PR summary
bpf/vmtest-bpf-next-VM_Test-8 success Logs for test_maps on s390x with gcc

Commit Message

Fijalkowski, Maciej June 15, 2023, 5:25 p.m. UTC
From: Tirthendu Sarkar <tirthendu.sarkar@intel.com>

In the Tx path, the xsk core reserves space in the completion queue for
each descriptor to be transmitted, and the descriptor's address is
stored in the skb destructor arg. After a successful transmission, the
skb destructor submits that address, marking the descriptor completed.

To handle multiple descriptors per packet, the corresponding address is
now stored in the completion queue at the time space is reserved for
each descriptor. The number of pending descriptors is stored in the skb
destructor arg instead and is used by the skb destructor to submit that
many completions.

Introduce 'skb' in xdp_sock to store a partially built packet when
__xsk_generic_xmit() must return before it sees the EOP descriptor for
the current packet, so that packet building can resume in the next call
of __xsk_generic_xmit().

Helper functions are introduced to set and get the number of pending
descriptors in the skb destructor arg. Wrappers are also introduced for
storing descriptor addresses, and for submitting and cancelling (for
unsuccessful transmissions) that many completions.

Signed-off-by: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
---
 include/net/xdp_sock.h |  6 ++++
 net/xdp/xsk.c          | 74 ++++++++++++++++++++++++++++++------------
 net/xdp/xsk_queue.h    | 19 ++++-------
 3 files changed, 67 insertions(+), 32 deletions(-)
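
The core of the scheme described above is that skb_shinfo(skb)->destructor_arg
no longer carries a single descriptor address but a count of pending
descriptors, which the destructor turns into that many completion queue
submissions. Condensed from the helpers added in the patch below (comments
added here for illustration only):

static u32 xsk_get_num_desc(struct sk_buff *skb)
{
	/* destructor_arg now holds a descriptor count, not an address */
	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
}

static void xsk_set_destructor_arg(struct sk_buff *skb)
{
	/* bump the count each time a descriptor is added to the packet */
	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;

	skb_shinfo(skb)->destructor_arg = (void *)num;
}

static void xsk_destruct_skb(struct sk_buff *skb)
{
	/* submit one completion per descriptor that made up this skb */
	xsk_cq_submit_locked(xdp_sk(skb->sk), xsk_get_num_desc(skb));
	sock_wfree(skb);
}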

Comments

Toke Høiland-Jørgensen June 20, 2023, 5:25 p.m. UTC | #1
Maciej Fijalkowski <maciej.fijalkowski@intel.com> writes:

> From: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
>
> In Tx path, xsk core reserves space for each desc to be transmitted in
> the completion queue and it's address contained in it is stored in the
> skb destructor arg. After successful transmission the skb destructor
> submits the addr marking completion.
>
> To handle multiple descriptors per packet, now along with reserving
> space for each descriptor, the corresponding address is also stored in
> completion queue. The number of pending descriptors are stored in skb
> destructor arg and is used by the skb destructor to update completions.
>
> Introduce 'skb' in xdp_sock to store a partially built packet when
> __xsk_generic_xmit() must return before it sees the EOP descriptor for
> the current packet so that packet building can resume in next call of
> __xsk_generic_xmit().
>
> Helper functions are introduced to set and get the pending descriptors
> in the skb destructor arg. Also, wrappers are introduced for storing
> descriptor addresses, submitting and cancelling (for unsuccessful
> transmissions) the number of completions.
>
> Signed-off-by: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
> ---
>  include/net/xdp_sock.h |  6 ++++
>  net/xdp/xsk.c          | 74 ++++++++++++++++++++++++++++++------------
>  net/xdp/xsk_queue.h    | 19 ++++-------
>  3 files changed, 67 insertions(+), 32 deletions(-)
>
> diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
> index 36b0411a0d1b..1617af380162 100644
> --- a/include/net/xdp_sock.h
> +++ b/include/net/xdp_sock.h
> @@ -68,6 +68,12 @@ struct xdp_sock {
>  	u64 rx_dropped;
>  	u64 rx_queue_full;
>  
> +	/* When __xsk_generic_xmit() must return before it sees the EOP descriptor for the current
> +	 * packet, the partially built skb is saved here so that packet building can resume in next
> +	 * call of __xsk_generic_xmit().
> +	 */
> +	struct sk_buff *skb;

What ensures this doesn't leak? IIUC, when the loop in
__xsk_generic_xmit() gets to the end of a batch, userspace will get an
EAGAIN error and be expected to retry the call later, right? But if
userspace never retries, could the socket be torn down with this pointer
still populated? I looked for something that would prevent this in
subsequent patches, but couldn't find it; am I missing something?

-Toke
Tirthendu Sarkar June 21, 2023, 8:15 a.m. UTC | #2
> -----Original Message-----
> From: Toke Høiland-Jørgensen <toke@redhat.com>
> Sent: Tuesday, June 20, 2023 10:56 PM
>>
> Subject: Re: [PATCH v4 bpf-next 06/22] xsk: introduce wrappers and helpers
> for supporting multi-buffer in Tx path
> 
> Maciej Fijalkowski <maciej.fijalkowski@intel.com> writes:
> 
> > From: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
> >
> > In Tx path, xsk core reserves space for each desc to be transmitted in
> > the completion queue and it's address contained in it is stored in the
> > skb destructor arg. After successful transmission the skb destructor
> > submits the addr marking completion.
> >
> > To handle multiple descriptors per packet, now along with reserving
> > space for each descriptor, the corresponding address is also stored in
> > completion queue. The number of pending descriptors are stored in skb
> > destructor arg and is used by the skb destructor to update completions.
> >
> > Introduce 'skb' in xdp_sock to store a partially built packet when
> > __xsk_generic_xmit() must return before it sees the EOP descriptor for
> > the current packet so that packet building can resume in next call of
> > __xsk_generic_xmit().
> >
> > Helper functions are introduced to set and get the pending descriptors
> > in the skb destructor arg. Also, wrappers are introduced for storing
> > descriptor addresses, submitting and cancelling (for unsuccessful
> > transmissions) the number of completions.
> >
> > Signed-off-by: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
> > ---
> >  include/net/xdp_sock.h |  6 ++++
> >  net/xdp/xsk.c          | 74 ++++++++++++++++++++++++++++++------------
> >  net/xdp/xsk_queue.h    | 19 ++++-------
> >  3 files changed, 67 insertions(+), 32 deletions(-)
> >
> > diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
> > index 36b0411a0d1b..1617af380162 100644
> > --- a/include/net/xdp_sock.h
> > +++ b/include/net/xdp_sock.h
> > @@ -68,6 +68,12 @@ struct xdp_sock {
> >  	u64 rx_dropped;
> >  	u64 rx_queue_full;
> >
> > +	/* When __xsk_generic_xmit() must return before it sees the EOP
> descriptor for the current
> > +	 * packet, the partially built skb is saved here so that packet building
> can resume in next
> > +	 * call of __xsk_generic_xmit().
> > +	 */
> > +	struct sk_buff *skb;
> 
> What ensures this doesn't leak? IIUC, when the loop in
> __xsk_generic_xmit() gets to the end of a batch, userspace will get an
> EAGAIN error and be expected to retry the call later, right? But if
> userspace never retries, could the socket be torn down with this pointer
> still populated? I looked for something that would prevent this in
> subsequent patches, but couldn't find it; am I missing something?
> 
> -Toke
> 

Thanks for catching this. We will add cleanup during socket termination in v5.
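
[Editorial note: a minimal sketch of what such a teardown cleanup could look
like, assuming it reuses the xsk_consume_skb() helper added by this patch,
which already cancels the reserved completion queue entries and clears
xs->skb. The function name and call site are hypothetical and not part of
the v4 patch:]

/* Hypothetical helper for socket teardown (e.g. called from xsk_destruct()
 * or xsk_release()): drop a partially built packet left over from an
 * interrupted __xsk_generic_xmit() and give back its reserved cq slots.
 */
static void xsk_drop_pending_skb(struct xdp_sock *xs)
{
	if (xs->skb)
		xsk_consume_skb(xs->skb); /* cancels cq entries, frees skb, clears xs->skb */
}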
Toke Høiland-Jørgensen June 21, 2023, 1:27 p.m. UTC | #3
"Sarkar, Tirthendu" <tirthendu.sarkar@intel.com> writes:

>> -----Original Message-----
>> From: Toke Høiland-Jørgensen <toke@redhat.com>
>> Sent: Tuesday, June 20, 2023 10:56 PM
>>>
>> Subject: Re: [PATCH v4 bpf-next 06/22] xsk: introduce wrappers and helpers
>> for supporting multi-buffer in Tx path
>> 
>> Maciej Fijalkowski <maciej.fijalkowski@intel.com> writes:
>> 
>> > From: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
>> >
>> > In Tx path, xsk core reserves space for each desc to be transmitted in
>> > the completion queue and it's address contained in it is stored in the
>> > skb destructor arg. After successful transmission the skb destructor
>> > submits the addr marking completion.
>> >
>> > To handle multiple descriptors per packet, now along with reserving
>> > space for each descriptor, the corresponding address is also stored in
>> > completion queue. The number of pending descriptors are stored in skb
>> > destructor arg and is used by the skb destructor to update completions.
>> >
>> > Introduce 'skb' in xdp_sock to store a partially built packet when
>> > __xsk_generic_xmit() must return before it sees the EOP descriptor for
>> > the current packet so that packet building can resume in next call of
>> > __xsk_generic_xmit().
>> >
>> > Helper functions are introduced to set and get the pending descriptors
>> > in the skb destructor arg. Also, wrappers are introduced for storing
>> > descriptor addresses, submitting and cancelling (for unsuccessful
>> > transmissions) the number of completions.
>> >
>> > Signed-off-by: Tirthendu Sarkar <tirthendu.sarkar@intel.com>
>> > ---
>> >  include/net/xdp_sock.h |  6 ++++
>> >  net/xdp/xsk.c          | 74 ++++++++++++++++++++++++++++++------------
>> >  net/xdp/xsk_queue.h    | 19 ++++-------
>> >  3 files changed, 67 insertions(+), 32 deletions(-)
>> >
>> > diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
>> > index 36b0411a0d1b..1617af380162 100644
>> > --- a/include/net/xdp_sock.h
>> > +++ b/include/net/xdp_sock.h
>> > @@ -68,6 +68,12 @@ struct xdp_sock {
>> >  	u64 rx_dropped;
>> >  	u64 rx_queue_full;
>> >
>> > +	/* When __xsk_generic_xmit() must return before it sees the EOP
>> descriptor for the current
>> > +	 * packet, the partially built skb is saved here so that packet building
>> can resume in next
>> > +	 * call of __xsk_generic_xmit().
>> > +	 */
>> > +	struct sk_buff *skb;
>> 
>> What ensures this doesn't leak? IIUC, when the loop in
>> __xsk_generic_xmit() gets to the end of a batch, userspace will get an
>> EAGAIN error and be expected to retry the call later, right? But if
>> userspace never retries, could the socket be torn down with this pointer
>> still populated? I looked for something that would prevent this in
>> subsequent patches, but couldn't find it; am I missing something?
>> 
>> -Toke
>> 
>
> Thanks for catching this. We will add cleanup during socket termination in v5.

Awesome! :)

-Toke

Patch

diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index 36b0411a0d1b..1617af380162 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -68,6 +68,12 @@  struct xdp_sock {
 	u64 rx_dropped;
 	u64 rx_queue_full;
 
+	/* When __xsk_generic_xmit() must return before it sees the EOP descriptor for the current
+	 * packet, the partially built skb is saved here so that packet building can resume in next
+	 * call of __xsk_generic_xmit().
+	 */
+	struct sk_buff *skb;
+
 	struct list_head map_list;
 	/* Protects map_list */
 	spinlock_t map_list_lock;
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 86d8b23ae0a7..29bda8452e2c 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -480,19 +480,65 @@  static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static void xsk_destruct_skb(struct sk_buff *skb)
+static int xsk_cq_reserve_addr_locked(struct xdp_sock *xs, u64 addr)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&xs->pool->cq_lock, flags);
+	ret = xskq_prod_reserve_addr(xs->pool->cq, addr);
+	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+
+	return ret;
+}
+
+static void xsk_cq_submit_locked(struct xdp_sock *xs, u32 n)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&xs->pool->cq_lock, flags);
+	xskq_prod_submit_n(xs->pool->cq, n);
+	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static void xsk_cq_cancel_locked(struct xdp_sock *xs, u32 n)
 {
-	u64 addr = (u64)(long)skb_shinfo(skb)->destructor_arg;
-	struct xdp_sock *xs = xdp_sk(skb->sk);
 	unsigned long flags;
 
 	spin_lock_irqsave(&xs->pool->cq_lock, flags);
-	xskq_prod_submit_addr(xs->pool->cq, addr);
+	xskq_prod_cancel_n(xs->pool->cq, n);
 	spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static u32 xsk_get_num_desc(struct sk_buff *skb)
+{
+	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+}
 
+static void xsk_destruct_skb(struct sk_buff *skb)
+{
+	xsk_cq_submit_locked(xdp_sk(skb->sk), xsk_get_num_desc(skb));
 	sock_wfree(skb);
 }
 
+static void xsk_set_destructor_arg(struct sk_buff *skb)
+{
+	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+
+	skb_shinfo(skb)->destructor_arg = (void *)num;
+}
+
+static void xsk_consume_skb(struct sk_buff *skb)
+{
+	struct xdp_sock *xs = xdp_sk(skb->sk);
+
+	skb->destructor = sock_wfree;
+	xsk_cq_cancel_locked(xs, xsk_get_num_desc(skb));
+	/* Free skb without triggering the perf drop trace */
+	consume_skb(skb);
+	xs->skb = NULL;
+}
+
 static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 					      struct xdp_desc *desc)
 {
@@ -578,8 +624,8 @@  static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 	skb->dev = dev;
 	skb->priority = xs->sk.sk_priority;
 	skb->mark = xs->sk.sk_mark;
-	skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
 	skb->destructor = xsk_destruct_skb;
+	xsk_set_destructor_arg(skb);
 
 	return skb;
 }
@@ -591,7 +637,6 @@  static int __xsk_generic_xmit(struct sock *sk)
 	bool sent_frame = false;
 	struct xdp_desc desc;
 	struct sk_buff *skb;
-	unsigned long flags;
 	int err = 0;
 
 	mutex_lock(&xs->mutex);
@@ -616,31 +661,20 @@  static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		spin_lock_irqsave(&xs->pool->cq_lock, flags);
-		if (xskq_prod_reserve(xs->pool->cq)) {
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+		if (xsk_cq_reserve_addr_locked(xs, desc.addr))
 			goto out;
-		}
-		spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
 
 		skb = xsk_build_skb(xs, &desc);
 		if (IS_ERR(skb)) {
 			err = PTR_ERR(skb);
-			spin_lock_irqsave(&xs->pool->cq_lock, flags);
-			xskq_prod_cancel(xs->pool->cq);
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+			xsk_cq_cancel_locked(xs, 1);
 			goto out;
 		}
 
 		err = __dev_direct_xmit(skb, xs->queue_id);
 		if  (err == NETDEV_TX_BUSY) {
 			/* Tell user-space to retry the send */
-			skb->destructor = sock_wfree;
-			spin_lock_irqsave(&xs->pool->cq_lock, flags);
-			xskq_prod_cancel(xs->pool->cq);
-			spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
-			/* Free skb without triggering the perf drop trace */
-			consume_skb(skb);
+			xsk_consume_skb(skb);
 			err = -EAGAIN;
 			goto out;
 		}
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index ad81b19e6fdf..4190f43ce0b0 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -297,6 +297,11 @@  static inline void xskq_cons_release(struct xsk_queue *q)
 	q->cached_cons++;
 }
 
+static inline void xskq_cons_cancel_n(struct xsk_queue *q, u32 cnt)
+{
+	q->cached_cons -= cnt;
+}
+
 static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
 {
 	/* No barriers needed since data is not accessed */
@@ -324,9 +329,9 @@  static inline bool xskq_prod_is_full(struct xsk_queue *q)
 	return xskq_prod_nb_free(q, 1) ? false : true;
 }
 
-static inline void xskq_prod_cancel(struct xsk_queue *q)
+static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt)
 {
-	q->cached_prod--;
+	q->cached_prod -= cnt;
 }
 
 static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -392,16 +397,6 @@  static inline void xskq_prod_submit(struct xsk_queue *q)
 	__xskq_prod_submit(q, q->cached_prod);
 }
 
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
-	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-	u32 idx = q->ring->producer;
-
-	ring->desc[idx++ & q->ring_mask] = addr;
-
-	__xskq_prod_submit(q, idx);
-}
-
 static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
 {
 	__xskq_prod_submit(q, q->ring->producer + nb_entries);