@@ -47,19 +47,18 @@ struct xsk_queue {
u64 queue_empty_descs;
};
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings are a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
*
* producer consumer
*
- * if (LOAD ->consumer) { LOAD ->producer
- * (A) smp_rmb() (C)
+ * if (LOAD ->consumer) { (A) LOAD.acq ->producer (C)
* STORE $data LOAD $data
- * smp_wmb() (B) smp_mb() (D)
- * STORE ->producer STORE ->consumer
+ * STORE.rel ->producer (B) STORE.rel ->consumer (D)
* }
*
* (A) pairs with (D), and (B) pairs with (C).
@@ -78,7 +77,8 @@ struct xsk_queue {
*
* (A) is a control dependency that separates the load of ->consumer
* from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data we do not. The dependency will
+ * order both of the stores after the loads. So no barrier is needed.
*
* (D) protects the load of the data to be observed to happen after the
* store of the consumer pointer. If we did not have this memory
@@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
static inline void __xskq_cons_release(struct xsk_queue *q)
{
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cached_cons);
+ smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
}
static inline void __xskq_cons_peek(struct xsk_queue *q)
{
/* Refresh the local pointer */
- q->cached_prod = READ_ONCE(q->ring->producer);
- smp_rmb(); /* C, matches B */
+ q->cached_prod = smp_load_acquire(&q->ring->producer); /* C, matches B */
}
static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
- smp_wmb(); /* B, matches C */
-
- WRITE_ONCE(q->ring->producer, idx);
+ smp_store_release(&q->ring->producer, idx); /* B, matches C */
}
static inline void xskq_prod_submit(struct xsk_queue *q)