[liburing,2/2] Fix the use of memory barriers

Message ID 20190701214232.29338-3-bvanassche@acm.org (mailing list archive)
State: New, archived
Series: Memory synchronization improvements

Commit Message

Bart Van Assche July 1, 2019, 9:42 p.m. UTC
Introduce the smp_load_acquire() and smp_store_release() macros. Fix
synchronization in io_uring_cq_advance() and __io_uring_get_cqe().
Remove a superfluous local variable, if-test and write barrier from
__io_uring_submit(). Remove a superfluous barrier from
test/io_uring_enter.c.

Cc: Stefan Hajnoczi <stefanha@redhat.com>
Cc: Roman Penyaev <rpenyaev@suse.de>
Signed-off-by: Bart Van Assche <bvanassche@acm.org>
---
 man/io_uring_setup.2  |  6 ++-
 src/barrier.h         | 87 +++++++++++++++++++++++++++++++++++++++++--
 src/liburing.h        | 15 +++-----
 src/queue.c           | 30 ++++-----------
 test/io_uring_enter.c |  8 ++--
 5 files changed, 107 insertions(+), 39 deletions(-)

Comments

Roman Penyaev July 2, 2019, 9:07 a.m. UTC | #1
Hi Bart,

On 2019-07-01 23:42, Bart Van Assche wrote:

...

> +#if defined(__x86_64__)
> +#define smp_store_release(p, v)			\
> +do {						\
> +	barrier();				\
> +	WRITE_ONCE(*(p), (v));			\
> +} while (0)
> +
> +#define smp_load_acquire(p)			\
> +({						\
> +	typeof(*p) ___p1 = READ_ONCE(*(p));	\
> +	barrier();				\
> +	___p1;					\
> +})

Can we have these two macros for x86_32 as well?
For i386 it will take another branch with full mb,
which is not needed.
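
Something like this (untested; it only widens the guard around the
definitions from your patch) is what I have in mind:

#if defined(__i386__) || defined(__x86_64__)
#define smp_store_release(p, v)			\
do {						\
	barrier();				\
	WRITE_ONCE(*(p), (v));			\
} while (0)

#define smp_load_acquire(p)			\
({						\
	typeof(*p) ___p1 = READ_ONCE(*(p));	\
	barrier();				\
	___p1;					\
})
#endif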

Besides that, both patches look good to me.

--
Roman
Bart Van Assche July 2, 2019, 4:17 p.m. UTC | #2
On 7/2/19 2:07 AM, Roman Penyaev wrote:
> Hi Bart,
> 
> On 2019-07-01 23:42, Bart Van Assche wrote:
> 
> ...
> 
>> +#if defined(__x86_64__)
>> +#define smp_store_release(p, v)            \
>> +do {                        \
>> +    barrier();                \
>> +    WRITE_ONCE(*(p), (v));            \
>> +} while (0)
>> +
>> +#define smp_load_acquire(p)            \
>> +({                        \
>> +    typeof(*p) ___p1 = READ_ONCE(*(p));    \
>> +    barrier();                \
>> +    ___p1;                    \
>> +})
> 
> Can we have these two macros for x86_32 as well?
> For i386 it will take another branch with full mb,
> which is not needed.
> 
> Besides that, both patches look good to me.

Hi Roman,

Thanks for having taken a look. From Linux kernel source file 
arch/x86/include/asm/barrier.h:

#ifdef CONFIG_X86_32
#define mb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
		"mfence", X86_FEATURE_XMM2) ::: "memory", "cc")
#define rmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
		"lfence", X86_FEATURE_XMM2) ::: "memory", "cc")
#define wmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
		"sfence", X86_FEATURE_XMM2) ::: "memory", "cc")
#else
#define mb() 	asm volatile("mfence":::"memory")
#define rmb()	asm volatile("lfence":::"memory")
#define wmb()	asm volatile("sfence" ::: "memory")
#endif

In other words, I think that 32-bit and 64-bit systems really have to be 
treated in a different way.
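
With this patch, i386 therefore falls back to the generic definitions
at the bottom of barrier.h, which use a full smp_mb() (the fallback
assumes smp_mb(), READ_ONCE() and WRITE_ONCE() are available for the
architecture):

#define smp_store_release(p, v)			\
do {						\
	smp_mb();				\
	WRITE_ONCE(*p, v);			\
} while (0)

#define smp_load_acquire(p)			\
({						\
	typeof(*p) ___p1 = READ_ONCE(*p);	\
	smp_mb();				\
	___p1;					\
})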

Bart.
Roman Penyaev July 2, 2019, 6:40 p.m. UTC | #3
On 2019-07-02 18:17, Bart Van Assche wrote:
> On 7/2/19 2:07 AM, Roman Penyaev wrote:
>> Hi Bart,
>> 
>> On 2019-07-01 23:42, Bart Van Assche wrote:
>> 
>> ...
>> 
>>> +#if defined(__x86_64__)
>>> +#define smp_store_release(p, v)            \
>>> +do {                        \
>>> +    barrier();                \
>>> +    WRITE_ONCE(*(p), (v));            \
>>> +} while (0)
>>> +
>>> +#define smp_load_acquire(p)            \
>>> +({                        \
>>> +    typeof(*p) ___p1 = READ_ONCE(*(p));    \
>>> +    barrier();                \
>>> +    ___p1;                    \
>>> +})
>> 
>> Can we have these two macros for x86_32 as well?
>> For i386 it will take another branch with full mb,
>> which is not needed.
>> 
>> Besides that, both patches look good to me.
> 
> Hi Roman,
> 
> Thanks for having taken a look. From Linux kernel source file
> arch/x86/include/asm/barrier.h:
> 
> #ifdef CONFIG_X86_32
> #define mb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
> 		"mfence", X86_FEATURE_XMM2) ::: "memory", "cc")
> #define rmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
> 		"lfence", X86_FEATURE_XMM2) ::: "memory", "cc")
> #define wmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
> 		"sfence", X86_FEATURE_XMM2) ::: "memory", "cc")
> #else
> #define mb() 	asm volatile("mfence":::"memory")
> #define rmb()	asm volatile("lfence":::"memory")
> #define wmb()	asm volatile("sfence" ::: "memory")
> #endif
> 
> In other words, I think that 32-bit and 64-bit systems really have to
> be treated in a different way.

I meant smp_load_acquire / smp_store_release

--
Roman
Bart Van Assche July 2, 2019, 8:31 p.m. UTC | #4
On 7/2/19 11:40 AM, Roman Penyaev wrote:
> On 2019-07-02 18:17, Bart Van Assche wrote:
>> On 7/2/19 2:07 AM, Roman Penyaev wrote:
>>> Hi Bart,
>>>
>>> On 2019-07-01 23:42, Bart Van Assche wrote:
>>>
>>> ...
>>>
>>>> +#if defined(__x86_64__)
>>>> +#define smp_store_release(p, v)            \
>>>> +do {                        \
>>>> +    barrier();                \
>>>> +    WRITE_ONCE(*(p), (v));            \
>>>> +} while (0)
>>>> +
>>>> +#define smp_load_acquire(p)            \
>>>> +({                        \
>>>> +    typeof(*p) ___p1 = READ_ONCE(*(p));    \
>>>> +    barrier();                \
>>>> +    ___p1;                    \
>>>> +})
>>>
>>> Can we have these two macros for x86_32 as well?
>>> For i386 it will take another branch with full mb,
>>> which is not needed.
>>>
>>> Besides that, both patches look good to me.
>>
>> Hi Roman,
>>
>> Thanks for having taken a look. From Linux kernel source file
>> arch/x86/include/asm/barrier.h:
>>
>> #ifdef CONFIG_X86_32
>> #define mb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
>>         "mfence", X86_FEATURE_XMM2) ::: "memory", "cc")
>> #define rmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
>>         "lfence", X86_FEATURE_XMM2) ::: "memory", "cc")
>> #define wmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
>>         "sfence", X86_FEATURE_XMM2) ::: "memory", "cc")
>> #else
>> #define mb()     asm volatile("mfence":::"memory")
>> #define rmb()    asm volatile("lfence":::"memory")
>> #define wmb()    asm volatile("sfence" ::: "memory")
>> #endif
>>
>> In other words, I think that 32-bit and 64-bit systems really have to
>> be treated in a different way.
> 
> I meant smp_load_acquire / smp_store_release

Hi Roman,

Since there are 32-bit x86 CPUs that can reorder loads and stores, I'm
not sure it is safe to use the 64-bit smp_load_acquire() /
smp_store_release() implementations for 32-bit CPUs. In case you have
not yet encountered it, a thorough discussion of the x86 memory model
is available in Sewell, Peter, Susmit Sarkar, Scott Owens, Francesco
Zappa Nardelli, and Magnus O. Myreen, "x86-TSO: a rigorous and usable
programmer's model for x86 multiprocessors," Communications of the ACM
53, no. 7 (2010): 89-97 (https://dl.acm.org/citation.cfm?id=1785443 /
http://www.spinroot.com/spin/Doc/course/x86_tso.pdf).

Bart.
Roman Penyaev July 3, 2019, 9:49 a.m. UTC | #5
On 2019-07-02 22:31, Bart Van Assche wrote:
> On 7/2/19 11:40 AM, Roman Penyaev wrote:
>> On 2019-07-02 18:17, Bart Van Assche wrote:
>>> On 7/2/19 2:07 AM, Roman Penyaev wrote:
>>>> Hi Bart,
>>>> 
>>>> On 2019-07-01 23:42, Bart Van Assche wrote:
>>>> 
>>>> ...
>>>> 
>>>>> +#if defined(__x86_64__)
>>>>> +#define smp_store_release(p, v)            \
>>>>> +do {                        \
>>>>> +    barrier();                \
>>>>> +    WRITE_ONCE(*(p), (v));            \
>>>>> +} while (0)
>>>>> +
>>>>> +#define smp_load_acquire(p)            \
>>>>> +({                        \
>>>>> +    typeof(*p) ___p1 = READ_ONCE(*(p));    \
>>>>> +    barrier();                \
>>>>> +    ___p1;                    \
>>>>> +})
>>>> 
>>>> Can we have these two macros for x86_32 as well?
>>>> For i386 it will take another branch with full mb,
>>>> which is not needed.
>>>> 
>>>> Besides that, both patches look good to me.
>>> 
>>> Hi Roman,
>>> 
>>> Thanks for having taken a look. From Linux kernel source file
>>> arch/x86/include/asm/barrier.h:
>>> 
>>> #ifdef CONFIG_X86_32
>>> #define mb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
>>>         "mfence", X86_FEATURE_XMM2) ::: "memory", "cc")
>>> #define rmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
>>>         "lfence", X86_FEATURE_XMM2) ::: "memory", "cc")
>>> #define wmb() asm volatile(ALTERNATIVE("lock; addl $0,-4(%%esp)",\
>>>         "sfence", X86_FEATURE_XMM2) ::: "memory", "cc")
>>> #else
>>> #define mb()     asm volatile("mfence":::"memory")
>>> #define rmb()    asm volatile("lfence":::"memory")
>>> #define wmb()    asm volatile("sfence" ::: "memory")
>>> #endif
>>> 
>>> In other words, I think that 32-bit and 64-bit systems really have to
>>> be treated in a different way.
>> 
>> I meant smp_load_acquire / smp_store_release
> 
> Hi Roman,
> 

Hi Bart,

It seems you took these macros from tools/arch/x86/include/asm/barrier.h,
where indeed smp_store_release and smp_load_acquire are defined only for
x86-64. If I am not mistaken, the kernel itself defines
__smp_store_release / __smp_load_acquire identically for all of x86 in
arch/x86/include/asm/barrier.h.
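
If memory serves, that header has roughly the following for both
32- and 64-bit:

#define __smp_store_release(p, v)		\
do {						\
	compiletime_assert_atomic_type(*p);	\
	barrier();				\
	WRITE_ONCE(*p, v);			\
} while (0)

#define __smp_load_acquire(p)			\
({						\
	typeof(*p) ___p1 = READ_ONCE(*p);	\
	compiletime_assert_atomic_type(*p);	\
	barrier();				\
	___p1;					\
})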

> Since there are 32-bit x86 CPUs that can reorder loads and stores

Only "loads may be reordered with earlier stores"
(Intel® 64 and IA-32 Architectures, section 8.2.3.4), but "stores are 
not
reordered with earlier loads" (section 8.2.3.3), so smp_store_release 
and
and smp_load_acquire can be relaxed.

--
Roman
Bart Van Assche July 3, 2019, 8:49 p.m. UTC | #6
On 7/3/19 2:49 AM, Roman Penyaev wrote:
> It seems you took these macros from tools/arch/x86/include/asm/barrier.h,
> where indeed smp_store_release and smp_load_acquire are defined only for
> x86-64. If I am not mistaken, the kernel itself defines
> __smp_store_release / __smp_load_acquire identically for all of x86 in
> arch/x86/include/asm/barrier.h.

Agreed.

>> Since there are 32-bit x86 CPUs that can reorder loads and stores
> 
> Only "loads may be reordered with earlier stores"
> (Intel® 64 and IA-32 Architectures, section 8.2.3.4), but "stores are not
> reordered with earlier loads" (section 8.2.3.3), so smp_store_release
> and smp_load_acquire can be relaxed.

Certain IA-32 CPUs do not follow that model. From Linux kernel v4.15:

config X86_PPRO_FENCE
    bool "PentiumPro memory ordering errata workaround"
    depends on M686 || M586MMX || M586TSC || M586 || M486 || MGEODEGX1
    ---help---
    Old PentiumPro multiprocessor systems had errata that could cause
    memory operations to violate the x86 ordering standard in rare cases.
    Enabling this option will attempt to work around some (but not all)
    occurrences of this problem, at the cost of much heavier spinlock and
    memory barrier operations.

    If unsure, say n here. Even distro kernels should think twice before
    enabling this: there are few systems, and an unlikely bug.

Since support for these CPUs has been removed from the Linux kernel, it's
probably fine not to worry about them in liburing either. See also
commit 5927145efd5d ("x86/cpu: Remove the CONFIG_X86_PPRO_FENCE=y
quirk") # v4.16.

Bart.

Patch

diff --git a/man/io_uring_setup.2 b/man/io_uring_setup.2
index ebaee2d43f35..9ab01434c18d 100644
--- a/man/io_uring_setup.2
+++ b/man/io_uring_setup.2
@@ -97,7 +97,11 @@  call with the following code sequence:
 
 .in +4n
 .EX
-read_barrier();
+/*
+ * Ensure that the wakeup flag is read after the tail pointer has been
+ * written.
+ */
+smp_mb();
 if (*sq_ring->flags & IORING_SQ_NEED_WAKEUP)
     io_uring_enter(fd, 0, 0, IORING_ENTER_SQ_WAKEUP);
 .EE
diff --git a/src/barrier.h b/src/barrier.h
index ef00f6722ba9..e1a407fccde2 100644
--- a/src/barrier.h
+++ b/src/barrier.h
@@ -1,16 +1,95 @@ 
 #ifndef LIBURING_BARRIER_H
 #define LIBURING_BARRIER_H
 
+/*
+From the kernel documentation file refcount-vs-atomic.rst:
+
+A RELEASE memory ordering guarantees that all prior loads and
+stores (all po-earlier instructions) on the same CPU are completed
+before the operation. It also guarantees that all po-earlier
+stores on the same CPU and all propagated stores from other CPUs
+must propagate to all other CPUs before the release operation
+(A-cumulative property). This is implemented using
+:c:func:`smp_store_release`.
+
+An ACQUIRE memory ordering guarantees that all post loads and
+stores (all po-later instructions) on the same CPU are
+completed after the acquire operation. It also guarantees that all
+po-later stores on the same CPU must propagate to all other CPUs
+after the acquire operation executes. This is implemented using
+:c:func:`smp_acquire__after_ctrl_dep`.
+*/
+
+/* From tools/include/linux/compiler.h */
+/* Optimization barrier */
+/* The "volatile" is due to gcc bugs */
+#define barrier() __asm__ __volatile__("": : :"memory")
+
+/* From tools/virtio/linux/compiler.h */
+#define WRITE_ONCE(var, val) \
+	(*((volatile typeof(val) *)(&(var))) = (val))
+#define READ_ONCE(var) (*((volatile typeof(var) *)(&(var))))
+
+
 #if defined(__x86_64) || defined(__i386__)
-#define read_barrier()	__asm__ __volatile__("":::"memory")
-#define write_barrier()	__asm__ __volatile__("":::"memory")
+/* From tools/arch/x86/include/asm/barrier.h */
+#if defined(__i386__)
+/*
+ * Some non-Intel clones support out of order store. wmb() ceases to be a
+ * nop for these.
+ */
+#define mb()	asm volatile("lock; addl $0,0(%%esp)" ::: "memory")
+#define rmb()	asm volatile("lock; addl $0,0(%%esp)" ::: "memory")
+#define wmb()	asm volatile("lock; addl $0,0(%%esp)" ::: "memory")
+#elif defined(__x86_64__)
+#define mb()	asm volatile("mfence" ::: "memory")
+#define rmb()	asm volatile("lfence" ::: "memory")
+#define wmb()	asm volatile("sfence" ::: "memory")
+#define smp_rmb() barrier()
+#define smp_wmb() barrier()
+#define smp_mb()  asm volatile("lock; addl $0,-132(%%rsp)" ::: "memory", "cc")
+#endif
+
+#if defined(__x86_64__)
+#define smp_store_release(p, v)			\
+do {						\
+	barrier();				\
+	WRITE_ONCE(*(p), (v));			\
+} while (0)
+
+#define smp_load_acquire(p)			\
+({						\
+	typeof(*p) ___p1 = READ_ONCE(*(p));	\
+	barrier();				\
+	___p1;					\
+})
+#endif /* defined(__x86_64__) */
 #else
 /*
  * Add arch appropriate definitions. Be safe and use full barriers for
  * archs we don't have support for.
  */
-#define read_barrier()	__sync_synchronize()
-#define write_barrier()	__sync_synchronize()
+#define smp_rmb()	__sync_synchronize()
+#define smp_wmb()	__sync_synchronize()
+#endif
+
+/* From tools/include/asm/barrier.h */
+
+#ifndef smp_store_release
+# define smp_store_release(p, v)		\
+do {						\
+	smp_mb();				\
+	WRITE_ONCE(*p, v);			\
+} while (0)
+#endif
+
+#ifndef smp_load_acquire
+# define smp_load_acquire(p)			\
+({						\
+	typeof(*p) ___p1 = READ_ONCE(*p);	\
+	smp_mb();				\
+	___p1;					\
+})
 #endif
 
 #endif
diff --git a/src/liburing.h b/src/liburing.h
index d3fcd1524540..a350a013ef8a 100644
--- a/src/liburing.h
+++ b/src/liburing.h
@@ -88,11 +88,10 @@  extern int io_uring_register_eventfd(struct io_uring *ring, int fd);
 extern int io_uring_unregister_eventfd(struct io_uring *ring);
 
 #define io_uring_for_each_cqe(ring, head, cqe)					\
+	/* smp_load_acquire() enforces the order of tail and CQE reads. */	\
 	for (head = *(ring)->cq.khead;						\
-	     /* See read_barrier() explanation in __io_uring_get_cqe() */	\
-	     ({read_barrier();							\
-	       cqe = (head != *(ring)->cq.ktail ?				\
-		&(ring)->cq.cqes[head & (*(ring)->cq.kring_mask)] : NULL);});	\
+	     (cqe = (head != smp_load_acquire((ring)->cq.ktail) ?		\
+		&(ring)->cq.cqes[head & (*(ring)->cq.kring_mask)] : NULL));	\
 	     head++)								\
 
 
@@ -105,13 +104,11 @@  static inline void io_uring_cq_advance(struct io_uring *ring,
 	if (nr) {
 		struct io_uring_cq *cq = &ring->cq;
 
-		(*cq->khead) += nr;
-
 		/*
-		 * Ensure that the kernel sees our new head, the kernel has
-		 * the matching read barrier.
+		 * Ensure that the kernel only sees the new value of the head
+		 * index after the CQEs have been read.
 		 */
-		write_barrier();
+		smp_store_release(cq->khead, *cq->khead + nr);
 	}
 }
 
diff --git a/src/queue.c b/src/queue.c
index bec363fc0ebf..72b22935c2ef 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -77,7 +77,7 @@  static int __io_uring_submit(struct io_uring *ring, unsigned wait_nr)
 {
 	struct io_uring_sq *sq = &ring->sq;
 	const unsigned mask = *sq->kring_mask;
-	unsigned ktail, ktail_next, submitted, to_submit;
+	unsigned ktail, submitted, to_submit;
 	unsigned flags;
 	int ret;
 
@@ -88,15 +88,11 @@  static int __io_uring_submit(struct io_uring *ring, unsigned wait_nr)
 	 * Fill in sqes that we have queued up, adding them to the kernel ring
 	 */
 	submitted = 0;
-	ktail = ktail_next = *sq->ktail;
+	ktail = *sq->ktail;
 	to_submit = sq->sqe_tail - sq->sqe_head;
 	while (to_submit--) {
-		ktail_next++;
-		read_barrier();
-
 		sq->array[ktail & mask] = sq->sqe_head & mask;
-		ktail = ktail_next;
-
+		ktail++;
 		sq->sqe_head++;
 		submitted++;
 	}
@@ -104,21 +100,11 @@  static int __io_uring_submit(struct io_uring *ring, unsigned wait_nr)
 	if (!submitted)
 		return 0;
 
-	if (*sq->ktail != ktail) {
-		/*
-		 * First write barrier ensures that the SQE stores are updated
-		 * with the tail update. This is needed so that the kernel
-		 * will never see a tail update without the preceeding sQE
-		 * stores being done.
-		 */
-		write_barrier();
-		*sq->ktail = ktail;
-		/*
-		 * The kernel has the matching read barrier for reading the
-		 * SQ tail.
-		 */
-		write_barrier();
-	}
+	/*
+	 * Ensure that the kernel sees the SQE updates before it sees the tail
+	 * update.
+	 */
+	smp_store_release(sq->ktail, ktail);
 
 	flags = 0;
 	if (wait_nr || sq_ring_needs_enter(ring, &flags)) {
diff --git a/test/io_uring_enter.c b/test/io_uring_enter.c
index d6e407e621ff..b25afd5790f3 100644
--- a/test/io_uring_enter.c
+++ b/test/io_uring_enter.c
@@ -262,9 +262,11 @@  main(int argc, char **argv)
 	ktail = *sq->ktail;
 	sq->array[ktail & mask] = index;
 	++ktail;
-	write_barrier();
-	*sq->ktail = ktail;
-	write_barrier();
+	/*
+	 * Ensure that the kernel sees the SQE update before it sees the tail
+	 * update.
+	 */
+	smp_store_release(sq->ktail, ktail);
 
 	ret = io_uring_enter(ring.ring_fd, 1, 0, 0, NULL);
 	/* now check to see if our sqe was dropped */