diff mbox series

PATCH? avoid the unnecessary wakeups in pipe_read()

Message ID 20241229135737.GA3293@redhat.com (mailing list archive)
State New
Headers show
Series PATCH? avoid the unnecessary wakeups in pipe_read() | expand

Commit Message

Oleg Nesterov Dec. 29, 2024, 1:57 p.m. UTC
The previous discussion was very confusing, let me start another thread.
This is orthogonal to the possible wq_has_sleeper() optimizations in fs/pipe.c
we discussed before.

Let me quote one of my previous emails. Consider

	int main(void)
	{
		int fd[2], cnt;
		char c;

		pipe(fd);

		if (!fork()) {
			// wait until the parent blocks in pipe_write() ->
			// wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
			sleep(1);

			for (cnt = 0; cnt < 4096; ++cnt)
				read(fd[0], &c, 1);
			return 0;
		}

		// parent
		for (;;)
			write(fd[1], &c, 1);
	}

If I read this code correctly, in this case the child will wakeup the parent
4095 times for no reason, pipe_writable() == !pipe_pull() will still be true
until the last read(fd[0], &c, 1) does

	if (!buf->len)
		tail = pipe_update_tail(pipe, buf, tail);

and after that the parent can write the next char.

Does the patch below make sense? With this patch pipe_read() wakes the
writer up only when pipe_full() changes from T to F.

Still incomplete, obviously not for inclusion. But is it correct or not?
I am not sure I understand this nontrivial logic...

Oleg.
---

Comments

Linus Torvalds Dec. 29, 2024, 5:27 p.m. UTC | #1
On Sun, 29 Dec 2024 at 05:58, Oleg Nesterov <oleg@redhat.com> wrote:
>
> If I read this code correctly, in this case the child will wakeup the parent
> 4095 times for no reason, pipe_writable() == !pipe_pull() will still be true
> until the last read(fd[0], &c, 1) does

Ack, that patch looks sane to me.

Only wake writer if we actually released a pipe slot, and it was full
before we did so.

Makes sense.

                Linus
diff mbox series

Patch

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..27ffb650f131 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -271,7 +271,6 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	 * (WF_SYNC), because we want them to get going and generate more
 	 * data for us.
 	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				buf->len = 0;
 			}
 
-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
 				tail = pipe_update_tail(pipe, buf, tail);
+			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * _very_ unlikely case that the pipe was full, but we got
 		 * no data.
 		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
 			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 
@@ -391,14 +392,14 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 			return -ERESTARTSYS;
 
 		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 		wake_next_reader = true;
+		wake_writer = false;
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
 	mutex_unlock(&pipe->mutex);
 
-	if (was_full)
+	if (wake_writer)
 		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 	if (wake_next_reader)
 		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);