
pipe_read: don't wake up the writer if the pipe is still full

Message ID 20250102140715.GA7091@redhat.com (mailing list archive)
State New
Series pipe_read: don't wake up the writer if the pipe is still full

Commit Message

Oleg Nesterov Jan. 2, 2025, 2:07 p.m. UTC
wake_up(pipe->wr_wait) makes no sense if pipe_full() is still true after
the read: the writer sleeping in wait_event(wr_wait, pipe_writable())
will check the pipe_writable() == !pipe_full() condition and sleep again.

Only wake the writer if we actually released a pipe buf, and the pipe was
full before we did so.

Signed-off-by: Oleg Nesterov <oleg@redhat.com>
---
 fs/pipe.c | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
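
The condition the commit message describes is easy to observe from
userspace: a read that does not drain a whole pipe buffer leaves the pipe
full, so the write side is still not writable. The program below is a
minimal illustration added for context, not part of the patch; it assumes
a Linux anonymous pipe with the default sizing (4096-byte buffers) and
omits error handling.

/*
 * Illustrative sketch: fill a pipe, then show that a 1-byte read leaves
 * it full (no POLLOUT on the write end) because no pipe buffer slot has
 * been released, while draining the rest of that buffer frees a slot
 * and makes the pipe writable again.
 */
#define _GNU_SOURCE
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fds[2];
	char page[4096] = { 0 }, rest[4095], c;

	if (pipe2(fds, O_NONBLOCK))
		return 1;

	/* Fill the pipe completely, one full buffer per write. */
	while (write(fds[1], page, sizeof(page)) > 0)
		;

	struct pollfd pfd = { .fd = fds[1], .events = POLLOUT };

	/* A short read does not free a slot: the pipe is still full. */
	read(fds[0], &c, 1);
	printf("POLLOUT after 1-byte read:  %d\n", poll(&pfd, 1, 0));

	/* Draining the rest of that buffer releases it: writable again. */
	read(fds[0], rest, sizeof(rest));
	printf("POLLOUT after buffer drain: %d\n", poll(&pfd, 1, 0));
	return 0;
}

Before this patch, the first read above would still have triggered a
wr_wait wakeup from pipe_read() even though the pipe stayed full; with
the patch applied, only the second read does.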

Comments

WangYuli Jan. 2, 2025, 4:20 p.m. UTC | #1
[Adding some of my colleagues who were part of the original submission 
to the CC list for their information.]


On 2025/1/2 22:07, Oleg Nesterov wrote:
> wake_up(pipe->wr_wait) makes no sense if pipe_full() is still true after
> the read: the writer sleeping in wait_event(wr_wait, pipe_writable())
> will check the pipe_writable() == !pipe_full() condition and sleep again.
>
> Only wake the writer if we actually released a pipe buf, and the pipe was
> full before we did so.

As Linus said, for fs/pipe he "want[s] any patches to be very clearly 
documented", so perhaps we should include a link to the original 
discussion here.

Link: https://lore.kernel.org/all/75B06EE0B67747ED+20241225094202.597305-1-wangyuli@uniontech.com/

Link: https://lore.kernel.org/all/20241229135737.GA3293@redhat.com/

> Signed-off-by: Oleg Nesterov <oleg@redhat.com>

Reported-by: WangYuli <wangyuli@uniontech.com>

I'm happy to provide more test results for this patch if it's not too late.

> ---
>   fs/pipe.c | 19 ++++++++++---------
>   1 file changed, 10 insertions(+), 9 deletions(-)
>
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 12b22c2723b7..82fede0f2111 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -253,7 +253,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
>   	size_t total_len = iov_iter_count(to);
>   	struct file *filp = iocb->ki_filp;
>   	struct pipe_inode_info *pipe = filp->private_data;
> -	bool was_full, wake_next_reader = false;
> +	bool wake_writer = false, wake_next_reader = false;
>   	ssize_t ret;
>   
>   	/* Null read succeeds. */
> @@ -264,14 +264,13 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
>   	mutex_lock(&pipe->mutex);
>   
>   	/*
> -	 * We only wake up writers if the pipe was full when we started
> -	 * reading in order to avoid unnecessary wakeups.
> +	 * We only wake up writers if the pipe was full when we started reading
> +	 * and it is no longer full after reading to avoid unnecessary wakeups.
>   	 *
>   	 * But when we do wake up writers, we do so using a sync wakeup
>   	 * (WF_SYNC), because we want them to get going and generate more
>   	 * data for us.
>   	 */
> -	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
>   	for (;;) {
>   		/* Read ->head with a barrier vs post_one_notification() */
>   		unsigned int head = smp_load_acquire(&pipe->head);
> @@ -340,8 +339,10 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
>   				buf->len = 0;
>   			}
>   
> -			if (!buf->len)
> +			if (!buf->len) {
> +				wake_writer |= pipe_full(head, tail, pipe->max_usage);
>   				tail = pipe_update_tail(pipe, buf, tail);
> +			}
>   			total_len -= chars;
>   			if (!total_len)
>   				break;	/* common path: read succeeded */
> @@ -377,7 +378,7 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
>   		 * _very_ unlikely case that the pipe was full, but we got
>   		 * no data.
>   		 */
> -		if (unlikely(was_full))
> +		if (unlikely(wake_writer))
>   			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
>   		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
>   
> @@ -390,15 +391,15 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
>   		if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
>   			return -ERESTARTSYS;
>   
> -		mutex_lock(&pipe->mutex);
> -		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
> +		wake_writer = false;
>   		wake_next_reader = true;
> +		mutex_lock(&pipe->mutex);
>   	}
>   	if (pipe_empty(pipe->head, pipe->tail))
>   		wake_next_reader = false;
>   	mutex_unlock(&pipe->mutex);
>   
> -	if (was_full)
> +	if (wake_writer)
>   		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
>   	if (wake_next_reader)
>   		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
Hmm..
Initially, the sole purpose of our original patch was simply to check 
whether any processes were actually waiting on the wait queue, for both 
reads and writes, in order to avoid unnecessary wake-ups.
That said, sincere thanks to all of you for taking the time to review it!
While your patch and ours share some similarities, our primary goals 
differ slightly. Do you have any suggestions on how we could better 
achieve our original objective?

Thanks,
Oleg Nesterov Jan. 2, 2025, 4:46 p.m. UTC | #2
On 01/03, WangYuli wrote:
>
> [Adding some of my colleagues who were part of the original submission to
> the CC list for their information.]

OK,

> perhaps we should include a link to the original discussion
>
> Link: https://lore.kernel.org/all/75B06EE0B67747ED+20241225094202.597305-1-wangyuli@uniontech.com/
...
> Reported-by: WangYuli <wangyuli@uniontech.com>

WangYuli, this patch has nothing to do with your original patch and
the discussion above.

> I'm happy to provide more test results for this patch if it's not too late.

Would be great, but I don't think this patch can make any difference
performance-wise in practice. Short reads are not that common, I guess.

> Hmm..
> Initially, the sole purpose of our original patch was simply to check
> whether any processes were actually waiting on the wait queue, for both
> reads and writes, in order to avoid unnecessary wake-ups.

Exactly. So once again, this patch is orthogonal to the possible
wq_has_sleeper() optimizations.

> Do you have any suggestions on how we could better
> achieve our original objective?

See
	wakeup_pipe_readers/writers() && pipe_poll()
	https://lore.kernel.org/all/20250102163320.GA17691@redhat.com/

Oleg.
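
For context on the wq_has_sleeper() direction referenced above, the guard
pattern looks roughly like the sketch below. The helper name and body are
illustrative assumptions rather than the code from the linked proposal:
wq_has_sleeper() issues an smp_mb() and then tests waitqueue_active(), so
it pairs with the barrier taken on the waiter's side and lets the caller
skip the wakeup entirely when nobody is sleeping on the queue.

/*
 * Illustrative sketch only; neither the patch that was applied nor
 * necessarily the code in the linked proposal.
 */
static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
{
	/* Skip the (sync) wakeup when no writer is sleeping on wr_wait. */
	if (wq_has_sleeper(&pipe->wr_wait))
		wake_up_interruptible_sync_poll(&pipe->wr_wait,
						EPOLLOUT | EPOLLWRNORM);
	kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}

As Oleg notes, such an optimization is orthogonal to the wake_writer
change in this patch.
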
Christian Brauner Jan. 4, 2025, 8:42 a.m. UTC | #3
On Thu, 02 Jan 2025 15:07:15 +0100, Oleg Nesterov wrote:
> wake_up(pipe->wr_wait) makes no sense if pipe_full() is still true after
> the read: the writer sleeping in wait_event(wr_wait, pipe_writable())
> will check the pipe_writable() == !pipe_full() condition and sleep again.
> 
> Only wake the writer if we actually released a pipe buf, and the pipe was
> full before we did so.
> 
> [...]

Applied to the vfs-6.14.misc branch of the vfs/vfs.git tree.
Patches in the vfs-6.14.misc branch should appear in linux-next soon.

Please report any outstanding bugs that were missed during review in a
new review to the original patch series, allowing us to drop it.

It's encouraged to provide Acked-bys and Reviewed-bys even though the
patch has now been applied. If possible, patch trailers will be updated.

Note that commit hashes shown below are subject to change due to rebase,
trailer updates or similar. If in doubt, please check the listed branch.

tree:   https://git.kernel.org/pub/scm/linux/kernel/git/vfs/vfs.git
branch: vfs-6.14.misc

[1/1] pipe_read: don't wake up the writer if the pipe is still full
      https://git.kernel.org/vfs/vfs/c/b004b4d254e7

Patch

diff --git a/fs/pipe.c b/fs/pipe.c
index 12b22c2723b7..82fede0f2111 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -253,7 +253,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	size_t total_len = iov_iter_count(to);
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
-	bool was_full, wake_next_reader = false;
+	bool wake_writer = false, wake_next_reader = false;
 	ssize_t ret;
 
 	/* Null read succeeds. */
@@ -264,14 +264,13 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 	mutex_lock(&pipe->mutex);
 
 	/*
-	 * We only wake up writers if the pipe was full when we started
-	 * reading in order to avoid unnecessary wakeups.
+	 * We only wake up writers if the pipe was full when we started reading
+	 * and it is no longer full after reading to avoid unnecessary wakeups.
 	 *
 	 * But when we do wake up writers, we do so using a sync wakeup
 	 * (WF_SYNC), because we want them to get going and generate more
 	 * data for us.
 	 */
-	was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
 	for (;;) {
 		/* Read ->head with a barrier vs post_one_notification() */
 		unsigned int head = smp_load_acquire(&pipe->head);
@@ -340,8 +339,10 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				buf->len = 0;
 			}
 
-			if (!buf->len)
+			if (!buf->len) {
+				wake_writer |= pipe_full(head, tail, pipe->max_usage);
 				tail = pipe_update_tail(pipe, buf, tail);
+			}
 			total_len -= chars;
 			if (!total_len)
 				break;	/* common path: read succeeded */
@@ -377,7 +378,7 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		 * _very_ unlikely case that the pipe was full, but we got
 		 * no data.
 		 */
-		if (unlikely(was_full))
+		if (unlikely(wake_writer))
 			wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
 
@@ -390,15 +391,15 @@  pipe_read(struct kiocb *iocb, struct iov_iter *to)
 		if (wait_event_interruptible_exclusive(pipe->rd_wait, pipe_readable(pipe)) < 0)
 			return -ERESTARTSYS;
 
-		mutex_lock(&pipe->mutex);
-		was_full = pipe_full(pipe->head, pipe->tail, pipe->max_usage);
+		wake_writer = false;
 		wake_next_reader = true;
+		mutex_lock(&pipe->mutex);
 	}
 	if (pipe_empty(pipe->head, pipe->tail))
 		wake_next_reader = false;
 	mutex_unlock(&pipe->mutex);
 
-	if (was_full)
+	if (wake_writer)
 		wake_up_interruptible_sync_poll(&pipe->wr_wait, EPOLLOUT | EPOLLWRNORM);
 	if (wake_next_reader)
 		wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
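
One detail of the hunk above worth spelling out: wake_writer is computed
from head and tail as they were before pipe_update_tail() advances the
tail, i.e. "the pipe was full at the moment we released this buffer".
Since the reader holds pipe->mutex, head cannot advance here, so testing
fullness after the update would see the slot that was just freed and, in
the normal case, never request the wakeup. A toy model of the resulting
condition, added for illustration and using the same
head - tail >= max_usage arithmetic as pipe_occupancy()/pipe_full():

/*
 * Toy model, not kernel code: "buf_drained" stands for the !buf->len
 * case in pipe_read(); head/tail/max_usage are the pre-release values.
 */
static inline bool should_wake_writer(unsigned int head, unsigned int tail,
				      unsigned int max_usage, bool buf_drained)
{
	/* We released a buffer and the pipe was full just before that. */
	return buf_drained && (head - tail >= max_usage);
}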