[RFC] pipe: Convert ring to head/tail
diff mbox series

Message ID 25289.1568379639@warthog.procyon.org.uk
State New
Headers show
Series
  • [RFC] pipe: Convert ring to head/tail
Related show

Commit Message

David Howells Sept. 13, 2019, 1 p.m. UTC
Here's my first patch to the pipe ring, with an eye to piggybacking
notifications on it.  Other than whatever booting does, I haven't managed to
explicitly test splice and iov_iter-to-pipe yet.  Other than that, lmbench
seems to show about a 1% improvement on pipe performance and my own
just-shovel-data-through-for-X-amount-of-time test shows about .5%
improvement.  The difference in seeming improvements is possibly because my
test is using 511-byte writes and lmbench is using 512-byte writes.

David
---
pipe: Use head and tail pointers for the ring, not cursor and length

Convert pipes to use head and tail pointers for the buffer ring rather than
pointer and length as the latter requires two atomic ops to update (or a
combined op) whereas the former only requires one.

 (1) The head pointer is the point at which production occurs and points to
     the slot in which the next buffer will be placed.  This is equivalent
     to pipe->curbuf + pipe->nrbufs.

     The head pointer belongs to the write-side.

 (2) The tail pointer is the point at which consumption occurs.  It points
     to the next slot to be consumed.  This is equivalent to pipe->curbuf.

     The tail pointer belongs to the read-side.

 (3) head and tail are allowed to run to UINT_MAX and wrap naturally.  They
     are only masked off when the array is being accessed, e.g.:

        pipe->bufs[head & mask]

     This means that it is not necessary to have a dead slot in the ring as
     head == tail isn't ambiguous.

 (4) The ring is empty if head == tail.

 (5) The occupancy of the ring is head - tail.

 (6) The amount of space in the ring is (tail + pipe->buffers) - head.

 (7) The ring is full if head == (tail + pipe->buffers) or
     head - tail == pipe->buffers.

Barriers are also inserted, wrapped in inline functions:

 (1) unsigned int tail = pipe_get_tail_for_write(pipe);

     Read the tail pointer from the write-side.  This acts as a barrier to
     order the tail read before the data in the ring is overwritten.  It
     also prevents the compiler from re-reading the pointer.

 (2) unsigned int head = pipe_get_head_for_read(pipe);

     Read the head pointer from the read-side.  This acts as a barrier to
     order the head read before the data read.  It also prevents the
     compiler from re-reading the pointer.

 (3) pipe_post_read_barrier(pipe, unsigned int tail);

     Update the tail pointer from the read-side.  This acts as a barrier to
     order the pointer update after the data read.  The consumed data slot
     must not be touched after this function.

 (4) pipe_post_write_barrier(pipe, unsigned int head);

     Update the head pointer from the write-side.  This acts as a barrier
     to order the pointer update after the data write.  The produced data
     slot should not normally be touched after this function[*].

     [*] Unless pipe->mutex is held.
---
 fs/fuse/dev.c             |   23 ++-
 fs/pipe.c                 |  154 ++++++++++++++++----------
 fs/splice.c               |  161 +++++++++++++++++----------
 include/linux/pipe_fs_i.h |   76 ++++++++++++-
 include/linux/uio.h       |    4 
 lib/iov_iter.c            |  268 ++++++++++++++++++++++++++--------------------
 6 files changed, 438 insertions(+), 248 deletions(-)

Comments

David Howells Sept. 13, 2019, 1:06 p.m. UTC | #1
/*
 * Benchmark a pipe by seeing how many 511-byte writes can be stuffed through
 * it in a certain amount of time.  Compile with -lm.
 */
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <math.h>
#include <sys/wait.h>
#include <sys/mman.h>

static char buf1[4096] __attribute__((aligned(4096)));
static char buf2[4096] __attribute__((aligned(4096)));
static unsigned long long *results;

static volatile int stop;
void sigalrm(int sig)
{
	stop = 1;
}

static __attribute__((noreturn))
void producer(int fd, int i)
{
	unsigned long long l = 0;
	ssize_t n;

	signal(SIGALRM, sigalrm);
	alarm(1);

	while (!stop) {
		n = write(fd, buf1, 511);
		if (n == -1) {
			perror("read");
			exit(1);
		}

		l += n;
	}

	results[i] = l;
	exit(0);
}

static __attribute__((noreturn))
void consumer(int fd)
{
	unsigned long long l = 0;
	ssize_t n;

	for (;;) {
		n = read(fd, buf2, 4096);
		if (n == 0)
			break;
		if (n == -1) {
			perror("read");
			exit(1);
		}

		l += n;
	}

	exit(0);
}

unsigned long long stddev(const unsigned long long *vals, int nvals)
{
	double sum = 0.0, mean, sd = 0.0;
	int i;

	for (i = 0; i < nvals; i++)
		sum += vals[i];

	mean = sum / nvals;
	for (i = 0; i < nvals; i++)
		sd += pow(vals[i] - mean, 2);
	return sqrt(sd / 10);
}

