PATCH? avoid the unnecessary wakeups in pipe_read()

Message ID 20241229135737.GA3293@redhat.com (mailing list archive)

Commit Message

Oleg Nesterov Dec. 29, 2024, 1:57 p.m. UTC
The previous discussion was very confusing, let me start another thread.
This is orthogonal to the possible wq_has_sleeper() optimizations in fs/pipe.c
we discussed before.

Let me quote one of my previous emails. Consider

	#include <unistd.h>

	int main(void)
	{
		int fd[2], cnt;
		char c;

		pipe(fd);

		if (!fork()) {
			// wait until the parent blocks in pipe_write() ->
			// wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
			sleep(1);

			for (cnt = 0; cnt < 4096; ++cnt)
				read(fd[0], &c, 1);
			return 0;
		}

		// parent
		for (;;)
			write(fd[1], &c, 1);
	}

If I read this code correctly, in this case the child will wake the parent
up 4095 times for no reason: pipe_writable() == !pipe_full() will still be
false until the last read(fd[0], &c, 1) does

	if (!buf->len)
		tail = pipe_update_tail(pipe, buf, tail);

and after that the parent can write the next char.

Does the patch below make sense? With this patch pipe_read() wakes the
writer up only when pipe_full() changes from T to F.

Still incomplete, obviously not for inclusion. But is it correct or not?
I am not sure I understand this nontrivial logic...

Oleg.
---

Comments

Linus Torvalds Dec. 29, 2024, 5:27 p.m. UTC | #1
On Sun, 29 Dec 2024 at 05:58, Oleg Nesterov <oleg@redhat.com> wrote:
>
> If I read this code correctly, in this case the child will wake the parent
> up 4095 times for no reason: pipe_writable() == !pipe_full() will still be
> false until the last read(fd[0], &c, 1) does

Ack, that patch looks sane to me.

Only wake writer if we actually released a pipe slot, and it was full
before we did so.

Makes sense.

                Linus
Oleg Nesterov Jan. 2, 2025, 4:33 p.m. UTC | #2
I was going to send a one-liner patch which adds mb() into pipe_poll()
but then I decided to make even more spam and ask some questions first.

	static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
	{
		smp_mb();
		if (waitqueue_active(&pipe->rd_wait))
			wake_up_interruptible(&pipe->rd_wait);
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	}

I think that wq_has_sleeper() + wake_up_interruptible_poll(POLLIN) make more
sense but this is minor.

Either way the waitqueue_active() check is only correct if the waiter has a
barrier between __add_wait_queue() and "check the condition". wait_event()
is fine, but pipe_poll() does:

	// poll_wait()
	__pollwait() -> add_wait_queue(pipe->rd_wait) -> list_add()

	READ_ONCE(pipe->head);
	READ_ONCE(pipe->tail);

In theory these LOAD's can leak into the critical section in add_wait_queue()
and they can happen before list_add(entry, rd_wait.head).

So I think we need the trivial

	--- a/fs/pipe.c
	+++ b/fs/pipe.c
	@@ -680,6 +680,7 @@ pipe_poll(struct file *filp, poll_table *wait)
		 * if something changes and you got it wrong, the poll
		 * table entry will wake you up and fix it.
		 */
	+	smp_mb();
		head = READ_ONCE(pipe->head);
		tail = READ_ONCE(pipe->tail);

and after that pipe_read/pipe_write can use the wq_has_sleeper() check too
(this is what the patch from WangYuli did).

-------------------------------------------------------------------------------
But perhaps this mb() should go into __pollwait() ? We can have more
waitqueue_active() users which do not take .poll() into account...

There are more init_poll_funcptr()'s, but at least epoll looks fine:
epi_fget() in ep_item_poll() provides a full barrier before vfs_poll().

-------------------------------------------------------------------------------
Or really add mb() into __add_wait_queue/__add_wait_queue_entry_tail as
Manfred suggests? Somehow I am not sure about this change.

Oleg.
Manfred Spraul Jan. 4, 2025, 8:57 p.m. UTC | #3
Hi Oleg,

