diff mbox series

[bpf-next,v2,1/2] xsk: update rings for load-acquire/store-release barriers

Message ID 20210305094113.413544-2-bjorn.topel@gmail.com (mailing list archive)
State Accepted
Commit 057e8fb782c1177059cf49413dc770e0f5ea5ed4
Delegated to: BPF
Headers show
Series load-acquire/store-release barriers for AF_XDP rings | expand

Checks

Context Check Description
netdev/cover_letter success Link
netdev/fixes_present success Link
netdev/patch_count success Link
netdev/tree_selection success Clearly marked for bpf-next
netdev/subject_prefix success Link
netdev/cc_maintainers warning 9 maintainers not CCed: kuba@kernel.org hawk@kernel.org yhs@fb.com john.fastabend@gmail.com bjorn@kernel.org kpsingh@kernel.org songliubraving@fb.com kafai@fb.com davem@davemloft.net
netdev/source_inline success Was 0 now: 0
netdev/verify_signedoff success Link
netdev/module_param success Was 0 now: 0
netdev/build_32bit success Errors and warnings before: 1 this patch: 1
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/verify_fixes success Link
netdev/checkpatch warning WARNING: line length of 82 exceeds 80 columns
netdev/build_allmodconfig_warn success Errors and warnings before: 1 this patch: 1
netdev/header_inline success Link
netdev/stable success Stable not CCed

Commit Message

Björn Töpel March 5, 2021, 9:41 a.m. UTC
From: Björn Töpel <bjorn.topel@intel.com>

Currently, the AF_XDP rings use general smp_{r,w,}mb() barriers on
the kernel-side. On most modern architectures
load-acquire/store-release barriers perform better, and result in
simpler code for circular ring buffers.

This change updates the XDP socket rings to use
load-acquire/store-release barriers.

It is important to note that changing from the old smp_{r,w,}mb()
barriers, to load-acquire/store-release barriers does not break
compatibility. The old semantics work with the new one, and vice
versa.

As pointed out by "Documentation/memory-barriers.txt" in the "SMP
BARRIER PAIRING" section:

  "General barriers pair with each other, though they also pair with
  most other types of barriers, albeit without multicopy atomicity.
  An acquire barrier pairs with a release barrier, but both may also
  pair with other barriers, including of course general barriers."

How different barriers behave and pair is outlined in
"tools/memory-model/Documentation/cheatsheet.txt".

In order to make sure that compatibility is not broken, LKMM herd7
based litmus tests can be constructed and verified.

We generalize the XDP socket ring to a one entry ring, and create two
scenarios: one where the ring is full, where only the consumer can
proceed, followed by the producer; and one where the ring is empty,
where only the producer can proceed, followed by the consumer. Each
scenario is then expanded to four different tests: general
producer/general consumer, general producer/acqrel consumer, acqrel
producer/general consumer, acqrel producer/acqrel consumer. In total
eight tests.

The empty ring test:
  C spsc-rb+empty

  // Simple one entry ring:
  // prod cons     allowed action       prod cons
  //    0    0 =>       prod          =>   1    0
  //    0    1 =>       cons          =>   0    0
  //    1    0 =>       cons          =>   1    1
  //    1    1 =>       prod          =>   0    1

  {}

  // We start at prod==0, cons==0, data==0, i.e. nothing has been
  // written to the ring. From here only the producer can start, and
  // should write 1. Afterwards, consumer can continue and read 1 to
  // data. Can we enter state prod==1, cons==1, but consumer observed
  // the incorrect value of 0?

  P0(int *prod, int *cons, int *data)
  {
     ... producer
  }

  P1(int *prod, int *cons, int *data)
  {
     ... consumer
  }

  exists( 1:d=0 /\ prod=1 /\ cons=1 );

The full ring test:
  C spsc-rb+full

  // Simple one entry ring:
  // prod cons     allowed action       prod cons
  //    0    0 =>       prod          =>   1    0
  //    0    1 =>       cons          =>   0    0
  //    1    0 =>       cons          =>   1    1
  //    1    1 =>       prod          =>   0    1

  { prod = 1; }

  // We start at prod==1, cons==0, data==0, i.e. producer has
  // written 0, so from here only the consumer can start, and should
  // consume 0. Afterwards, producer can continue and write 1 to
  // data. Can we enter state prod==0, cons==1, but consumer observed
  // the write of 1?

  P0(int *prod, int *cons, int *data)
  {
    ... producer
  }

  P1(int *prod, int *cons, int *data)
  {
    ... consumer
  }

  exists( 1:d=1 /\ prod=0 /\ cons=1 );

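where P0 and P1 are (for each, the general-barrier variant is listed
first, followed by the load-acquire/store-release variant):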

  // Producer, general-barrier variant: uses smp_wmb() between the
  // data store and the producer-index store.
  P0(int *prod, int *cons, int *data)
  {
  	int p;

  	p = READ_ONCE(*prod);
  	// Only proceed when cons equals the sampled prod value; in the
  	// one-entry ring table this is the state where "prod" is allowed.
  	if (READ_ONCE(*cons) == p) {
  		WRITE_ONCE(*data, 1);
  		// Write barrier placed between the data store above and the
  		// index store below.
  		smp_wmb();
  		// Flip the one-bit producer index.
  		WRITE_ONCE(*prod, p ^ 1);
  	}
  }

  // Producer, store-release variant: the producer index is written
  // with smp_store_release() instead of smp_wmb() + WRITE_ONCE().
  P0(int *prod, int *cons, int *data)
  {
  	int p;

  	p = READ_ONCE(*prod);
  	// Only proceed when cons equals the sampled prod value; in the
  	// one-entry ring table this is the state where "prod" is allowed.
  	if (READ_ONCE(*cons) == p) {
  		WRITE_ONCE(*data, 1);
  		// Release store of the flipped one-bit producer index; it
  		// follows the data store above.
  		smp_store_release(prod, p ^ 1);
  	}
  }

  // Consumer, general-barrier variant: uses smp_rmb() after the
  // producer-index load and smp_mb() before the consumer-index store.
  P1(int *prod, int *cons, int *data)
  {
  	int c;
  	// d keeps the value -1 when the branch below is not taken.
  	int d = -1;

  	c = READ_ONCE(*cons);
  	// Only proceed when prod differs from the sampled cons value; in
  	// the one-entry ring table this is the state where "cons" is
  	// allowed.
  	if (READ_ONCE(*prod) != c) {
  		// Read barrier between the prod load above and the data load
  		// below.
  		smp_rmb();
  		d = READ_ONCE(*data);
  		// Full barrier between the data load above and the cons store
  		// below.
  		smp_mb();
  		// Flip the one-bit consumer index.
  		WRITE_ONCE(*cons, c ^ 1);
  	}
  }

  // Consumer, load-acquire/store-release variant: the producer index
  // is read with smp_load_acquire() and the consumer index is written
  // with smp_store_release(); no standalone barriers are used.
  P1(int *prod, int *cons, int *data)
  {
  	int c;
  	// d keeps the value -1 when the branch below is not taken.
  	int d = -1;

  	c = READ_ONCE(*cons);
  	// Acquire load of prod; only proceed when it differs from the
  	// sampled cons value (the state where "cons" is allowed).
  	if (smp_load_acquire(prod) != c) {
  		d = READ_ONCE(*data);
  		// Release store of the flipped one-bit consumer index; it
  		// follows the data load above.
  		smp_store_release(cons, c ^ 1);
  	}
  }

The full LKMM litmus tests are found at [1].

On x86-64 systems the l2fwd AF_XDP xdpsock sample performance
increases by 1%. This is mostly because the smp_mb() is removed,
which is a relatively expensive operation on these
platforms. Weakly-ordered platforms, such as ARM64 might benefit even
more.

[1] https://github.com/bjoto/litmus-xsk

Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
---
 net/xdp/xsk_queue.h | 30 +++++++++++++-----------------
 1 file changed, 13 insertions(+), 17 deletions(-)
diff mbox series

Patch

diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 2823b7c3302d..2ac3802c2cd7 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@  struct xsk_queue {
 	u64 queue_empty_descs;
 };
 
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings is a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
  *
  * producer                         consumer
  *
- * if (LOAD ->consumer) {           LOAD ->producer
- *                    (A)           smp_rmb()       (C)
+ * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
  *    STORE $data                   LOAD $data
- *    smp_wmb()       (B)           smp_mb()        (D)
- *    STORE ->producer              STORE ->consumer
+ *    STORE.rel ->producer (B)      STORE.rel ->consumer (D)
  * }
  *
  * (A) pairs with (D), and (B) pairs with (C).
@@ -78,7 +77,8 @@  struct xsk_queue {
  *
  * (A) is a control dependency that separates the load of ->consumer
  * from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data we do not. The dependency will
+ * order both of the stores after the loads. So no barrier is needed.
  *
  * (D) protects the load of the data to be observed to happen after the
  * store of the consumer pointer. If we did not have this memory
@@ -227,15 +227,13 @@  static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
 {
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matches A */
 }
 
 static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
 	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
+	q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
 }
 
 static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@  static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 
 static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
+	smp_store_release(&q->ring->producer, idx); /* B, matches C */
 }
 
 static inline void xskq_prod_submit(struct xsk_queue *q)