int main()
{
	unsigned long long t = 0;
	int ex = 0, i;

	results = mmap(NULL, 4096, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
	if (results == MAP_FAILED) {
		perror("mmap");
		exit(1);
	}

	for (i = 0; i < 16 && ex == 0; i++) {
		pid_t prod, con;
		int pfd[2], wt;

		if (pipe2(pfd, 0) < 0) {
			perror("pipe");
			exit(1);
		}

		con = fork();
		switch (con) {
		case -1:
			perror("fork/c");
			exit(1);
		case 0:
			close(pfd[1]);
			consumer(pfd[0]);
		default:
			break;
		}

		prod = fork();
		switch (prod) {
		case -1:
			perror("fork/p");
			exit(1);
		case 0:
			close(pfd[0]);
			producer(pfd[1], i);
		default:
			break;
		}

		close(pfd[0]);
		close(pfd[1]);

		for (;;) {
			errno = 0;
			wait(&wt);
			if (errno == ECHILD)
				break;
			if (!WIFEXITED(wt) || WEXITSTATUS(wt) != 0)
				ex = 1;
		}

		printf("WR[%02u]: %12llu\n", i, results[i]);
		t += results[i];
	}

	printf("total : %12llu\n", t);
	printf("avg   : %12llu\n", t / i);
	printf("stddev: %12llu\n", stddev(results, i));
	exit(ex);
}
Will Deacon Sept. 15, 2019, 2:59 p.m. UTC | #2
Hi David, [+Peter]

I have a few drive-by comments on the ordering side of things. See below.

On Fri, Sep 13, 2019 at 02:00:39PM +0100, David Howells wrote:
> Convert pipes to use head and tail pointers for the buffer ring rather than
> pointer and length as the latter requires two atomic ops to update (or a
> combined op) whereas the former only requires one.
> 
>  (1) The head pointer is the point at which production occurs and points to
>      the slot in which the next buffer will be placed.  This is equivalent
>      to pipe->curbuf + pipe->nrbufs.
> 
>      The head pointer belongs to the write-side.
> 
>  (2) The tail pointer is the point at which consumption occurs.  It points
>      to the next slot to be consumed.  This is equivalent to pipe->curbuf.
> 
>      The tail pointer belongs to the read-side.
> 
>  (3) head and tail are allowed to run to UINT_MAX and wrap naturally.  They
>      are only masked off when the array is being accessed, e.g.:
> 
>         pipe->bufs[head & mask]
> 
>      This means that it is not necessary to have a dead slot in the ring as
>      head == tail isn't ambiguous.
> 
>  (4) The ring is empty if head == tail.
> 
>  (5) The occupancy of the ring is head - tail.
> 
>  (6) The amount of space in the ring is (tail + pipe->buffers) - head.
> 
>  (7) The ring is full if head == (tail + pipe->buffers) or
>      head - tail == pipe->buffers.
> 
> Barriers are also inserted, wrapped in inline functions:
> 
>  (1) unsigned int tail = pipe_get_tail_for_write(pipe);
> 
>      Read the tail pointer from the write-side.  This acts as a barrier to
>      order the tail read before the data in the ring is overwritten.  It
>      also prevents the compiler from re-reading the pointer.
> 
>  (2) unsigned int head = pipe_get_head_for_read(pipe);
> 
>      Read the head pointer from the read-side.  This acts as a barrier to
>      order the head read before the data read.  It also prevents the
>      compiler from re-reading the pointer.
> 
>  (3) pipe_post_read_barrier(pipe, unsigned int tail);
> 
>      Update the tail pointer from the read-side.  This acts as a barrier to
>      order the pointer update after the data read.  The consumed data slot
>      must not be touched after this function.
> 
>  (4) pipe_post_write_barrier(pipe, unsigned int head);
> 
>      Update the head pointer from the write-side.  This acts as a barrier
>      to order the pointer update after the data write.  The produced data
>      slot should not normally be touched after this function[*].
> 
>      [*] Unless pipe->mutex is held.
> ---
>  fs/fuse/dev.c             |   23 ++-
>  fs/pipe.c                 |  154 ++++++++++++++++----------
>  fs/splice.c               |  161 +++++++++++++++++----------
>  include/linux/pipe_fs_i.h |   76 ++++++++++++-
>  include/linux/uio.h       |    4 
>  lib/iov_iter.c            |  268 ++++++++++++++++++++++++++--------------------
>  6 files changed, 438 insertions(+), 248 deletions(-)

[...]

> diff --git a/fs/pipe.c b/fs/pipe.c
> index 8a2ab2f974bd..aa410ee0f423 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -43,10 +43,10 @@ unsigned long pipe_user_pages_hard;
>  unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
>  
>  /*
> - * We use a start+len construction, which provides full use of the 
> + * We use a start+len construction, which provides full use of the
>   * allocated memory.
>   * -- Florian Coosmann (FGC)
> - * 
> + *
>   * Reads with count = 0 should always return 0.
>   * -- Julian Bradfield 1999-06-07.
>   *
> @@ -285,10 +285,15 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
>  	ret = 0;
>  	__pipe_lock(pipe);
>  	for (;;) {
> -		int bufs = pipe->nrbufs;
> -		if (bufs) {
> -			int curbuf = pipe->curbuf;
> -			struct pipe_buffer *buf = pipe->bufs + curbuf;
> +		/* Barrier: head belongs to the write side, so order reading
> +		 * the data after reading the head pointer.
> +		 */
> +		unsigned int head = READ_ONCE(pipe->head);

Hmm, I don't understand this. Since READ_ONCE() doesn't imply a barrier,
how are you enforcing the read-read ordering in the CPU?

> @@ -104,6 +104,76 @@ struct pipe_buf_operations {
>  	bool (*get)(struct pipe_inode_info *, struct pipe_buffer *);
>  };
>  
> +/**
> + * pipe_get_tail_for_write - Get pipe buffer tail pointer for write-side use
> + * @pipe: The pipe in question
> + *
> + * Get the tail pointer for use in the write-side code.  This may need to
> + * insert a barrier against the reader to order reading the tail pointer
> + * against the reader reading the buffer.

What is the purpose of saying "This may need to insert a barrier"? Can this
function be overridden or something?

> + */
> +static inline unsigned int pipe_get_tail_for_write(struct pipe_inode_info *pipe)
> +{
> +	return READ_ONCE(pipe->tail);
> +}
> +
> +/**
> + * pipe_post_read_barrier - Set pipe buffer tail pointer in the read-side
> + * @pipe: The pipe in question
> + * @tail: The new tail pointer
> + *
> + * Update the tail pointer in the read-side code.  This inserts a barrier
> + * against the writer such that the data write is ordered before the tail
> + * pointer update.
> + */
> +static inline void pipe_post_read_barrier(struct pipe_inode_info *pipe,
> +					  unsigned int tail)
> +{
> +	smp_store_release(&pipe->tail, tail);
> +}
> +
> +/**
> + * pipe_get_head_for_read - Get pipe buffer head pointer for read-side use
> + * @pipe: The pipe in question
> + *
> + * Get the head pointer for use in the read-side code.  This inserts a barrier
> + * against the reader such that the head pointer is read before the data it
> + * points to.
> + */
> +static inline unsigned int pipe_get_head_for_read(struct pipe_inode_info *pipe)
> +{
> +	return READ_ONCE(pipe->head);
> +}

Saying that "This inserts a barrier" feels misleading, because READ_ONCE()
doesn't do that.

Will
David Howells Sept. 17, 2019, 1:51 p.m. UTC | #3
Will Deacon <will@kernel.org> wrote:

> > +		/* Barrier: head belongs to the write side, so order reading
> > +		 * the data after reading the head pointer.
> > +		 */
> > +		unsigned int head = READ_ONCE(pipe->head);
> 
> Hmm, I don't understand this. Since READ_ONCE() doesn't imply a barrier,
> how are you enforcing the read-read ordering in the CPU?

It does imply a barrier: smp_read_barrier_depends().  I believe that's

> What is the purpose of saying "This may need to insert a barrier"? Can this
> function be overridden or something?

I mean it's arch-dependent whether READ_ONCE() inserts a barrier or not.

> Saying that "This inserts a barrier" feels misleading, because READ_ONCE()
> doesn't do that.

Yes it does - on the Alpha:

[arch/alpha/include/asm/barrier.h]
#define read_barrier_depends() __asm__ __volatile__("mb": : :"memory")

[include/asm-generic/barrier.h]
#ifndef __smp_read_barrier_depends
#define __smp_read_barrier_depends()	read_barrier_depends()
#endif
...
#ifndef smp_read_barrier_depends
#define smp_read_barrier_depends()	__smp_read_barrier_depends()
#endif

[include/linux/compiler.h]
#define __READ_ONCE(x, check)						\
({									\
	union { typeof(x) __val; char __c[1]; } __u;			\
	if (check)							\
		__read_once_size(&(x), __u.__c, sizeof(x));		\
	else								\
		__read_once_size_nocheck(&(x), __u.__c, sizeof(x));	\
	smp_read_barrier_depends(); /* Enforce dependency ordering from x */ \
	__u.__val;							\
})
#define READ_ONCE(x) __READ_ONCE(x, 1)

See:

    commit 76ebbe78f7390aee075a7f3768af197ded1bdfbb
    Author: Will Deacon <will.deacon@arm.com>
    Date:   Tue Oct 24 11:22:47 2017 +0100
    locking/barriers: Add implicit smp_read_barrier_depends() to READ_ONCE()

David
Will Deacon Sept. 17, 2019, 5:07 p.m. UTC | #4
Hi David,

On Tue, Sep 17, 2019 at 02:51:35PM +0100, David Howells wrote:
> Will Deacon <will@kernel.org> wrote:
> 
> > > +		/* Barrier: head belongs to the write side, so order reading
> > > +		 * the data after reading the head pointer.
> > > +		 */
> > > +		unsigned int head = READ_ONCE(pipe->head);
> > 
> > Hmm, I don't understand this. Since READ_ONCE() doesn't imply a barrier,
> > how are you enforcing the read-read ordering in the CPU?
> 
> It does imply a barrier: smp_read_barrier_depends().  I believe that's

I fed your incomplete sentence to https://talktotransformer.com/ :

  It does imply a barrier: smp_read_barrier_depends(). I believe that's
  correct. (I'm not a coder so I assume it just means it's a dependency. Maybe
  this works for other languages too.)

but I have a feeling that's not what you meant. I guess AI isn't quite
ready to rule the world.

> > What is the purpose of saying "This may need to insert a barrier"? Can this
> > function be overridden or something?
> 
> I mean it's arch-dependent whether READ_ONCE() inserts a barrier or not.

Ok, but why would the caller care?

> > Saying that "This inserts a barrier" feels misleading, because READ_ONCE()
> > doesn't do that.
> 
> Yes it does - on the Alpha:
> 
> [arch/alpha/include/asm/barrier.h]
> #define read_barrier_depends() __asm__ __volatile__("mb": : :"memory")
> 
> [include/asm-generic/barrier.h]
> #ifndef __smp_read_barrier_depends
> #define __smp_read_barrier_depends()	read_barrier_depends()
> #endif
> ...
> #ifndef smp_read_barrier_depends
> #define smp_read_barrier_depends()	__smp_read_barrier_depends()
> #endif
> 
> [include/linux/compiler.h]
> #define __READ_ONCE(x, check)						\
> ({									\
> 	union { typeof(x) __val; char __c[1]; } __u;			\
> 	if (check)							\
> 		__read_once_size(&(x), __u.__c, sizeof(x));		\
> 	else								\
> 		__read_once_size_nocheck(&(x), __u.__c, sizeof(x));	\
> 	smp_read_barrier_depends(); /* Enforce dependency ordering from x */ \
> 	__u.__val;							\
> })
> #define READ_ONCE(x) __READ_ONCE(x, 1)
> 
> See:
> 
>     commit 76ebbe78f7390aee075a7f3768af197ded1bdfbb
>     Author: Will Deacon <will.deacon@arm.com>
>     Date:   Tue Oct 24 11:22:47 2017 +0100
>     locking/barriers: Add implicit smp_read_barrier_depends() to READ_ONCE()

Ah, that guy. I tried emailing him but he didn't reply.

Seriously though, READ_ONCE() implies a barrier on Alpha, but its portable
barrier semantics are only that it can be used to head an address
dependency, a bit like rcu_dereference(). You shouldn't be relying on the
stronger ordering provided by Alpha, and I doubt that you really are.

If I'm understanding your code correctly (big 'if'), then you have things
like this in pipe_read():


	unsigned int head = READ_ONCE(pipe->head);
	unsigned int tail = pipe->tail;
	unsigned int mask = pipe->buffers - 1;

	if (tail != head) {
		struct pipe_buffer *buf = &pipe->bufs[tail & mask];

		[...]

		written = copy_page_to_iter(buf->page, buf->offset, chars, to);


where you want to make sure you don't read from 'buf->page' until after
you've read the updated head index. Is that right? If so, then READ_ONCE()
will not give you that guarantee on architectures such as Power and Arm,
because the 'if (tail != head)' branch can be speculated and the buffer
can be read before we've got around to looking at the head index.

So I reckon you need smp_load_acquire() in this case. pipe_write() might be ok
with the control dependency because CPUs don't tend to make speculative writes
visible, but I didn't check it carefully and the compiler can do crazy stuff in
this area, so I'd be inclined to use smp_load_acquire() here too unless you
really need the last ounce of performance.

Will
David Howells Sept. 18, 2019, 3:43 p.m. UTC | #5
Will Deacon <will@kernel.org> wrote:

> If I'm understanding your code correctly (big 'if'), then you have things
> like this in pipe_read():
> 
> 
> 	unsigned int head = READ_ONCE(pipe->head);
> 	unsigned int tail = pipe->tail;
> 	unsigned int mask = pipe->buffers - 1;
> 
> 	if (tail != head) {
> 		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
> 
> 		[...]
> 
> 		written = copy_page_to_iter(buf->page, buf->offset, chars, to);
> 
> 
> where you want to make sure you don't read from 'buf->page' until after
> you've read the updated head index. Is that right? If so, then READ_ONCE()
> will not give you that guarantee on architectures such as Power and Arm,
> because the 'if (tail != head)' branch can be speculated and the buffer
> can be read before we've got around to looking at the head index.
> 
> So I reckon you need smp_load_acquire() in this case. pipe_write() might be
> ok with the control dependency because CPUs don't tend to make speculative
> writes visible, but I didn't check it carefully and the compiler can do
> crazy stuff in this area, so I'd be inclined to use smp_load_acquire() here
> too unless you really need the last ounce of performance.

Yeah, I probably do.

Documentation/core-api/circular-buffers.rst might be wrong, then, I think.

It mandates using smp_store_release() to update buffer->head in the producer
and buffer->tail in the consumer - but these need pairing with memory barriers
used when reading buffer->head and buffer->tail on the other side.  Currently,
for the producer we have:

	spin_lock(&producer_lock);

	unsigned long head = buffer->head;
	/* The spin_unlock() and next spin_lock() provide needed ordering. */
	unsigned long tail = READ_ONCE(buffer->tail);

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

I think the ordering comment about spin_unlock and spin_lock is wrong.  There's
no requirement to have a spinlock on either side - and in any case, both sides
could be inside their respective locked sections when accessing the buffer.
The READ_ONCE() would theoretically provide the smp_read_barrier_depends() to
pair with the smp_store_release() in the consumer.  Maybe I should change this
to:

	spin_lock(&producer_lock);

	/* Barrier paired with consumer-side store-release on tail */
	unsigned long tail = smp_load_acquire(buffer->tail);
	unsigned long head = buffer->head;

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

The consumer is currently:

	spin_lock(&consumer_lock);

	/* Read index before reading contents at that index. */
	unsigned long head = smp_load_acquire(buffer->head);
	unsigned long tail = buffer->tail;

	if (CIRC_CNT(head, tail, buffer->size) >= 1) {

		/* extract one item from the buffer */
		struct item *item = buffer[tail];

		consume_item(item);

		/* Finish reading descriptor before incrementing tail. */
		smp_store_release(buffer->tail,
				  (tail + 1) & (buffer->size - 1));
	}

	spin_unlock(&consumer_lock);

which I think is okay.

And yes, I note that this does use smp_load_acquire(buffer->head) in the
consumer - which I should also be doing.

David
Linus Torvalds Sept. 18, 2019, 4:48 p.m. UTC | #6
On Wed, Sep 18, 2019 at 8:43 AM David Howells <dhowells@redhat.com> wrote:
>
> It mandates using smp_store_release() to update buffer->head in the producer
> and buffer->tail in the consumer - but these need pairing with memory barriers
> used when reading buffer->head and buffer->tail on the other side.

No, the rule with smp_store_release() should be that it's paired with
"smp_load_acquire()". No other barriers needed.

If you do that

   thread #1            thread #2

   ... add data to queue ..
   smp_store_release(x)

                        smp_load_acquire(x)
                        ... read data from queue ..

then you should need no other barriers.

But yes, store_release(x) should always pair with the load_acquire(x),
and the guarantee is that if the load_acquire reads the value that the
store_release stored, then all subsequent reads in thread #2 will see
all preceding writes in thread #1.

That's the optimal locking for a simple queue with a reader and a
writer and no other locking needed between the two.

HOWEVER.

I think this is all entirely pointless wrt the pipe buffer use. You
don't have a simple queue. You have multiple writers, and you have
multiple readers. As a result, you need real locking.

So don't do the barriers. If you do the barriers, you're almost
certainly doing something buggy. You don't have the simple "consumer
-> producer" thing. Or rather, you don't _only_ have that thing.

A reader "produces" a new tail as far as the writer is concerned (so
the reader that has consumed an entry does a smp_store_release(tail)
-> smp_load_acquire(tail) on the writer side when the writer looks for
a new entry it can fill).

BUT! A writer also needs to make sure it's the *only* writer for that
new entry, and similarly a reader that is about to consume an entry
needs to make sure it's the only reader of that entry. So it is *not*
that kind of simple hand-off situation at all.

End result: use a lock. Seriously. Anything else is going to be buggy,
or going to be incredibly subtle.

Don't do those barriers. They are wrong. Barriers simply do not
protect against two concurrent readers, or two concurrent writers.
Barriers are useless.

Now, it's possible that barriers together with very clever atomics
could do a lockless model, but then all the cleverness is going to be
in the lockless part of the code, not the barriers. So even if we
eventually do a lockless model, it's completely wrong to do the
barriers first.

And no, lockless isn't as simple as "readers can do a
atomic_add_return_relaxed() to consume an entry, and writers can do a
atomic_cmpxchg() to produce one".

I think the only possible lockless model is that readers have some
exclusion against other readers (so now you have only one reader at a
time), and then you fundamentally know that a reader can update the
tail pointer without worrying about any races (because a reader will
never race against another reader, and a writer will only ever _read_
the tail pointer). So then - once you have reader-reader exclusion,
the _tail_ update is simple:

    smp_store_release(old_tail+1, &tail);

because while there are multiple writers, the reader doesn't care (the
above, btw, assumes that head/tail themselves aren't doing the "&
bufsize" thing, and that that is done at use time - that also makes it
easy to see the difference between empty and full).

But this still requires that lock around all readers. And we have
that: the pipe mutex.

But writers have the same issue, and we don't want to use the pipe
mutex for writer exclusion, since we want to allow atomic writers.

So writers do have to have a lock, or they need to do something clever
with a reservation system that uses cmpxchg. Honestly, just do the
lock.

The way this should be done:

 1) do the head/tail conversion using the EXISTING locking. We have
the pipe_mutex, and it serializes everybody right now, and it makes
any barriers or atomics pointless. Just covert to a sane and simple
head/tail model.

    NO BARRIERS!

*after* you've done this, do the next phase:

 2) convert the writers to use an irq-safe spinlock, and either make
the readers look at it too, or do the above smp_store_release() for
the final tail update (and the writers need to use a
smp_load_acquire() to look at the tail, despite their lock).

    NO BARRIERS (well, outside the tail thing above).

and *after* you've done #2, you can now do writes from atomic context.

At that point you are done. Consider youself happy, but also allow for
the possibility that some day you'll see lock contention with lots of
writers, and that *maybe* some day (unlikely) you'd want to look at
doing

 3) see if you can make the writers do some clever lockless reservation model.

But that #3 really should be considered a "we are pretty sure it's
possible in theory, but it's questinably useful, and hopefully we'll
never even need to consider it".

                Linus
David Howells Sept. 19, 2019, 1:59 p.m. UTC | #7
Linus Torvalds <torvalds@linux-foundation.org> wrote:

> > It mandates using smp_store_release() to update buffer->head in the producer
> > and buffer->tail in the consumer - but these need pairing with memory barriers
> > used when reading buffer->head and buffer->tail on the other side.
>
> No, the rule with smp_store_release() should be that it's paired with
> "smp_load_acquire()".

Yes.

> No other barriers needed.

See below.

> If you do that
>
>    thread #1            thread #2
>
>    ... add data to queue ..
>    smp_store_release(x)
>
>                         smp_load_acquire(x)
>                         ... read data from queue ..
> ...
> But yes, store_release(x) should always pair with the load_acquire(x),
> and the guarantee is that if the load_acquire reads the value that the
> store_release stored, then all subsequent reads in thread #2 will see
> all preceding writes in thread #1.

I agree with this bit, but it's only half the picture.

> then you should need no other barriers.

But I don't agree with this.  You're missing half the barriers.  There should
be *four* barriers.  The document mandates only 3 barriers, and uses
READ_ONCE() where the fourth should be, i.e.:

   thread #1            thread #2

                        smp_load_acquire(head)
                        ... read data from queue ..
                        smp_store_release(tail)

   READ_ONCE(tail)
   ... add data to queue ..
   smp_store_release(head)

but I think that READ_ONCE() should instead be smp_load_acquire() so that the
read of the data is ordered before it gets overwritten by the writer, ordering
with respect to accesses on tail (it now implies smp_read_barrier_depends()
which I think is sufficient).