On 1/2/25 5:33 PM, Oleg Nesterov wrote:
> I was going to send a one-liner patch which adds mb() into pipe_poll()
> but then I decided to make even more spam and ask some questions first.
>
> 	static void wakeup_pipe_readers(struct pipe_inode_info *pipe)
> 	{
> 		smp_mb();
> 		if (waitqueue_active(&pipe->rd_wait))
> 			wake_up_interruptible(&pipe->rd_wait);
> 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
> 	}
>
> I think that wq_has_sleeper() + wake_up_interruptible_poll(POLLIN) make more
> sense but this is minor.
>
> Either way the waitqueue_active() check is only correct if the waiter has a
> barrier between __add_wait_queue() and "check the condition". wait_event()
> is fine, but pipe_poll() does:
>
> 	// poll_wait()
> 	__pollwait() -> add_wait_queue(pipe->rd_wait) -> list_add()
>
> 	READ_ONCE(pipe->head);
> 	READ_ONCE(pipe->tail);
>
> In theory these LOAD's can leak into the critical section in add_wait_queue()
> and they can happen before list_add(entry, rd_wait.head).
>
> So I think we need the trivial
>
> 	--- a/fs/pipe.c
> 	+++ b/fs/pipe.c
> 	@@ -680,6 +680,7 @@ pipe_poll(struct file *filp, poll_table *wait)
> 		 * if something changes and you got it wrong, the poll
> 		 * table entry will wake you up and fix it.
> 		 */
> 	+	smp_mb();
> 		head = READ_ONCE(pipe->head);
> 		tail = READ_ONCE(pipe->tail);
>
> and after that pipe_read/pipe_write can use the wq_has_sleeper() check too
> (this is what the patch from WangYuli did).

Would it be possible to create a perf probe to get some statistics?

I see at least 4 options:

- do nothing

- add the smp_mb() into pipe_poll(), and convert pipe to wq_has_sleeper()

- add the smp_mb() into poll_wait(), convert pipe and potentially
further poll users to wq_has_sleeper()

- add the smp_mb() into __add_wait_queue(), and merge wq_has_sleeper()
into wake_up().

The tricky part is probably to differentiate wake_up on empty wait 
queues vs. wake_up on wait queues with entries.

--

     Manfred
Linus Torvalds Jan. 4, 2025, 10:05 p.m. UTC | #4
On Thu, 2 Jan 2025 at 08:33, Oleg Nesterov <oleg@redhat.com> wrote:
>
> I was going to send a one-liner patch which adds mb() into pipe_poll()
> but then I decided to make even more spam and ask some questions first.

poll functions are not *supposed* to need memory barriers.

They are supposed to do "poll_wait()" and then not need any more
serialization after that, because we either

 (a) have a NULL wait-address, in which case we're not going to sleep
and this is just a "check state"

 (b) the waiting function is supposed to do add_wait_queue() (usually
by way of __pollwait) and that should be a sufficient barrier to
anybody who does a wakeup

Note that add_wait_queue() ends up doing a spinlock sequence, and
while that is not a full memory barrier (well, it is on x86, but not
necessarily in general), it *should* be sufficient against an actual
waker.

That's kind of how add_wait_queue() vs wake_up() is supposed to work.

Of course, the fact that we're now discussing the pipe code *not*
doing a full wake sequence (but just a "is the wait queue empty"
thing) is what then messes with the generic rules.

And this makes me think that the whole comment above
waitqueue_active() is just fundamentally wrong. The smp_mb() is *not*
sufficient in the sequence

    smp_mb();
    if (waitqueue_active(wq_head))
        wake_up(wq_head);

because while it happens to work wrt prepare_to_wait() sequences, it
is *not* sufficient against other users of add_wait_queue().

In those other sequences the smp_mb() in set_current_state might have
happened long long before.

Those other users aren't just 'poll()', btw. There's quite a lot of
add_wait_queue() users in the kernel. It's a traditional model even if
it's not something people generally add to any more.

Now, hopefully many of those add_wait_queue() users end up using
set_current_state() and getting the memory barrier that way. Or they
use wait_woken() or any of the other proper helpers we have.

But I think this poll() thing is very much an example of this *not*
being valid, and I don't think it's in any way pipe-specific.

So maybe we really do need to add the memory barrier to
__add_wait_queue(). That's going to be painful, particularly with lots
of users not needing it because they have the barrier in all the other
places.

End result: maybe adding it just to __pollwait() is the thing to do,
in the hopes that non-poll users all use the proper sequences.

But no, this is most definitely not a pipe-only thing.

          Linus

Patch

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..27ffb650f131 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -271,7 +271,6 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	 * (WF_SYNC), because we want them to get going and generate more
 	 * data for us.
 	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				buf->len = 0;
 			}
 
-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
 				tail = pipe_update_tail(pipe, buf, tail);
+			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * _very_ unlikely case that the pipe was full, but we got
 		 * no data.
 		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
 			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 
@@ -391,14 +392,14 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			return -ERESTARTSYS;
 
 		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 		wake_next_reader = true;
+		wake_writer = false;
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
 	mutex_unlock(&pipe->mutex);
 
-	if (was_full)
+	if (wake_writer)
 		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 	if (wake_next_reader)
 		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);