Another way of looking at it is that smp_store_release(tail) needs pairing
with something, so it should be:

   thread #1            thread #2

                        smp_load_acquire(head)
                        ... read data from queue ..
                        smp_store_release(tail)

   smp_load_acquire(tail)
   ... add data to queue ..
   smp_store_release(head)

Take your example, reorder the threads and add the missing indices accesses:

   thread #1            thread #2

                        smp_load_acquire(x)
                        ... read data from queue ..
			tail++

   read tail;
   ... add data to queue ..
   smp_store_release(x)

Is there anything to stop the cpus doing out-of-order loads/stores such that
the data read in thread 2 doesn't come after the update of tail?  If that can
happen, the data being written by thread 1 may be being read by thread 2 when
in fact, it shouldn't see it yet, e.g.:

				LOAD head
				ACQUIRE_BARRIER
				LOAD tail
	LOAD head		STORE tail++
	LOAD tail
	STORE data[head]
				LOAD data[tail]
	RELEASE_BARRIER
	STORE head++

I would really like Paul and Will to check me on this.

-~-

> HOWEVER.
>
> I think this is all entirely pointless wrt the pipe buffer use. You
> don't have a simple queue. You have multiple writers, and you have
> multiple readers. As a result, you need real locking.

Yes, and I don't deny that.  Between readers and readers you do; between
writers and writers you do.  But barriers are about the coupling between the
active reader and the active writer - and even with locking, you *still* need
the barriers, it's just that there are barriers implicit in the locking
primitives - so they're there, just hidden.

> So don't do the barriers. If you do the barriers, you're almost
> certainly doing something buggy. You don't have the simple "consumer
> -> producer" thing. Or rather, you don't _only_ have that thing.

I think what you're thinking of still has a problem.  Could you give a simple
'trace' of what it is you're thinking of, particularly in the reader, so that
I can see.

I think that the following points need to be considered:

 (1) Accessing the ring requires *four* barriers, two read-side and two
     write-side.

 (2) pipe_lock() and pipe_unlock() currently provide the barrier and the code
     places them very wide so that they encompass the index access, the data
     access and the index update, and order them with respect to pipe->mutex:

	pipe_lock(); // barrier #1
	... get curbuf and nbufs...
	... write data ...
	... nbufs++ ...
	pipe_unlock(); // barrier #2

				pipe_lock(); // barrier #3
				... get curbuf and nbufs...
				... read data ...
				... curbuf++; nbufs--; ...
				pipe_unlock(); // barrier #4

     The barriers pair up #2->#3 and #4->#1.

 (3) Multiple readers are serialised against each other by pipe_lock(), and
     will continue to be so.

 (4) Multiple userspace writers are serialised against each other by
     pipe_lock(), and will continue to be so.

 (5) Readers and userspace writers are serialised against each other by
     pipe_lock().  This could be changed at some point in the future.

 (6) pipe_lock() and pipe_unlock() cannot be used by post_notification() as it
     needs to be able to insert a message from softirq context or with a
     spinlock held.

 (7) It might be possible to use pipe->wait's spinlock to guard access to the
     ring indices since we might be taking that lock anyway to poke the other
     side.

 (8) We don't want to call copy_page_to/from_iter() with spinlock held.

As we discussed at Plumbers, we could use pipe->wait's spinlock in the
from-userspace writer to do something like:

	pipe_lock();
	...
	spin_lock_irq(&pipe->wait->lock); // barrier #1
	... get indices...
	... advance index ...
	spin_unlock_irq(&pipe->wait->lock);
	... write data ...
	pipe_unlock(); // barrier #2

ie. allocate the slot inside the spinlock, then write it whilst holding off
the reader with the pipe lock.  Yes, that would work as pipe_unlock() acts as
the closing barrier.  But this doesn't work for post_notification().  That
would have to do:

	spin_lock_irq(&pipe->wait->lock); // barrier #1
	... get indices...
	... write data ...
	... advance index ...
	spin_unlock_irq(&pipe->wait->lock); // barrier #2

which is fine, even if it occurs within the from-userspace writer as the
latter is holding the pipe lock.  However, consider the reader.  If you're
thinking of:

	pipe_lock(); // barrier #3?
	...
	... get indices...
	... read data ...
	spin_lock_irq(&pipe->wait->lock);
	... advance index ...
	spin_unlock_irq(&pipe->wait->lock); // barrier #4
	pipe_unlock();

that works against the from-userspace writer, but barrier #3 is in the wrong
place against post_notification().  We could do this:

	pipe_lock();
	...
	spin_lock_irq(&pipe->wait->lock); // barrier #3
	... get indices...
	... read data ...
	... advance index ...
	spin_unlock_irq(&pipe->wait->lock); // barrier #4
	pipe_unlock();

which does indeed protect the reader against post_notification(), but it means
that the data copy happens with the lock held.  An alternative could be:

	pipe_lock();
	...
	spin_lock_irq(&pipe->wait->lock); // barrier #3
	... get indices...
	spin_unlock_irq(&pipe->wait->lock);
	... read data ...
	spin_lock_irq(&pipe->wait->lock);
	... advance index ...
	spin_unlock_irq(&pipe->wait->lock); // barrier #4
	pipe_unlock();

but that would take the spinlock twice.

There shouldn't need to be *any* common lock between the reader and the
writer[*] - not even pipe_lock() or pipe->wait, both of which could be split.

 [*] Apart from the minor fact that, currently, the writer can violate normal
     ring-buffer semantics and go back and continue filling something it's
     already committed.

David
Linus Torvalds Sept. 19, 2019, 3:59 p.m. UTC | #8
On Thu, Sep 19, 2019 at 6:59 AM David Howells <dhowells@redhat.com> wrote:
>
> But I don't agree with this.  You're missing half the barriers.  There should
> be *four* barriers.  The document mandates only 3 barriers, and uses
> READ_ONCE() where the fourth should be, i.e.:
>
>    thread #1            thread #2
>
>                         smp_load_acquire(head)
>                         ... read data from queue ..
>                         smp_store_release(tail)
>
>    READ_ONCE(tail)
>    ... add data to queue ..
>    smp_store_release(head)

The document is right, but you shouldn't do this.

The reason that READ_ONCE() is possible - instead of a
smp_load_acquire() - is that there's now an address dependency chain
from the READ_ONCE to the subsequent writes of the data.

And while there isn't any barrier, a data or control dependency to a
_write_ does end up ordering things (even on alpha - it's only the
read->read dependencies that might be unordered on alpha).

But again, don't do this.

Also, you ignored the part where I told you to not do this because we
already  have locking.

I'm not goign to discuss this further. Locking works. Spinlocks are
cheap. Lockless algorithms that need atomics aren't even cheaper than
spinlocks: they can in fact scale *worse*, because they don't have the
nice queuing optimization that our spinlock have.

Lockless algorithms are great if they can avoid the contention on the
lock and instead only work on distributed data and avoid contention
entirely.

But in this case the lock would be right next to the data anyway, so
even that case doesn't hold.

               Linus
Peter Zijlstra Sept. 23, 2019, 2:49 p.m. UTC | #9
On Thu, Sep 19, 2019 at 02:59:06PM +0100, David Howells wrote:

> But I don't agree with this.  You're missing half the barriers.  There should
> be *four* barriers.  The document mandates only 3 barriers, and uses
> READ_ONCE() where the fourth should be, i.e.:
> 
>    thread #1            thread #2
> 
>                         smp_load_acquire(head)
>                         ... read data from queue ..
>                         smp_store_release(tail)
> 
>    READ_ONCE(tail)
>    ... add data to queue ..
>    smp_store_release(head)
> 

Notably your READ_ONCE() pseudo code is lacking a conditional;
kernel/events/ring_buffer.c writes it like so:

 *   kernel                             user
 *
 *   if (LOAD ->data_tail) {            LOAD ->data_head
 *                      (A)             smp_rmb()       (C)
 *      STORE $data                     LOAD $data
 *      smp_wmb()       (B)             smp_mb()        (D)
 *      STORE ->data_head               STORE ->data_tail
 *   }
 *
 * Where A pairs with D, and B pairs with C.
 *
 * In our case (A) is a control dependency that separates the load of
 * the ->data_tail and the stores of $data. In case ->data_tail
 * indicates there is no room in the buffer to store $data we do not.
 *
 * D needs to be a full barrier since it separates the data READ
 * from the tail WRITE.
 *
 * For B a WMB is sufficient since it separates two WRITEs, and for C
 * an RMB is sufficient since it separates two READs.

Where 'kernel' is the producer and 'user' is the consumer. This was
written before load-acquire and store-release came about (I _think_),
and I've so far resisted updating B to store-release because smp_wmb()
is actually cheaper than store-release on a number of architectures
(notably ARM).

C ought to be a load-aquire, and D really should be a store-release, but
I don't think the perf userspace has that (or uses C11).
Andrea Parri Sept. 27, 2019, 9:51 a.m. UTC | #10
On Mon, Sep 23, 2019 at 04:49:31PM +0200, Peter Zijlstra wrote:
> On Thu, Sep 19, 2019 at 02:59:06PM +0100, David Howells wrote:
> 
> > But I don't agree with this.  You're missing half the barriers.  There should
> > be *four* barriers.  The document mandates only 3 barriers, and uses
> > READ_ONCE() where the fourth should be, i.e.:
> > 
> >    thread #1            thread #2
> > 
> >                         smp_load_acquire(head)
> >                         ... read data from queue ..
> >                         smp_store_release(tail)
> > 
> >    READ_ONCE(tail)
> >    ... add data to queue ..
> >    smp_store_release(head)
> > 
> 
> Notably your READ_ONCE() pseudo code is lacking a conditional;
> kernel/events/ring_buffer.c writes it like so:
> 
>  *   kernel                             user
>  *
>  *   if (LOAD ->data_tail) {            LOAD ->data_head
>  *                      (A)             smp_rmb()       (C)
>  *      STORE $data                     LOAD $data
>  *      smp_wmb()       (B)             smp_mb()        (D)
>  *      STORE ->data_head               STORE ->data_tail
>  *   }
>  *
>  * Where A pairs with D, and B pairs with C.
>  *
>  * In our case (A) is a control dependency that separates the load of
>  * the ->data_tail and the stores of $data. In case ->data_tail
>  * indicates there is no room in the buffer to store $data we do not.

To elaborate on this, dependencies are tricky...  ;-)

For the record, the LKMM doesn't currently model "order" derived from
control dependencies to a _plain_ access (even if the plain access is
a write): in particular, the following is racy (as far as the current
LKMM is concerned):

C rb

{ }

P0(int *tail, int *data, int *head)
{
	if (READ_ONCE(*tail)) {
		*data = 1;
		smp_wmb();
		WRITE_ONCE(*head, 1);
	}
}

P1(int *tail, int *data, int *head)
{
	int r0;
	int r1;

	r0 = READ_ONCE(*head);
	smp_rmb();
	r1 = *data;
	smp_mb();
	WRITE_ONCE(*tail, 1);
}

Replacing the plain "*data = 1" with "WRITE_ONCE(*data, 1)" (or doing
s/READ_ONCE(*tail)/smp_load_acquire(tail)) suffices to avoid the race.
Maybe I'm short of imagination this morning...  but I can't currently
see how the compiler could "break" the above scenario.

I also didn't spend much time thinking about it.  memory-barriers.txt
has a section "CONTROL DEPENDENCIES" dedicated to "alerting developers
using control dependencies for ordering".  That's quite a long section
(and probably still incomplete); the last paragraph summarizes:  ;-)

(*) Compilers do not understand control dependencies.  It is therefore
    your job to ensure that they do not break your code.

  Andrea


>  *
>  * D needs to be a full barrier since it separates the data READ
>  * from the tail WRITE.
>  *
>  * For B a WMB is sufficient since it separates two WRITEs, and for C
>  * an RMB is sufficient since it separates two READs.
> 
> Where 'kernel' is the producer and 'user' is the consumer. This was
> written before load-acquire and store-release came about (I _think_),
> and I've so far resisted updating B to store-release because smp_wmb()
> is actually cheaper than store-release on a number of architectures
> (notably ARM).
> 
> C ought to be a load-aquire, and D really should be a store-release, but
> I don't think the perf userspace has that (or uses C11).
Peter Zijlstra Sept. 27, 2019, 12:49 p.m. UTC | #11
On Fri, Sep 27, 2019 at 11:51:07AM +0200, Andrea Parri wrote:

> For the record, the LKMM doesn't currently model "order" derived from
> control dependencies to a _plain_ access (even if the plain access is
> a write): in particular, the following is racy (as far as the current
> LKMM is concerned):
> 
> C rb
> 
> { }
> 
> P0(int *tail, int *data, int *head)
> {
> 	if (READ_ONCE(*tail)) {
> 		*data = 1;
> 		smp_wmb();
> 		WRITE_ONCE(*head, 1);
> 	}
> }
> 
> P1(int *tail, int *data, int *head)
> {
> 	int r0;
> 	int r1;
> 
> 	r0 = READ_ONCE(*head);
> 	smp_rmb();
> 	r1 = *data;
> 	smp_mb();
> 	WRITE_ONCE(*tail, 1);
> }
> 
> Replacing the plain "*data = 1" with "WRITE_ONCE(*data, 1)" (or doing
> s/READ_ONCE(*tail)/smp_load_acquire(tail)) suffices to avoid the race.
> Maybe I'm short of imagination this morning...  but I can't currently
> see how the compiler could "break" the above scenario.

The compiler; if sufficiently smart; is 'allowed' to change P0 into
something terrible like:

	*data = 1;
	if (*tail) {
		smp_wmb();
		*head = 1;
	} else
		*data = 0;


(assuming it knows *data was 0 from a prior store or something)

Using WRITE_ONCE() defeats this because volatile indicates external
visibility.

> I also didn't spend much time thinking about it.  memory-barriers.txt
> has a section "CONTROL DEPENDENCIES" dedicated to "alerting developers
> using control dependencies for ordering".  That's quite a long section
> (and probably still incomplete); the last paragraph summarizes:  ;-)

Barring LTO the above works for perf because of inter-translation-unit
function calls, which imply a compiler barrier.

Now, when the compiler inlines, it looses that sync point (and thereby
subtlely changes semantics from the non-inline variant). I suspect LTO
does the same and can cause subtle breakage through this transformation.

> (*) Compilers do not understand control dependencies.  It is therefore
>     your job to ensure that they do not break your code.

It is one the list of things I want to talk about when I finally get
relevant GCC and LLVM people in the same room ;-)

Ideally the compiler can be taught to recognise conditionals dependent
on 'volatile' loads and disallow problematic transformations around
them.

I've added Nick (clang) and Jose (GCC) on Cc, hopefully they can help
find the right people for us.
Peter Zijlstra Sept. 27, 2019, 3:57 p.m. UTC | #12
On Fri, Sep 27, 2019 at 02:49:29PM +0200, Peter Zijlstra wrote:
> On Fri, Sep 27, 2019 at 11:51:07AM +0200, Andrea Parri wrote:
> 
> > For the record, the LKMM doesn't currently model "order" derived from
> > control dependencies to a _plain_ access (even if the plain access is
> > a write): in particular, the following is racy (as far as the current
> > LKMM is concerned):
> > 
> > C rb
> > 
> > { }
> > 
> > P0(int *tail, int *data, int *head)
> > {
> > 	if (READ_ONCE(*tail)) {
> > 		*data = 1;
> > 		smp_wmb();
> > 		WRITE_ONCE(*head, 1);
> > 	}
> > }
> > 
> > P1(int *tail, int *data, int *head)
> > {
> > 	int r0;
> > 	int r1;
> > 
> > 	r0 = READ_ONCE(*head);
> > 	smp_rmb();
> > 	r1 = *data;
> > 	smp_mb();
> > 	WRITE_ONCE(*tail, 1);
> > }
> > 
> > Replacing the plain "*data = 1" with "WRITE_ONCE(*data, 1)" (or doing
> > s/READ_ONCE(*tail)/smp_load_acquire(tail)) suffices to avoid the race.
> > Maybe I'm short of imagination this morning...  but I can't currently
> > see how the compiler could "break" the above scenario.
> 
> The compiler; if sufficiently smart; is 'allowed' to change P0 into
> something terrible like:
> 
> 	*data = 1;
> 	if (*tail) {
> 		smp_wmb();
> 		*head = 1;
> 	} else
> 		*data = 0;
> 
> 
> (assuming it knows *data was 0 from a prior store or something)
> 
> Using WRITE_ONCE() defeats this because volatile indicates external
> visibility.

The much simpler solution might be writing it like:

	if (READ_ONCE(*tail) {
		barrier();
		*data = 1;
		smp_wmb();
		WRITE_ONCE(*head, 1);
	}

which I don't think the compiler is allowed to mess up.
Nick Desaulniers Sept. 27, 2019, 8:43 p.m. UTC | #13
On Fri, Sep 27, 2019 at 5:49 AM Peter Zijlstra <peterz@infradead.org> wrote:
>
> On Fri, Sep 27, 2019 at 11:51:07AM +0200, Andrea Parri wrote:
>
> > For the record, the LKMM doesn't currently model "order" derived from
> > control dependencies to a _plain_ access (even if the plain access is
> > a write): in particular, the following is racy (as far as the current
> > LKMM is concerned):
> >
> > C rb
> >
> > { }
> >
> > P0(int *tail, int *data, int *head)
> > {
> >       if (READ_ONCE(*tail)) {
> >               *data = 1;
> >               smp_wmb();
> >               WRITE_ONCE(*head, 1);
> >       }
> > }
> >
> > P1(int *tail, int *data, int *head)
> > {
> >       int r0;
> >       int r1;
> >
> >       r0 = READ_ONCE(*head);
> >       smp_rmb();
> >       r1 = *data;
> >       smp_mb();
> >       WRITE_ONCE(*tail, 1);
> > }
> >
> > Replacing the plain "*data = 1" with "WRITE_ONCE(*data, 1)" (or doing
> > s/READ_ONCE(*tail)/smp_load_acquire(tail)) suffices to avoid the race.
> > Maybe I'm short of imagination this morning...  but I can't currently
> > see how the compiler could "break" the above scenario.
>
> The compiler; if sufficiently smart; is 'allowed' to change P0 into
> something terrible like:
>
>         *data = 1;
>         if (*tail) {
>                 smp_wmb();
>                 *head = 1;
>         } else
>                 *data = 0;

I don't think so.  This snippet has different side effects than P0.
P0 never assigned *data to zero, this snippet does.  P0 *may* assign
*data to 1.  This snippet will unconditionally assign to *data,
conditionally 1 or 0.  I think the REVERSE transform (from your
snippet to P0) would actually be legal, but IANALL (I am not a
language lawyer; haven't yet passed the BAR).

>
>
> (assuming it knows *data was 0 from a prior store or something)

Oh, in that case I'm less sure (I still don't think so, but I would
love to be proven wrong, preferably with a godbolt link).  I think the
best would be to share a godbolt.org link to a case that's clearly
broken, or cite the relevant part of the ISO C standard (which itself
leaves room for interpretation), otherwise the discussion is too
hypothetical.  Those two things are single-handedly the best way to
communicate with compiler folks.

>
> Using WRITE_ONCE() defeats this because volatile indicates external
> visibility.

Could data be declared as a pointer to volatile qualified int?

>
> > I also didn't spend much time thinking about it.  memory-barriers.txt
> > has a section "CONTROL DEPENDENCIES" dedicated to "alerting developers
> > using control dependencies for ordering".  That's quite a long section
> > (and probably still incomplete); the last paragraph summarizes:  ;-)
>
> Barring LTO the above works for perf because of inter-translation-unit
> function calls, which imply a compiler barrier.
>
> Now, when the compiler inlines, it looses that sync point (and thereby
> subtlely changes semantics from the non-inline variant). I suspect LTO
> does the same and can cause subtle breakage through this transformation.

Do you have a bug report or godbolt link for the above?  I trust that
you're familiar enough with the issue to be able to quickly reproduce
it?  These descriptions of problems are difficult for me to picture in
code or generated code, and when I try to read through
memory-barriers.txt my eyes start to glaze over (then something else
catches fire and I have to go put that out).  Having a concise test
case I think would better illustrate potential issues with LTO that
we'd then be able to focus on trying to fix/support.

We definitely have heavy hitting language lawyers and our LTO folks
are super sharp; I just don't have the necessary compiler experience
just yet to be as helpful in these discussions as we need but I'm
happy to bring them cases that don't work for the kernel and drive
their resolution.

>
> > (*) Compilers do not understand control dependencies.  It is therefore
> >     your job to ensure that they do not break your code.
>
> It is one the list of things I want to talk about when I finally get
> relevant GCC and LLVM people in the same room ;-)
>
> Ideally the compiler can be taught to recognise conditionals dependent
> on 'volatile' loads and disallow problematic transformations around
> them.
>
> I've added Nick (clang) and Jose (GCC) on Cc, hopefully they can help
> find the right people for us.
Nick Desaulniers Sept. 27, 2019, 9:58 p.m. UTC | #14
On Fri, Sep 27, 2019 at 1:43 PM Nick Desaulniers
<ndesaulniers@google.com> wrote:
>
> On Fri, Sep 27, 2019 at 5:49 AM Peter Zijlstra <peterz@infradead.org> wrote:
> > Barring LTO the above works for perf because of inter-translation-unit
> > function calls, which imply a compiler barrier.
> >
> > Now, when the compiler inlines, it looses that sync point (and thereby
> > subtlely changes semantics from the non-inline variant). I suspect LTO
> > does the same and can cause subtle breakage through this transformation.
>
> Do you have a bug report or godbolt link for the above?  I trust that
> you're familiar enough with the issue to be able to quickly reproduce
> it?  These descriptions of problems are difficult for me to picture in
> code or generated code, and when I try to read through
> memory-barriers.txt my eyes start to glaze over (then something else
> catches fire and I have to go put that out).  Having a concise test
> case I think would better illustrate potential issues with LTO that
> we'd then be able to focus on trying to fix/support.
>

Further, if we identified a case were the existing invariants were
broken under LTO, such a case could be added to
CONFIG_DEBUG_LOCKING_API_SELFTESTS (or whatever the most appropriate
kself test is).
Peter Zijlstra Sept. 30, 2019, 9:33 a.m. UTC | #15
On Fri, Sep 27, 2019 at 01:43:18PM -0700, Nick Desaulniers wrote:
> On Fri, Sep 27, 2019 at 5:49 AM Peter Zijlstra <peterz@infradead.org> wrote:

> Oh, in that case I'm less sure (I still don't think so, but I would
> love to be proven wrong, preferably with a godbolt link).  I think the
> best would be to share a godbolt.org link to a case that's clearly
> broken, or cite the relevant part of the ISO C standard (which itself
> leaves room for interpretation), otherwise the discussion is too
> hypothetical.  Those two things are single-handedly the best way to
> communicate with compiler folks.

Ah, I'm not sure current compilers will get it wrong -- and I'm trying
to be preemptive here. I'm looking for a guarantee that compilers will
recognise and respect control depenencies.

The C language spec does not recognise the construct at _all_ and I'd be
fine with it being behind some optional compiler knob.

So far we're mostly very careful when using it, recognising that
compilers can screw us over because they have no clue.

> > Using WRITE_ONCE() defeats this because volatile indicates external
> > visibility.
> 
> Could data be declared as a pointer to volatile qualified int?

It's not actually 'int' data, mostly its a void* and we use memcpy().

> > Barring LTO the above works for perf because of inter-translation-unit
> > function calls, which imply a compiler barrier.

Having looked at it again, I think we're good and have sufficient
barrier() in there to not rely on function calls being a sync point.

> > Now, when the compiler inlines, it looses that sync point (and thereby
> > subtlely changes semantics from the non-inline variant). I suspect LTO
> > does the same and can cause subtle breakage through this transformation.
> 
> Do you have a bug report or godbolt link for the above?  I trust that
> you're familiar enough with the issue to be able to quickly reproduce
> it?  These descriptions of problems are difficult for me to picture in
> code or generated code, and when I try to read through
> memory-barriers.txt my eyes start to glaze over (then something else
> catches fire and I have to go put that out).  Having a concise test
> case I think would better illustrate potential issues with LTO that
> we'd then be able to focus on trying to fix/support.
> 
> We definitely have heavy hitting language lawyers and our LTO folks
> are super sharp; I just don't have the necessary compiler experience
> just yet to be as helpful in these discussions as we need but I'm
> happy to bring them cases that don't work for the kernel and drive
> their resolution.

Like said; I've not seen it go wrong -- but it is one of the things I'm
always paranoid about with LTO.

Furthermore, if it were to go wrong, it'd be a very subtle data race and
finding it would be super hard and painful. Which is again why I would
love to get compiler folks on board to actually support control
dependencies in some way.

Like I said before, something like: "disallowing store hoists over control
flow depending on a volatile load" would be sufficient I think.

Yes this is outside of ISO/C, but it is something that is really
important to us because, as said above, getting it wrong would be
*SUPER* painful.

So basically I'm asking for a language extension I suppose; a new
guarantee from the compiler's memory model that does not exist _at_all_
in the spec, one that we're actively using.

And I'm hoping that getting the compilers to (optionally) support this
is easier than waiting another few decades until Paul McKenney has
wrestled the C committee into sanity and only then (maybe) getting it.
Look at the horrible mess vs data dependencies and consume ordering
(another fun thing we're actively using lots that the compilers are
still struggling with).
Peter Zijlstra Sept. 30, 2019, 11:54 a.m. UTC | #16
On Mon, Sep 30, 2019 at 11:33:52AM +0200, Peter Zijlstra wrote:
> Like I said before, something like: "disallowing store hoists over control
> flow depending on a volatile load" would be sufficient I think.

We need to add 'control flow depending on an inline-asm' to that. We
also very much use that.
Peter Zijlstra Sept. 30, 2019, 12:02 p.m. UTC | #17
On Mon, Sep 30, 2019 at 01:54:40PM +0200, Peter Zijlstra wrote:
> On Mon, Sep 30, 2019 at 11:33:52AM +0200, Peter Zijlstra wrote:
> > Like I said before, something like: "disallowing store hoists over control
> > flow depending on a volatile load" would be sufficient I think.
> 
> We need to add 'control flow depending on an inline-asm' to that. We
> also very much use that.

An example of that would be something like:

bool spin_try_lock(struct spinlock *lock)
{
	u32 zero = 0;

	if (atomic_try_cmpxchg_relaxed(&lock->val, &zero, 1)) {
		smp_acquire__after_ctrl_dep(); /* aka smp_rmb() */
		return true;
	}

	return false;
}

(I think most our actual trylock functions use cmpxchg_acquire(), but the
above would be a valid implementation -- and it is the simplest
construct using smp_acquire__after_ctrl_dep() I could come up with in a
hurry)

Patch
diff mbox series

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index ea8237513dfa..c857675e054e 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -1466,7 +1466,7 @@  static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
 	if (ret < 0)
 		goto out;
 
-	if (pipe->nrbufs + cs.nr_segs > pipe->buffers) {
+	if (pipe->head - pipe->tail + cs.nr_segs > pipe->buffers) {
 		ret = -EIO;
 		goto out;
 	}
@@ -2029,6 +2029,7 @@  static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
 				     struct file *out, loff_t *ppos,
 				     size_t len, unsigned int flags)
 {
+	unsigned int head, tail, mask, count;
 	unsigned nbuf;
 	unsigned idx;
 	struct pipe_buffer *bufs;
@@ -2043,8 +2044,12 @@  static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
 
 	pipe_lock(pipe);
 
-	bufs = kvmalloc_array(pipe->nrbufs, sizeof(struct pipe_buffer),
-			      GFP_KERNEL);
+	head = pipe_get_head_for_read(pipe);
+	tail = pipe->tail;
+	mask = pipe->buffers - 1;
+	count = head - tail;
+
+	bufs = kvmalloc_array(count, sizeof(struct pipe_buffer), GFP_KERNEL);
 	if (!bufs) {
 		pipe_unlock(pipe);
 		return -ENOMEM;
@@ -2052,8 +2057,8 @@  static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
 
 	nbuf = 0;
 	rem = 0;
-	for (idx = 0; idx < pipe->nrbufs && rem < len; idx++)
-		rem += pipe->bufs[(pipe->curbuf + idx) & (pipe->buffers - 1)].len;
+	for (idx = tail; idx < head && rem < len; idx++)
+		rem += pipe->bufs[idx & mask].len;
 
 	ret = -EINVAL;
 	if (rem < len)
@@ -2065,15 +2070,15 @@  static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
 		struct pipe_buffer *obuf;
 
 		BUG_ON(nbuf >= pipe->buffers);
-		BUG_ON(!pipe->nrbufs);
-		ibuf = &pipe->bufs[pipe->curbuf];
+		BUG_ON(tail == head);
+		ibuf = &pipe->bufs[tail];
 		obuf = &bufs[nbuf];
 
 		if (rem >= ibuf->len) {
 			*obuf = *ibuf;
 			ibuf->ops = NULL;
-			pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-			pipe->nrbufs--;
+			tail++;
+			pipe_post_read_barrier(pipe, tail);
 		} else {
 			if (!pipe_buf_get(pipe, ibuf))
 				goto out_free;
diff --git a/fs/pipe.c b/fs/pipe.c
index 8a2ab2f974bd..aa410ee0f423 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -43,10 +43,10 @@  unsigned long pipe_user_pages_hard;
 unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
 
 /*
- * We use a start+len construction, which provides full use of the 
+ * We use a start+len construction, which provides full use of the
  * allocated memory.
  * -- Florian Coosmann (FGC)
- * 
+ *
  * Reads with count = 0 should always return 0.
  * -- Julian Bradfield 1999-06-07.
  *
@@ -285,10 +285,15 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	ret = 0;
 	__pipe_lock(pipe);
 	for (;;) {
-		int bufs = pipe->nrbufs;
-		if (bufs) {
-			int curbuf = pipe->curbuf;
-			struct pipe_buffer *buf = pipe->bufs + curbuf;
+		/* Barrier: head belongs to the write side, so order reading
+		 * the data after reading the head pointer.
+		 */
+		unsigned int head = READ_ONCE(pipe->head);
+		unsigned int tail = pipe->tail;
+		unsigned int mask = pipe->buffers - 1;
+
+		if (tail != head) {
+			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			size_t chars = buf->len;
 			size_t written;
 			int error;
@@ -321,17 +326,21 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 
 			if (!buf->len) {
 				pipe_buf_release(pipe, buf);
-				curbuf = (curbuf + 1) & (pipe->buffers - 1);
-				pipe->curbuf = curbuf;
-				pipe->nrbufs = --bufs;
+				tail++;
+				/* Barrier: order the reading of the data
+				 * before the update to the tail pointer as buf
+				 * thereafter belongs to the write side.
+				 */
+				smp_store_release(&pipe->tail, tail);
 				do_wakeup = 1;
 			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
+			if (tail != head)	/* More to do? */
+				continue;
 		}
-		if (bufs)	/* More to do? */
-			continue;
+
 		if (!pipe->writers)
 			break;
 		if (!pipe->waiting_writers) {
@@ -380,6 +389,7 @@  pipe_write(struct kiocb *iocb, struct iov_iter *from)
 {
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
+	unsigned int head, tail, buffers, mask;
 	ssize_t ret = 0;
 	int do_wakeup = 0;
 	size_t total_len = iov_iter_count(from);
@@ -397,12 +407,18 @@  pipe_write(struct kiocb *iocb, struct iov_iter *from)
 		goto out;
 	}
 
+	/* Barrier: tail belongs to the reader side, order access to the buffer
+	 * after reading the tail pointer.
+	 */
+	tail = READ_ONCE(pipe->tail);
+	head = pipe->head;
+	buffers = pipe->buffers;
+	mask = buffers - 1;
+
 	/* We try to merge small writes */
 	chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
-	if (pipe->nrbufs && chars != 0) {
-		int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
-							(pipe->buffers - 1);
-		struct pipe_buffer *buf = pipe->bufs + lastbuf;
+	if (head != tail && chars != 0) {
+		struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask];
 		int offset = buf->offset + buf->len;
 
 		if (pipe_buf_can_merge(buf) && offset + chars <= PAGE_SIZE) {
@@ -423,18 +439,19 @@  pipe_write(struct kiocb *iocb, struct iov_iter *from)
 	}
 
 	for (;;) {
-		int bufs;
-
 		if (!pipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			if (!ret)
 				ret = -EPIPE;
 			break;
 		}
-		bufs = pipe->nrbufs;
-		if (bufs < pipe->buffers) {
-			int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1);
-			struct pipe_buffer *buf = pipe->bufs + newbuf;
+
+		/* Barrier: tail belongs to the reader side, order access to
+		 * the buffer after reading the tail pointer.
+		 */
+		tail = READ_ONCE(pipe->tail);
+		if ((int)head - (int)tail < buffers ) {
+			struct pipe_buffer *buf = &pipe->bufs[head & mask];
 			struct page *page = pipe->tmp_page;
 			int copied;
 
@@ -470,14 +487,20 @@  pipe_write(struct kiocb *iocb, struct iov_iter *from)
 				buf->ops = &packet_pipe_buf_ops;
 				buf->flags = PIPE_BUF_FLAG_PACKET;
 			}
-			pipe->nrbufs = ++bufs;
+
+			/* Barrier: order head update after data write. */
+			head++;
+			smp_store_release(&pipe->head, head);
 			pipe->tmp_page = NULL;
 
 			if (!iov_iter_count(from))
 				break;
 		}
-		if (bufs < pipe->buffers)
+
+		if ((int)head - (int)tail < buffers)
 			continue;
+
+		/* Wait for buffer space to become available. */
 		if (filp->f_flags & O_NONBLOCK) {
 			if (!ret)
 				ret = -EAGAIN;
@@ -515,17 +538,22 @@  pipe_write(struct kiocb *iocb, struct iov_iter *from)
 static long pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg)
 {
 	struct pipe_inode_info *pipe = filp->private_data;
-	int count, buf, nrbufs;
+	int count, head, tail, mask;
 
 	switch (cmd) {
 		case FIONREAD:
 			__pipe_lock(pipe);
 			count = 0;
-			buf = pipe->curbuf;
-			nrbufs = pipe->nrbufs;
-			while (--nrbufs >= 0) {
-				count += pipe->bufs[buf].len;
-				buf = (buf+1) & (pipe->buffers - 1);
+			/* Barrier: head belongs to the write side, so order
+			 * reading the data after reading the head pointer.
+			 */
+			head = READ_ONCE(pipe->head);
+			tail = pipe->tail;
+			mask = pipe->buffers - 1;
+
+			while (tail < head) {
+				count += pipe->bufs[tail & mask].len;
+				tail++;
 			}
 			__pipe_unlock(pipe);
 
@@ -541,21 +569,26 @@  pipe_poll(struct file *filp, poll_table *wait)
 {
 	__poll_t mask;
 	struct pipe_inode_info *pipe = filp->private_data;
-	int nrbufs;
+	unsigned int head = READ_ONCE(pipe->head);
+	unsigned int tail = READ_ONCE(pipe->tail);
+	int occupancy = (int)head - (int)tail;
 
 	poll_wait(filp, &pipe->wait, wait);
 
+	BUG_ON(occupancy > pipe->buffers);
+
 	/* Reading only -- no need for acquiring the semaphore.  */
-	nrbufs = pipe->nrbufs;
 	mask = 0;
 	if (filp->f_mode & FMODE_READ) {
-		mask = (nrbufs > 0) ? EPOLLIN | EPOLLRDNORM : 0;
+		if (occupancy > 0)
+			mask |= EPOLLIN | EPOLLRDNORM;
 		if (!pipe->writers && filp->f_version != pipe->w_counter)
 			mask |= EPOLLHUP;
 	}
 
 	if (filp->f_mode & FMODE_WRITE) {
-		mask |= (nrbufs < pipe->buffers) ? EPOLLOUT | EPOLLWRNORM : 0;
+		if (occupancy < pipe->buffers)
+			mask |= EPOLLOUT | EPOLLWRNORM;
 		/*
 		 * Most Unices do not set EPOLLERR for FIFOs but on Linux they
 		 * behave exactly like pipes for poll().
@@ -880,7 +913,7 @@  SYSCALL_DEFINE1(pipe, int __user *, fildes)
 
 static int wait_for_partner(struct pipe_inode_info *pipe, unsigned int *cnt)
 {
-	int cur = *cnt;	
+	int cur = *cnt;
 
 	while (cur == *cnt) {
 		pipe_wait(pipe);
@@ -955,7 +988,7 @@  static int fifo_open(struct inode *inode, struct file *filp)
 			}
 		}
 		break;
-	
+
 	case FMODE_WRITE:
 	/*
 	 *  O_WRONLY
@@ -975,7 +1008,7 @@  static int fifo_open(struct inode *inode, struct file *filp)
 				goto err_wr;
 		}
 		break;
-	
+
 	case FMODE_READ | FMODE_WRITE:
 	/*
 	 *  O_RDWR
@@ -1054,7 +1087,7 @@  unsigned int round_pipe_size(unsigned long size)
 static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
 {
 	struct pipe_buffer *bufs;
-	unsigned int size, nr_pages;
+	unsigned int size, nr_pages, head, tail, mask, n;
 	unsigned long user_bufs;
 	long ret = 0;
 
@@ -1086,12 +1119,16 @@  static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
 	}
 
 	/*
-	 * We can shrink the pipe, if arg >= pipe->nrbufs. Since we don't
-	 * expect a lot of shrink+grow operations, just free and allocate
-	 * again like we would do for growing. If the pipe currently
-	 * contains more buffers than arg, then return busy.
+	 * We can shrink the pipe, if arg is greater than the number of
+	 * buffers.  Since we don't expect a lot of shrink+grow operations,
+	 * just free and allocate again like we would do for growing.  If the
+	 * pipe currently contains more buffers than arg, then return busy.
 	 */
-	if (nr_pages < pipe->nrbufs) {
+	head = pipe->head & mask;
+	tail = pipe->tail & mask;
+	mask = pipe->buffers - 1;
+	n = head - tail;
+	if (nr_pages < n) {
 		ret = -EBUSY;
 		goto out_revert_acct;
 	}
@@ -1107,27 +1144,24 @@  static long pipe_set_size(struct pipe_inode_info *pipe, unsigned long arg)
 	 * The pipe array wraps around, so just start the new one at zero
 	 * and adjust the indexes.
 	 */
-	if (pipe->nrbufs) {
-		unsigned int tail;
-		unsigned int head;
-
-		tail = pipe->curbuf + pipe->nrbufs;
-		if (tail < pipe->buffers)
-			tail = 0;
-		else
-			tail &= (pipe->buffers - 1);
-
-		head = pipe->nrbufs - tail;
-		if (head)
-			memcpy(bufs, pipe->bufs + pipe->curbuf, head * sizeof(struct pipe_buffer));
-		if (tail)
-			memcpy(bufs + head, pipe->bufs, tail * sizeof(struct pipe_buffer));
+	if (n > 0) {
+		if (head > tail) {
+			memcpy(bufs, &pipe->bufs[tail], n * sizeof(struct pipe_buffer));
+		} else {
+			unsigned int s = pipe->buffers - tail;
+			if (n - s > 0)
+				memcpy(bufs + s, &pipe->bufs[0],
+				       (n - s) * sizeof(struct pipe_buffer));
+			memcpy(bufs, &pipe->bufs[tail], s * sizeof(struct pipe_buffer));
+		}
 	}
 
-	pipe->curbuf = 0;
+	head = n;
+	tail = 0;
+
 	kfree(pipe->bufs);
-	pipe->bufs = bufs;
-	pipe->buffers = nr_pages;
+	pipe->tail = tail;
+	smp_store_release(&pipe->head, head);
 	return nr_pages * PAGE_SIZE;
 
 out_revert_acct:
diff --git a/fs/splice.c b/fs/splice.c
index 98412721f056..fbded7749bd0 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -185,6 +185,9 @@  ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		       struct splice_pipe_desc *spd)
 {
 	unsigned int spd_pages = spd->nr_pages;
+	unsigned int tail = pipe_get_tail_for_write(pipe);
+	unsigned int head = pipe->head;
+	unsigned int mask = pipe->buffers - 1;
 	int ret = 0, page_nr = 0;
 
 	if (!spd_pages)
@@ -196,9 +199,8 @@  ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		goto out;
 	}
 
-	while (pipe->nrbufs < pipe->buffers) {
-		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
-		struct pipe_buffer *buf = pipe->bufs + newbuf;
+	while (head - tail < pipe->buffers) {
+		struct pipe_buffer *buf = &pipe->bufs[head & mask];
 
 		buf->page = spd->pages[page_nr];
 		buf->offset = spd->partial[page_nr].offset;
@@ -207,7 +209,8 @@  ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		buf->ops = spd->ops;
 		buf->flags = 0;
 
-		pipe->nrbufs++;
+		head++;
+		pipe_post_write_barrier(pipe, head);
 		page_nr++;
 		ret += buf->len;
 
@@ -228,17 +231,19 @@  EXPORT_SYMBOL_GPL(splice_to_pipe);
 
 ssize_t add_to_pipe(struct pipe_inode_info *pipe, struct pipe_buffer *buf)
 {
+	unsigned int head = pipe->head;
+	unsigned int tail = pipe_get_tail_for_write(pipe);
+	unsigned int mask = pipe->buffers - 1;
 	int ret;
 
 	if (unlikely(!pipe->readers)) {
 		send_sig(SIGPIPE, current, 0);
 		ret = -EPIPE;
-	} else if (pipe->nrbufs == pipe->buffers) {
+	} else if (head - tail == pipe->buffers) {
 		ret = -EAGAIN;
 	} else {
-		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
-		pipe->bufs[newbuf] = *buf;
-		pipe->nrbufs++;
+		pipe->bufs[head & mask] = *buf;
+		pipe_post_write_barrier(pipe, head);
 		return buf->len;
 	}
 	pipe_buf_release(pipe, buf);
@@ -298,10 +303,11 @@  ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
 {
 	struct iov_iter to;
 	struct kiocb kiocb;
-	int idx, ret;
+	unsigned int i_head;
+	int ret;
 
 	iov_iter_pipe(&to, READ, pipe, len);
-	idx = to.idx;
+	i_head = to.head;
 	init_sync_kiocb(&kiocb, in);
 	kiocb.ki_pos = *ppos;
 	ret = call_read_iter(in, &kiocb, &to);
@@ -309,7 +315,7 @@  ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
 		*ppos = kiocb.ki_pos;
 		file_accessed(in);
 	} else if (ret < 0) {
-		to.idx = idx;
+		to.head = i_head;
 		to.iov_offset = 0;
 		iov_iter_advance(&to, 0); /* to free what was emitted */
 		/*
@@ -370,11 +376,12 @@  static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
 	struct iov_iter to;
 	struct page **pages;
 	unsigned int nr_pages;
+	unsigned int mask;
 	size_t offset, base, copied = 0;
 	ssize_t res;
 	int i;
 
-	if (pipe->nrbufs == pipe->buffers)
+	if (pipe->head - pipe->tail == pipe->buffers)
 		return -EAGAIN;
 
 	/*
@@ -400,8 +407,9 @@  static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
 		}
 	}
 
-	pipe->bufs[to.idx].offset = offset;
-	pipe->bufs[to.idx].len -= offset;
+	mask = pipe->buffers - 1;
+	pipe->bufs[to.head & mask].offset = offset;
+	pipe->bufs[to.head & mask].len -= offset;
 
 	for (i = 0; i < nr_pages; i++) {
 		size_t this_len = min_t(size_t, len, PAGE_SIZE - offset);
@@ -443,7 +451,7 @@  static int pipe_to_sendpage(struct pipe_inode_info *pipe,
 
 	more = (sd->flags & SPLICE_F_MORE) ? MSG_MORE : 0;
 
-	if (sd->len < sd->total_len && pipe->nrbufs > 1)
+	if (sd->len < sd->total_len && pipe->head - pipe->tail > 1)
 		more |= MSG_SENDPAGE_NOTLAST;
 
 	return file->f_op->sendpage(file, buf->page, buf->offset,
@@ -481,10 +489,13 @@  static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
 static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_desc *sd,
 			  splice_actor *actor)
 {
+	unsigned int head = pipe_get_head_for_read(pipe);
+	unsigned int tail = pipe->tail;
+	unsigned int mask = pipe->buffers - 1;
 	int ret;
 
-	while (pipe->nrbufs) {
-		struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+	while (tail != head) {
+		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 
 		sd->len = buf->len;
 		if (sd->len > sd->total_len)
@@ -511,8 +522,8 @@  static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
 
 		if (!buf->len) {
 			pipe_buf_release(pipe, buf);
-			pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-			pipe->nrbufs--;
+			tail++;
+			pipe_post_read_barrier(pipe, tail);
 			if (pipe->files)
 				sd->need_wakeup = true;
 		}
@@ -543,7 +554,7 @@  static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des
 	if (signal_pending(current))
 		return -ERESTARTSYS;
 
-	while (!pipe->nrbufs) {
+	while (pipe->tail == pipe->head) {
 		if (!pipe->writers)
 			return 0;
 
@@ -699,8 +710,11 @@  iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 	splice_from_pipe_begin(&sd);
 	while (sd.total_len) {
 		struct iov_iter from;
+		unsigned int head = pipe_get_head_for_read(pipe);
+		unsigned int tail = pipe->tail;
+		unsigned int mask = pipe->buffers - 1;
 		size_t left;
-		int n, idx;
+		int n;
 
 		ret = splice_from_pipe_next(pipe, &sd);
 		if (ret <= 0)
@@ -719,16 +733,13 @@  iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 
 		/* build the vector */
 		left = sd.total_len;
-		for (n = 0, idx = pipe->curbuf; left && n < pipe->nrbufs; n++, idx++) {
-			struct pipe_buffer *buf = pipe->bufs + idx;
+		for (n = 0; left && tail < head && n < nbufs; tail++, n++) {
+			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			size_t this_len = buf->len;
 
 			if (this_len > left)
 				this_len = left;
 
-			if (idx == pipe->buffers - 1)
-				idx = -1;
-
 			ret = pipe_buf_confirm(pipe, buf);
 			if (unlikely(ret)) {
 				if (ret == -ENODATA)
@@ -752,14 +763,15 @@  iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 		*ppos = sd.pos;
 
 		/* dismiss the fully eaten buffers, adjust the partial one */
+		tail = pipe->tail;
 		while (ret) {
-			struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+			struct pipe_buffer *buf = &pipe->bufs[tail];
 			if (ret >= buf->len) {
 				ret -= buf->len;
 				buf->len = 0;
 				pipe_buf_release(pipe, buf);
-				pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-				pipe->nrbufs--;
+				tail++;
+				pipe_post_read_barrier(pipe, tail);
 				if (pipe->files)
 					sd.need_wakeup = true;
 			} else {
@@ -942,15 +954,17 @@  ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	sd->flags &= ~SPLICE_F_NONBLOCK;
 	more = sd->flags & SPLICE_F_MORE;
 
-	WARN_ON_ONCE(pipe->nrbufs != 0);
+	WARN_ON_ONCE(pipe->head != pipe->tail);
 
 	while (len) {
+		unsigned int p_nr;
 		size_t read_len;
 		loff_t pos = sd->pos, prev_pos = pos;
 
 		/* Don't try to read more the pipe has space for. */
+		p_nr = READ_ONCE(pipe->head) - READ_ONCE(pipe->tail);
 		read_len = min_t(size_t, len,
-				 (pipe->buffers - pipe->nrbufs) << PAGE_SHIFT);
+				 (pipe->buffers - p_nr) << PAGE_SHIFT);
 		ret = do_splice_to(in, &pos, pipe, read_len, flags);
 		if (unlikely(ret <= 0))
 			goto out_release;
@@ -989,7 +1003,7 @@  ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	}
 
 done:
-	pipe->nrbufs = pipe->curbuf = 0;
+	pipe->tail = pipe->head = 0;
 	file_accessed(in);
 	return bytes;
 
@@ -999,7 +1013,7 @@  ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	 * the pipe buffers in question:
 	 */
 	for (i = 0; i < pipe->buffers; i++) {
-		struct pipe_buffer *buf = pipe->bufs + i;
+		struct pipe_buffer *buf = &pipe->bufs[i];
 
 		if (buf->ops)
 			pipe_buf_release(pipe, buf);
@@ -1075,7 +1089,7 @@  static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
 			send_sig(SIGPIPE, current, 0);
 			return -EPIPE;
 		}
-		if (pipe->nrbufs != pipe->buffers)
+		if (pipe->head - READ_ONCE(pipe->tail) != pipe->buffers)
 			return 0;
 		if (flags & SPLICE_F_NONBLOCK)
 			return -EAGAIN;
@@ -1442,16 +1456,16 @@  static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	int ret;
 
 	/*
-	 * Check ->nrbufs without the inode lock first. This function
+	 * Check the pipe occupancy without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (pipe->nrbufs)
+	if (pipe->head == pipe->tail)
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (!pipe->nrbufs) {
+	while (READ_ONCE(pipe->head) == READ_ONCE(pipe->tail)) {
 		if (signal_pending(current)) {
 			ret = -ERESTARTSYS;
 			break;
@@ -1483,13 +1497,13 @@  static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	 * Check ->nrbufs without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (pipe->nrbufs < pipe->buffers)
+	if (READ_ONCE(pipe->head) - READ_ONCE(pipe->tail) >= pipe->buffers)
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (pipe->nrbufs >= pipe->buffers) {
+	while (READ_ONCE(pipe->head) - READ_ONCE(pipe->tail) >= pipe->buffers) {
 		if (!pipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			ret = -EPIPE;
@@ -1520,7 +1534,10 @@  static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			       size_t len, unsigned int flags)
 {
 	struct pipe_buffer *ibuf, *obuf;
-	int ret = 0, nbuf;
+	unsigned int i_head, o_head;
+	unsigned int i_tail, o_tail;
+	unsigned int i_mask, o_mask;
+	int ret = 0;
 	bool input_wakeup = false;
 
 
@@ -1540,7 +1557,14 @@  static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 	 */
 	pipe_double_lock(ipipe, opipe);
 
+	i_tail = ipipe->tail;
+	i_mask = ipipe->buffers - 1;
+	o_head = opipe->head;
+	o_mask = opipe->buffers - 1;
+
 	do {
+		size_t o_len;
+
 		if (!opipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			if (!ret)
@@ -1548,14 +1572,17 @@  static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
-		if (!ipipe->nrbufs && !ipipe->writers)
+		i_head = pipe_get_head_for_read(ipipe);
+		o_tail = pipe_get_tail_for_write(opipe);
+
+		if (i_head != i_tail && !ipipe->writers)
 			break;
 
 		/*
 		 * Cannot make any progress, because either the input
 		 * pipe is empty or the output pipe is full.
 		 */
-		if (!ipipe->nrbufs || opipe->nrbufs >= opipe->buffers) {
+		if (i_head != i_tail || o_head - o_tail >= opipe->buffers) {
 			/* Already processed some buffers, break */
 			if (ret)
 				break;
@@ -1575,9 +1602,8 @@  static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			goto retry;
 		}
 
-		ibuf = ipipe->bufs + ipipe->curbuf;
-		nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
-		obuf = opipe->bufs + nbuf;
+		ibuf = &ipipe->bufs[i_tail & i_mask];
+		obuf = &opipe->bufs[o_head & o_mask];
 
 		if (len >= ibuf->len) {
 			/*
@@ -1585,10 +1611,12 @@  static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			 */
 			*obuf = *ibuf;
 			ibuf->ops = NULL;
-			opipe->nrbufs++;
-			ipipe->curbuf = (ipipe->curbuf + 1) & (ipipe->buffers - 1);
-			ipipe->nrbufs--;
+			i_tail++;
+			pipe_post_read_barrier(ipipe, i_tail);
 			input_wakeup = true;
+			o_len = obuf->len;
+			o_head++;
+			pipe_post_write_barrier(opipe, o_head);
 		} else {
 			/*
 			 * Get a reference to this pipe buffer,
@@ -1610,12 +1638,14 @@  static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			pipe_buf_mark_unmergeable(obuf);
 
 			obuf->len = len;
-			opipe->nrbufs++;
-			ibuf->offset += obuf->len;
-			ibuf->len -= obuf->len;
+			ibuf->offset += len;
+			ibuf->len -= len;
+			o_len = len;
+			o_head++;
+			pipe_post_write_barrier(opipe, o_head);
 		}
-		ret += obuf->len;
-		len -= obuf->len;
+		ret += o_len;
+		len -= o_len;
 	} while (len);
 
 	pipe_unlock(ipipe);
@@ -1641,6 +1671,9 @@  static int link_pipe(struct pipe_inode_info *ipipe,
 		     size_t len, unsigned int flags)
 {
 	struct pipe_buffer *ibuf, *obuf;
+	unsigned int i_head, o_head;
+	unsigned int i_tail, o_tail;
+	unsigned int i_mask, o_mask;
 	int ret = 0, i = 0, nbuf;
 
 	/*
@@ -1650,6 +1683,11 @@  static int link_pipe(struct pipe_inode_info *ipipe,
 	 */
 	pipe_double_lock(ipipe, opipe);
 
+	i_tail = ipipe->tail;
+	i_mask = ipipe->buffers - 1;
+	o_head = opipe->head;
+	o_mask = opipe->buffers - 1;
+
 	do {
 		if (!opipe->readers) {
 			send_sig(SIGPIPE, current, 0);
@@ -1658,15 +1696,18 @@  static int link_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
+		i_head = pipe_get_head_for_read(ipipe);
+		o_tail = pipe_get_tail_for_write(opipe);
+
 		/*
-		 * If we have iterated all input buffers or ran out of
+		 * If we have iterated all input buffers or run out of
 		 * output room, break.
 		 */
-		if (i >= ipipe->nrbufs || opipe->nrbufs >= opipe->buffers)
+		if (i_head != i_tail || o_head - o_tail >= opipe->buffers)
 			break;
 
-		ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (ipipe->buffers-1));
-		nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
+		ibuf = &ipipe->bufs[i_tail & i_mask];
+		obuf = &opipe->bufs[o_head & o_mask];
 
 		/*
 		 * Get a reference to this pipe buffer,
@@ -1678,7 +1719,6 @@  static int link_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
-		obuf = opipe->bufs + nbuf;
 		*obuf = *ibuf;
 
 		/*
@@ -1691,10 +1731,11 @@  static int link_pipe(struct pipe_inode_info *ipipe,
 
 		if (obuf->len > len)
 			obuf->len = len;
-
-		opipe->nrbufs++;
 		ret += obuf->len;
 		len -= obuf->len;
+
+		o_head++;
+		pipe_post_write_barrier(opipe, o_head);
 		i++;
 	} while (len);
 
diff --git a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
index 5c626fdc10db..b0d74bcb54a1 100644
--- a/include/linux/pipe_fs_i.h
+++ b/include/linux/pipe_fs_i.h
@@ -30,9 +30,9 @@  struct pipe_buffer {
  *	struct pipe_inode_info - a linux kernel pipe
  *	@mutex: mutex protecting the whole thing
  *	@wait: reader/writer wait point in case of empty/full pipe
- *	@nrbufs: the number of non-empty pipe buffers in this pipe
+ *	@head: The point of buffer production
+ *	@tail: The point of buffer consumption
  *	@buffers: total number of buffers (should be a power of 2)
- *	@curbuf: the current pipe buffer entry
  *	@tmp_page: cached released page
  *	@readers: number of current readers of this pipe
  *	@writers: number of current writers of this pipe
@@ -48,7 +48,7 @@  struct pipe_buffer {
 struct pipe_inode_info {
 	struct mutex mutex;
 	wait_queue_head_t wait;
-	unsigned int nrbufs, curbuf, buffers;
+	unsigned int head, tail, buffers;
 	unsigned int readers;
 	unsigned int writers;
 	unsigned int files;
@@ -104,6 +104,76 @@  struct pipe_buf_operations {
 	bool (*get)(struct pipe_inode_info *, struct pipe_buffer *);
 };
 
+/**
+ * pipe_get_tail_for_write - Get pipe buffer tail pointer for write-side use
+ * @pipe: The pipe in question
+ *
+ * Get the tail pointer for use in the write-side code.  This may need to
+ * insert a barrier against the reader to order reading the tail pointer
+ * against the reader reading the buffer.
+ */
+static inline unsigned int pipe_get_tail_for_write(struct pipe_inode_info *pipe)
+{
+	return READ_ONCE(pipe->tail);
+}
+
+/**
+ * pipe_post_read_barrier - Set pipe buffer tail pointer in the read-side
+ * @pipe: The pipe in question
+ * @tail: The new tail pointer
+ *
+ * Update the tail pointer in the read-side code.  This inserts a barrier
+ * against the writer such that the data write is ordered before the tail
+ * pointer update.
+ */
+static inline void pipe_post_read_barrier(struct pipe_inode_info *pipe,
+					  unsigned int tail)
+{
+	smp_store_release(&pipe->tail, tail);
+}
+
+/**
+ * pipe_get_head_for_read - Get pipe buffer head pointer for read-side use
+ * @pipe: The pipe in question
+ *
+ * Get the head pointer for use in the read-side code.  This inserts a barrier
+ * against the reader such that the head pointer is read before the data it
+ * points to.
+ */
+static inline unsigned int pipe_get_head_for_read(struct pipe_inode_info *pipe)
+{
+	return READ_ONCE(pipe->head);
+}
+
+/**
+ * pipe_post_write_barrier - Set pipe buffer head pointer in the write-side
+ * @pipe: The pipe in question
+ * @head: The new head pointer
+ *
+ * Update the head pointer in the write-side code.  This inserts a barrier
+ * against the reader such that the data write is ordered before the head
+ * pointer update.
+ */
+static inline void pipe_post_write_barrier(struct pipe_inode_info *pipe,
+					   unsigned int head)
+{
+	smp_store_release(&pipe->head, head);
+}
+
+/**
+ * pipe_set_tail_for_read - Set pipe buffer tail pointer for read-side use
+ * @pipe: The pipe in question
+ * @tail: The new tail pointer
+ *
+ * Set the tail pointer for use in the read-side code.  This inserts a barrier
+ * against the writer such that the data write is ordered before the tail
+ * pointer update.
+ */
+static inline void pipe_get_tail_for_read(struct pipe_inode_info *pipe, unsigned int tail)
+{
+	smp_store_release(&pipe->tail, tail);
+}
+
 /**
  * pipe_buf_get - get a reference to a pipe_buffer
  * @pipe:	the pipe that the buffer belongs to
diff --git a/include/linux/uio.h b/include/linux/uio.h
index ab5f523bc0df..9576fd8158d7 100644
--- a/include/linux/uio.h
+++ b/include/linux/uio.h
@@ -45,8 +45,8 @@  struct iov_iter {
 	union {
 		unsigned long nr_segs;
 		struct {
-			int idx;
-			int start_idx;
+			unsigned int head;
+			unsigned int start_head;
 		};
 	};
 };
diff --git a/lib/iov_iter.c b/lib/iov_iter.c
index f1e0569b4539..54c250d9370b 100644
--- a/lib/iov_iter.c
+++ b/lib/iov_iter.c
@@ -325,27 +325,32 @@  static size_t copy_page_from_iter_iovec(struct page *page, size_t offset, size_t
 static bool sanity(const struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
-	int idx = i->idx;
-	int next = pipe->curbuf + pipe->nrbufs;
+	unsigned int p_head = pipe->head;
+	unsigned int p_tail = pipe_get_tail_for_write(pipe);
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int p_occupancy = p_head - p_tail;
+	unsigned int i_head = i->head;
+	unsigned int idx;
+
 	if (i->iov_offset) {
 		struct pipe_buffer *p;
-		if (unlikely(!pipe->nrbufs))
+		if (unlikely(p_occupancy == 0))
 			goto Bad;	// pipe must be non-empty
-		if (unlikely(idx != ((next - 1) & (pipe->buffers - 1))))
+		if (unlikely(i_head != p_head - 1))
 			goto Bad;	// must be at the last buffer...
 
-		p = &pipe->bufs[idx];
+		p = &pipe->bufs[i_head & p_mask];
 		if (unlikely(p->offset + p->len != i->iov_offset))
 			goto Bad;	// ... at the end of segment
 	} else {
-		if (idx != (next & (pipe->buffers - 1)))
+		if (i_head != p_head)
 			goto Bad;	// must be right after the last buffer
 	}
 	return true;
 Bad:
-	printk(KERN_ERR "idx = %d, offset = %zd\n", i->idx, i->iov_offset);
-	printk(KERN_ERR "curbuf = %d, nrbufs = %d, buffers = %d\n",
-			pipe->curbuf, pipe->nrbufs, pipe->buffers);
+	printk(KERN_ERR "idx = %d, offset = %zd\n", i_head, i->iov_offset);
+	printk(KERN_ERR "head = %d, tail = %d, buffers = %d\n",
+			p_head, p_tail, pipe->buffers);
 	for (idx = 0; idx < pipe->buffers; idx++)
 		printk(KERN_ERR "[%p %p %d %d]\n",
 			pipe->bufs[idx].ops,
@@ -359,18 +364,15 @@  static bool sanity(const struct iov_iter *i)
 #define sanity(i) true
 #endif
 
-static inline int next_idx(int idx, struct pipe_inode_info *pipe)
-{
-	return (idx + 1) & (pipe->buffers - 1);
-}
-
 static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t bytes,
 			 struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
 	struct pipe_buffer *buf;
+	unsigned int p_tail = pipe_get_tail_for_write(pipe);
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int i_head = i->head;
 	size_t off;
-	int idx;
 
 	if (unlikely(bytes > i->count))
 		bytes = i->count;
@@ -382,8 +384,7 @@  static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
 		return 0;
 
 	off = i->iov_offset;
-	idx = i->idx;
-	buf = &pipe->bufs[idx];
+	buf = &pipe->bufs[i_head & p_mask];
 	if (off) {
 		if (offset == off && buf->page == page) {
 			/* merge with the last one */
@@ -391,18 +392,21 @@  static size_t copy_page_to_iter_pipe(struct page *page, size_t offset, size_t by
 			i->iov_offset += bytes;
 			goto out;
 		}
-		idx = next_idx(idx, pipe);
-		buf = &pipe->bufs[idx];
+		i_head++;
+		buf = &pipe->bufs[i_head & p_mask];
 	}
-	if (idx == pipe->curbuf && pipe->nrbufs)
+	if (i_head - p_tail == pipe->buffers)
 		return 0;
-	pipe->nrbufs++;
-	buf->ops = &page_cache_pipe_buf_ops;
-	get_page(buf->page = page);
+
+	get_page(page);
+	buf->page = page;
 	buf->offset = offset;
 	buf->len = bytes;
+	buf->ops = &page_cache_pipe_buf_ops;
+
+	pipe_post_read_barrier(pipe, i_head);
 	i->iov_offset = offset + bytes;
-	i->idx = idx;
+	i->head = i_head;
 out:
 	i->count -= bytes;
 	return bytes;
@@ -480,24 +484,30 @@  static inline bool allocated(struct pipe_buffer *buf)
 	return buf->ops == &default_pipe_buf_ops;
 }
 
-static inline void data_start(const struct iov_iter *i, int *idxp, size_t *offp)
+static inline void data_start(const struct iov_iter *i, unsigned int *i_headp,
+			      size_t *offp)
 {
+	unsigned int p_mask = i->pipe->buffers - 1;
+	unsigned int i_head = i->head;
 	size_t off = i->iov_offset;
-	int idx = i->idx;
-	if (off && (!allocated(&i->pipe->bufs[idx]) || off == PAGE_SIZE)) {
-		idx = next_idx(idx, i->pipe);
+
+	if (off && (!allocated(&i->pipe->bufs[i_head & p_mask]) ||
+		    off == PAGE_SIZE)) {
+		i_head++;
 		off = 0;
 	}
-	*idxp = idx;
+	*i_headp = i_head;
 	*offp = off;
 }
 
 static size_t push_pipe(struct iov_iter *i, size_t size,
-			int *idxp, size_t *offp)
+			int *i_headp, size_t *offp)
 {
 	struct pipe_inode_info *pipe = i->pipe;
+	unsigned int p_tail = pipe_get_tail_for_write(pipe);
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int i_head;
 	size_t off;
-	int idx;
 	ssize_t left;
 
 	if (unlikely(size > i->count))
@@ -506,33 +516,34 @@  static size_t push_pipe(struct iov_iter *i, size_t size,
 		return 0;
 
 	left = size;
-	data_start(i, &idx, &off);
-	*idxp = idx;
+	data_start(i, &i_head, &off);
+	*i_headp = i_head;
 	*offp = off;
 	if (off) {
 		left -= PAGE_SIZE - off;
 		if (left <= 0) {
-			pipe->bufs[idx].len += size;
+			pipe->bufs[i_head & p_mask].len += size;
 			return size;
 		}
-		pipe->bufs[idx].len = PAGE_SIZE;
-		idx = next_idx(idx, pipe);
+		pipe->bufs[i_head & p_mask].len = PAGE_SIZE;
+		i_head++;
 	}
-	while (idx != pipe->curbuf || !pipe->nrbufs) {
+	while (i_head != p_tail + pipe->buffers) {
+		struct pipe_buffer *buf = &pipe->bufs[i_head & p_mask];
 		struct page *page = alloc_page(GFP_USER);
 		if (!page)
 			break;
-		pipe->nrbufs++;
-		pipe->bufs[idx].ops = &default_pipe_buf_ops;
-		pipe->bufs[idx].page = page;
-		pipe->bufs[idx].offset = 0;
-		if (left <= PAGE_SIZE) {
-			pipe->bufs[idx].len = left;
+
+		buf->ops = &default_pipe_buf_ops;
+		buf->page = page;
+		buf->offset = 0;
+		buf->len = max_t(ssize_t, left, PAGE_SIZE);
+		left -= buf->len;
+		i_head++;
+		pipe_post_write_barrier(pipe, i_head);
+
+		if (left == 0)
 			return size;
-		}
-		pipe->bufs[idx].len = PAGE_SIZE;
-		left -= PAGE_SIZE;
-		idx = next_idx(idx, pipe);
 	}
 	return size - left;
 }
@@ -541,23 +552,26 @@  static size_t copy_pipe_to_iter(const void *addr, size_t bytes,
 				struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int i_head;
 	size_t n, off;
-	int idx;
 
 	if (!sanity(i))
 		return 0;
 
-	bytes = n = push_pipe(i, bytes, &idx, &off);
+	bytes = n = push_pipe(i, bytes, &i_head, &off);
 	if (unlikely(!n))
 		return 0;
-	for ( ; n; idx = next_idx(idx, pipe), off = 0) {
+	do {
 		size_t chunk = min_t(size_t, n, PAGE_SIZE - off);
-		memcpy_to_page(pipe->bufs[idx].page, off, addr, chunk);
-		i->idx = idx;
+		memcpy_to_page(pipe->bufs[i_head & p_mask].page, off, addr, chunk);
+		i->head = i_head;
 		i->iov_offset = off + chunk;
 		n -= chunk;
 		addr += chunk;
-	}
+		off = 0;
+		i_head++;
+	} while (n);
 	i->count -= bytes;
 	return bytes;
 }
@@ -573,28 +587,31 @@  static size_t csum_and_copy_to_pipe_iter(const void *addr, size_t bytes,
 				__wsum *csum, struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int i_head;
 	size_t n, r;
 	size_t off = 0;
 	__wsum sum = *csum;
-	int idx;
 
 	if (!sanity(i))
 		return 0;
 
-	bytes = n = push_pipe(i, bytes, &idx, &r);
+	bytes = n = push_pipe(i, bytes, &i_head, &r);
 	if (unlikely(!n))
 		return 0;
-	for ( ; n; idx = next_idx(idx, pipe), r = 0) {
+	do {
 		size_t chunk = min_t(size_t, n, PAGE_SIZE - r);
-		char *p = kmap_atomic(pipe->bufs[idx].page);
+		char *p = kmap_atomic(pipe->bufs[i_head & p_mask].page);
 		sum = csum_and_memcpy(p + r, addr, chunk, sum, off);
 		kunmap_atomic(p);
-		i->idx = idx;
+		i->head = i_head;
 		i->iov_offset = r + chunk;
 		n -= chunk;
 		off += chunk;
 		addr += chunk;
-	}
+		r = 0;
+		i_head++;
+	} while (n);
 	i->count -= bytes;
 	*csum = sum;
 	return bytes;
@@ -645,29 +662,32 @@  static size_t copy_pipe_to_iter_mcsafe(const void *addr, size_t bytes,
 				struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int i_head;
 	size_t n, off, xfer = 0;
-	int idx;
 
 	if (!sanity(i))
 		return 0;
 
-	bytes = n = push_pipe(i, bytes, &idx, &off);
+	bytes = n = push_pipe(i, bytes, &i_head, &off);
 	if (unlikely(!n))
 		return 0;
-	for ( ; n; idx = next_idx(idx, pipe), off = 0) {
+	do {
 		size_t chunk = min_t(size_t, n, PAGE_SIZE - off);
 		unsigned long rem;
 
-		rem = memcpy_mcsafe_to_page(pipe->bufs[idx].page, off, addr,
-				chunk);
-		i->idx = idx;
+		rem = memcpy_mcsafe_to_page(pipe->bufs[i_head & p_mask].page,
+					    off, addr, chunk);
+		i->head = i_head;
 		i->iov_offset = off + chunk - rem;
 		xfer += chunk - rem;
 		if (rem)
 			break;
 		n -= chunk;
 		addr += chunk;
-	}
+		off = 0;
+		i_head++;
+	} while (n);
 	i->count -= xfer;
 	return xfer;
 }
@@ -925,6 +945,8 @@  EXPORT_SYMBOL(copy_page_from_iter);
 static size_t pipe_zero(size_t bytes, struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
+	unsigned int p_mask = pipe->buffers - 1;
+	unsigned int i_head;
 	size_t n, off;
 	int idx;
 
@@ -935,13 +957,15 @@  static size_t pipe_zero(size_t bytes, struct iov_iter *i)
 	if (unlikely(!n))
 		return 0;
 
-	for ( ; n; idx = next_idx(idx, pipe), off = 0) {
+	do {
 		size_t chunk = min_t(size_t, n, PAGE_SIZE - off);
-		memzero_page(pipe->bufs[idx].page, off, chunk);
-		i->idx = idx;
+		memzero_page(pipe->bufs[i_head & p_mask].page, off, chunk);
+		i->head = i_head;
 		i->iov_offset = off + chunk;
 		n -= chunk;
-	}
+		off = 0;
+		i_head++;
+	} while (n);
 	i->count -= bytes;
 	return bytes;
 }
@@ -987,20 +1011,26 @@  EXPORT_SYMBOL(iov_iter_copy_from_user_atomic);
 static inline void pipe_truncate(struct iov_iter *i)
 {
 	struct pipe_inode_info *pipe = i->pipe;
-	if (pipe->nrbufs) {
+	unsigned int p_tail = pipe_get_tail_for_write(pipe);
+	unsigned int p_head = pipe->head;
+	unsigned int p_mask = pipe->buffers - 1;
+
+	if (p_head != p_tail) {
+		struct pipe_buffer *buf;
+		unsigned int i_head = i->head;
 		size_t off = i->iov_offset;
-		int idx = i->idx;
-		int nrbufs = (idx - pipe->curbuf) & (pipe->buffers - 1);
+
 		if (off) {
-			pipe->bufs[idx].len = off - pipe->bufs[idx].offset;
-			idx = next_idx(idx, pipe);
-			nrbufs++;
+			buf = &pipe->bufs[i_head & p_mask];
+			buf->len = off - buf->offset;
+			i_head++;
 		}
-		while (pipe->nrbufs > nrbufs) {
-			pipe_buf_release(pipe, &pipe->bufs[idx]);
-			idx = next_idx(idx, pipe);
-			pipe->nrbufs--;
+		while (i_head < p_head) {
+			pipe_buf_release(pipe, &pipe->bufs[i_head & p_mask]);
+			i_head++;
 		}
+
+		pipe_post_write_barrier(pipe, i_head);
 	}
 }
 
@@ -1011,18 +1041,20 @@  static void pipe_advance(struct iov_iter *i, size_t size)
 		size = i->count;
 	if (size) {
 		struct pipe_buffer *buf;
+		unsigned int p_mask = pipe->buffers - 1;
+		unsigned int i_head = i->head;
 		size_t off = i->iov_offset, left = size;
-		int idx = i->idx;
+
 		if (off) /* make it relative to the beginning of buffer */
-			left += off - pipe->bufs[idx].offset;
+			left += off - pipe->bufs[i_head & p_mask].offset;
 		while (1) {
-			buf = &pipe->bufs[idx];
+			buf = &pipe->bufs[i_head & p_mask];
 			if (left <= buf->len)
 				break;
 			left -= buf->len;
-			idx = next_idx(idx, pipe);
+			i_head++;
 		}
-		i->idx = idx;
+		i->head = i_head;
 		i->iov_offset = buf->offset + left;
 	}
 	i->count -= size;
@@ -1053,25 +1085,27 @@  void iov_iter_revert(struct iov_iter *i, size_t unroll)
 	i->count += unroll;
 	if (unlikely(iov_iter_is_pipe(i))) {
 		struct pipe_inode_info *pipe = i->pipe;
-		int idx = i->idx;
+		unsigned int p_mask = pipe->buffers - 1;
+		unsigned int i_head = i->head;
 		size_t off = i->iov_offset;
 		while (1) {
-			size_t n = off - pipe->bufs[idx].offset;
+			struct pipe_buffer *b = &pipe->bufs[i_head & p_mask];
+			size_t n = off - b->offset;
 			if (unroll < n) {
 				off -= unroll;
 				break;
 			}
 			unroll -= n;
-			if (!unroll && idx == i->start_idx) {
+			if (!unroll && i_head == i->start_head) {
 				off = 0;
 				break;
 			}
-			if (!idx--)
-				idx = pipe->buffers - 1;
-			off = pipe->bufs[idx].offset + pipe->bufs[idx].len;
+			i_head--;
+			b = &pipe->bufs[i_head & p_mask];
+			off = b->offset + b->len;
 		}
 		i->iov_offset = off;
-		i->idx = idx;
+		i->head = i_head;
 		pipe_truncate(i);
 		return;
 	}
@@ -1159,13 +1193,13 @@  void iov_iter_pipe(struct iov_iter *i, unsigned int direction,
 			size_t count)
 {
 	BUG_ON(direction != READ);
-	WARN_ON(pipe->nrbufs == pipe->buffers);
+	WARN_ON(pipe->head - pipe->tail == pipe->buffers);
 	i->type = ITER_PIPE | READ;
 	i->pipe = pipe;
-	i->idx = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
+	i->head = pipe->head;
 	i->iov_offset = 0;
 	i->count = count;
-	i->start_idx = i->idx;
+	i->start_head = i->head;
 }
 EXPORT_SYMBOL(iov_iter_pipe);
 
@@ -1189,11 +1223,12 @@  EXPORT_SYMBOL(iov_iter_discard);
 
 unsigned long iov_iter_alignment(const struct iov_iter *i)
 {
+	unsigned int p_mask = i->pipe->buffers - 1;
 	unsigned long res = 0;
 	size_t size = i->count;
 
 	if (unlikely(iov_iter_is_pipe(i))) {
-		if (size && i->iov_offset && allocated(&i->pipe->bufs[i->idx]))
+		if (size && i->iov_offset && allocated(&i->pipe->bufs[i->head & p_mask]))
 			return size | i->iov_offset;
 		return size;
 	}
@@ -1231,19 +1266,20 @@  EXPORT_SYMBOL(iov_iter_gap_alignment);
 static inline ssize_t __pipe_get_pages(struct iov_iter *i,
 				size_t maxsize,
 				struct page **pages,
-				int idx,
+				int i_head,
 				size_t *start)
 {
 	struct pipe_inode_info *pipe = i->pipe;
-	ssize_t n = push_pipe(i, maxsize, &idx, start);
+	unsigned int p_mask = pipe->buffers - 1;
+	ssize_t n = push_pipe(i, maxsize, &i_head, start);
 	if (!n)
 		return -EFAULT;
 
 	maxsize = n;
 	n += *start;
 	while (n > 0) {
-		get_page(*pages++ = pipe->bufs[idx].page);
-		idx = next_idx(idx, pipe);
+		get_page(*pages++ = pipe->bufs[i_head & p_mask].page);
+		i_head++;
 		n -= PAGE_SIZE;
 	}
 
@@ -1254,9 +1290,10 @@  static ssize_t pipe_get_pages(struct iov_iter *i,
 		   struct page **pages, size_t maxsize, unsigned maxpages,
 		   size_t *start)
 {
+	unsigned int p_tail;
+	unsigned int i_head;
 	unsigned npages;
 	size_t capacity;
-	int idx;
 
 	if (!maxsize)
 		return 0;
@@ -1264,12 +1301,13 @@  static ssize_t pipe_get_pages(struct iov_iter *i,
 	if (!sanity(i))
 		return -EFAULT;
 
-	data_start(i, &idx, start);
-	/* some of this one + all after this one */
-	npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1;
-	capacity = min(npages,maxpages) * PAGE_SIZE - *start;
+	data_start(i, &i_head, start);
+	p_tail = pipe_get_tail_for_write(i->pipe);
+	/* Amount of free space: some of this one + all after this one */
+	npages = (p_tail + i->pipe->buffers) - i_head;
+	capacity = min(npages, maxpages) * PAGE_SIZE - *start;
 
-	return __pipe_get_pages(i, min(maxsize, capacity), pages, idx, start);
+	return __pipe_get_pages(i, min(maxsize, capacity), pages, i_head, start);
 }
 
 ssize_t iov_iter_get_pages(struct iov_iter *i,
@@ -1323,8 +1361,9 @@  static ssize_t pipe_get_pages_alloc(struct iov_iter *i,
 		   size_t *start)
 {
 	struct page **p;
+	unsigned int p_tail;
+	unsigned int i_head;
 	ssize_t n;
-	int idx;
 	int npages;
 
 	if (!maxsize)
@@ -1333,9 +1372,9 @@  static ssize_t pipe_get_pages_alloc(struct iov_iter *i,
 	if (!sanity(i))
 		return -EFAULT;
 
-	data_start(i, &idx, start);
-	/* some of this one + all after this one */
-	npages = ((i->pipe->curbuf - idx - 1) & (i->pipe->buffers - 1)) + 1;
+	data_start(i, &i_head, start);
+	/* Amount of free space: some of this one + all after this one */
+	npages = (p_tail + i->pipe->buffers) - i_head;
 	n = npages * PAGE_SIZE - *start;
 	if (maxsize > n)
 		maxsize = n;
@@ -1344,7 +1383,7 @@  static ssize_t pipe_get_pages_alloc(struct iov_iter *i,
 	p = get_pages_array(npages);
 	if (!p)
 		return -ENOMEM;
-	n = __pipe_get_pages(i, maxsize, p, idx, start);
+	n = __pipe_get_pages(i, maxsize, p, i_head, start);
 	if (n > 0)
 		*pages = p;
 	else
@@ -1560,15 +1599,16 @@  int iov_iter_npages(const struct iov_iter *i, int maxpages)
 
 	if (unlikely(iov_iter_is_pipe(i))) {
 		struct pipe_inode_info *pipe = i->pipe;
+		unsigned int p_tail = pipe_get_tail_for_write(pipe);
+		unsigned int i_head;
 		size_t off;
-		int idx;
 
 		if (!sanity(i))
 			return 0;
 
-		data_start(i, &idx, &off);
+		data_start(i, &i_head, &off);
 		/* some of this one + all after this one */
-		npages = ((pipe->curbuf - idx - 1) & (pipe->buffers - 1)) + 1;
+		npages = (p_tail + pipe->buffers) - i_head;
 		if (npages >= maxpages)
 			return maxpages;
 	} else iterate_all_kinds(i, size, v, ({