diff mbox series

fs/aio: obey min_nr when doing wakeups

Message ID 20231122234257.179390-1-kent.overstreet@linux.dev (mailing list archive)
State New
Headers show
Series fs/aio: obey min_nr when doing wakeups | expand

Commit Message

Kent Overstreet Nov. 22, 2023, 11:42 p.m. UTC
Unclear who's maintaining fs/aio.c these days - who wants to take this?
-- >8 --

I've been observing workloads where IPIs due to wakeups in
aio_complete() are ~15% of total CPU time in the profile. Most of those
wakeups are unnecessary when completion batching is in use in
io_getevents().

This plumbs min_nr through via the wait eventry, so that aio_complete()
can avoid doing unnecessary wakeups.

Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
Cc: Benjamin LaHaise <bcrl@kvack.org
Cc: Christian Brauner <brauner@kernel.org>
Cc: linux-aio@kvack.org
Cc: linux-fsdevel@vger.kernel.org
---
 fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 56 insertions(+), 10 deletions(-)

Comments

Christian Brauner Nov. 23, 2023, 5:40 p.m. UTC | #1
On Wed, Nov 22, 2023 at 06:42:53PM -0500, Kent Overstreet wrote:
> Unclear who's maintaining fs/aio.c these days - who wants to take this?
> -- >8 --
> 
> I've been observing workloads where IPIs due to wakeups in
> aio_complete() are ~15% of total CPU time in the profile. Most of those
> wakeups are unnecessary when completion batching is in use in
> io_getevents().
> 
> This plumbs min_nr through via the wait eventry, so that aio_complete()
> can avoid doing unnecessary wakeups.
> 
> Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
> Cc: Benjamin LaHaise <bcrl@kvack.org
> Cc: Christian Brauner <brauner@kernel.org>
> Cc: linux-aio@kvack.org
> Cc: linux-fsdevel@vger.kernel.org
> ---
>  fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 56 insertions(+), 10 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index f8589caef9c1..c69e7caacd1b 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -1106,6 +1106,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
>  	kmem_cache_free(kiocb_cachep, iocb);
>  }
>  
> +struct aio_waiter {
> +	struct wait_queue_entry	w;
> +	size_t			min_nr;
> +};
> +
>  /* aio_complete
>   *	Called when the io request on the given iocb is complete.
>   */
> @@ -1114,7 +1119,7 @@ static void aio_complete(struct aio_kiocb *iocb)
>  	struct kioctx	*ctx = iocb->ki_ctx;
>  	struct aio_ring	*ring;
>  	struct io_event	*ev_page, *event;
> -	unsigned tail, pos, head;
> +	unsigned tail, pos, head, avail;
>  	unsigned long	flags;
>  
>  	/*
> @@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb)
>  	ctx->completed_events++;
>  	if (ctx->completed_events > 1)
>  		refill_reqs_available(ctx, head, tail);
> +
> +	avail = tail > head
> +		? tail - head
> +		: tail + ctx->nr_events - head;
>  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>  
>  	pr_debug("added to ring %p at [%u]\n", iocb, tail);
> @@ -1176,8 +1185,18 @@ static void aio_complete(struct aio_kiocb *iocb)
>  	 */
>  	smp_mb();
>  
> -	if (waitqueue_active(&ctx->wait))
> -		wake_up(&ctx->wait);
> +	if (waitqueue_active(&ctx->wait)) {
> +		struct aio_waiter *curr, *next;
> +		unsigned long flags;
> +
> +		spin_lock_irqsave(&ctx->wait.lock, flags);
> +		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
> +			if (avail >= curr->min_nr) {
> +				list_del_init_careful(&curr->w.entry);
> +				wake_up_process(curr->w.private);
> +			}
> +		spin_unlock_irqrestore(&ctx->wait.lock, flags);
> +	}
>  }
>  
>  static inline void iocb_put(struct aio_kiocb *iocb)
> @@ -1290,7 +1309,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
>  			struct io_event __user *event,
>  			ktime_t until)
>  {
> -	long ret = 0;
> +	struct hrtimer_sleeper	t;
> +	struct aio_waiter	w;
> +	long ret = 0, ret2 = 0;
>  
>  	/*
>  	 * Note that aio_read_events() is being called as the conditional - i.e.
> @@ -1306,12 +1327,37 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
>  	 * the ringbuffer empty. So in practice we should be ok, but it's
>  	 * something to be aware of when touching this code.
>  	 */
> -	if (until == 0)
> -		aio_read_events(ctx, min_nr, nr, event, &ret);
> -	else
> -		wait_event_interruptible_hrtimeout(ctx->wait,
> -				aio_read_events(ctx, min_nr, nr, event, &ret),
> -				until);
> +	aio_read_events(ctx, min_nr, nr, event, &ret);
> +	if (until == 0 || ret < 0 || ret >= min_nr)
> +		return ret;
> +
> +	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
> +	if (until != KTIME_MAX) {
> +		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
> +		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
> +	}
> +
> +	init_wait(&w.w);
> +
> +	while (1) {
> +		unsigned long nr_got = ret;
> +
> +		w.min_nr = min_nr - ret;

Hm, can this underflow?

> +
> +		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
> +			!t.task ? -ETIME : 0;

I'd like to avoid the nested ?: as that's rather hard to read.
I _think_ this is equivalent to:

if (!ret2 && !t.task)
	ret = -ETIME;

I can just fix this in-tree though. Did I parse that correctly?

> +
> +		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
> +			break;
> +
> +		if (nr_got == ret)
> +			schedule();
> +	}
> +
> +	finish_wait(&ctx->wait, &w.w);
> +	hrtimer_cancel(&t.timer);
> +	destroy_hrtimer_on_stack(&t.timer);
> +
>  	return ret;
>  }
>  
> -- 
> 2.42.0
>
Kent Overstreet Nov. 24, 2023, 12:24 a.m. UTC | #2
On Thu, Nov 23, 2023 at 06:40:01PM +0100, Christian Brauner wrote:
> On Wed, Nov 22, 2023 at 06:42:53PM -0500, Kent Overstreet wrote:
> > Unclear who's maintaining fs/aio.c these days - who wants to take this?
> > -- >8 --
> > 
> > I've been observing workloads where IPIs due to wakeups in
> > aio_complete() are ~15% of total CPU time in the profile. Most of those
> > wakeups are unnecessary when completion batching is in use in
> > io_getevents().
> > 
> > This plumbs min_nr through via the wait eventry, so that aio_complete()
> > can avoid doing unnecessary wakeups.
> > 
> > Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
> > Cc: Benjamin LaHaise <bcrl@kvack.org
> > Cc: Christian Brauner <brauner@kernel.org>
> > Cc: linux-aio@kvack.org
> > Cc: linux-fsdevel@vger.kernel.org
> > ---
> >  fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
> >  1 file changed, 56 insertions(+), 10 deletions(-)
> > 
> > diff --git a/fs/aio.c b/fs/aio.c
> > index f8589caef9c1..c69e7caacd1b 100644
> > --- a/fs/aio.c
> > +++ b/fs/aio.c
> > @@ -1106,6 +1106,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
> >  	kmem_cache_free(kiocb_cachep, iocb);
> >  }
> >  
> > +struct aio_waiter {
> > +	struct wait_queue_entry	w;
> > +	size_t			min_nr;
> > +};
> > +
> >  /* aio_complete
> >   *	Called when the io request on the given iocb is complete.
> >   */
> > @@ -1114,7 +1119,7 @@ static void aio_complete(struct aio_kiocb *iocb)
> >  	struct kioctx	*ctx = iocb->ki_ctx;
> >  	struct aio_ring	*ring;
> >  	struct io_event	*ev_page, *event;
> > -	unsigned tail, pos, head;
> > +	unsigned tail, pos, head, avail;
> >  	unsigned long	flags;
> >  
> >  	/*
> > @@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb)
> >  	ctx->completed_events++;
> >  	if (ctx->completed_events > 1)
> >  		refill_reqs_available(ctx, head, tail);
> > +
> > +	avail = tail > head
> > +		? tail - head
> > +		: tail + ctx->nr_events - head;
> >  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
> >  
> >  	pr_debug("added to ring %p at [%u]\n", iocb, tail);
> > @@ -1176,8 +1185,18 @@ static void aio_complete(struct aio_kiocb *iocb)
> >  	 */
> >  	smp_mb();
> >  
> > -	if (waitqueue_active(&ctx->wait))
> > -		wake_up(&ctx->wait);
> > +	if (waitqueue_active(&ctx->wait)) {
> > +		struct aio_waiter *curr, *next;
> > +		unsigned long flags;
> > +
> > +		spin_lock_irqsave(&ctx->wait.lock, flags);
> > +		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
> > +			if (avail >= curr->min_nr) {
> > +				list_del_init_careful(&curr->w.entry);
> > +				wake_up_process(curr->w.private);
> > +			}
> > +		spin_unlock_irqrestore(&ctx->wait.lock, flags);
> > +	}
> >  }
> >  
> >  static inline void iocb_put(struct aio_kiocb *iocb)
> > @@ -1290,7 +1309,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
> >  			struct io_event __user *event,
> >  			ktime_t until)
> >  {
> > -	long ret = 0;
> > +	struct hrtimer_sleeper	t;
> > +	struct aio_waiter	w;
> > +	long ret = 0, ret2 = 0;
> >  
> >  	/*
> >  	 * Note that aio_read_events() is being called as the conditional - i.e.
> > @@ -1306,12 +1327,37 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
> >  	 * the ringbuffer empty. So in practice we should be ok, but it's
> >  	 * something to be aware of when touching this code.
> >  	 */
> > -	if (until == 0)
> > -		aio_read_events(ctx, min_nr, nr, event, &ret);
> > -	else
> > -		wait_event_interruptible_hrtimeout(ctx->wait,
> > -				aio_read_events(ctx, min_nr, nr, event, &ret),
> > -				until);
> > +	aio_read_events(ctx, min_nr, nr, event, &ret);
> > +	if (until == 0 || ret < 0 || ret >= min_nr)
> > +		return ret;
> > +
> > +	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
> > +	if (until != KTIME_MAX) {
> > +		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
> > +		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
> > +	}
> > +
> > +	init_wait(&w.w);
> > +
> > +	while (1) {
> > +		unsigned long nr_got = ret;
> > +
> > +		w.min_nr = min_nr - ret;
> 
> Hm, can this underflow?

No, because if ret >= min_nr aio_read_events() returned true, and we're
done.

> > +
> > +		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
> > +			!t.task ? -ETIME : 0;
> 
> I'd like to avoid the nested ?: as that's rather hard to read.
> I _think_ this is equivalent to:
> 
> if (!ret2 && !t.task)
> 	ret = -ETIME;
> 
> I can just fix this in-tree though. Did I parse that correctly?

You did, except it needs to be ret2 = -ETIME - we don't return that to
userspace.
Christian Brauner Nov. 24, 2023, 7:47 a.m. UTC | #3
On Wed, 22 Nov 2023 18:42:53 -0500, Kent Overstreet wrote:
> I've been observing workloads where IPIs due to wakeups in
> aio_complete() are ~15% of total CPU time in the profile. Most of those
> wakeups are unnecessary when completion batching is in use in
> io_getevents().
> 
> This plumbs min_nr through via the wait eventry, so that aio_complete()
> can avoid doing unnecessary wakeups.
> 
> [...]

Applied to the vfs.misc branch of the vfs/vfs.git tree.
Patches in the vfs.misc branch should appear in linux-next soon.

Please report any outstanding bugs that were missed during review in a
new review to the original patch series allowing us to drop it.

It's encouraged to provide Acked-bys and Reviewed-bys even though the
patch has now been applied. If possible patch trailers will be updated.

Note that commit hashes shown below are subject to change due to rebase,
trailer updates or similar. If in doubt, please check the listed branch.

tree:   https://git.kernel.org/pub/scm/linux/kernel/git/vfs/vfs.git
branch: vfs.misc

[1/1] fs/aio: obey min_nr when doing wakeups
      https://git.kernel.org/vfs/vfs/c/7a2c359a17b5
diff mbox series

Patch

diff --git a/fs/aio.c b/fs/aio.c
index f8589caef9c1..c69e7caacd1b 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -1106,6 +1106,11 @@  static inline void iocb_destroy(struct aio_kiocb *iocb)
 	kmem_cache_free(kiocb_cachep, iocb);
 }
 
+struct aio_waiter {
+	struct wait_queue_entry	w;
+	size_t			min_nr;
+};
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
@@ -1114,7 +1119,7 @@  static void aio_complete(struct aio_kiocb *iocb)
 	struct kioctx	*ctx = iocb->ki_ctx;
 	struct aio_ring	*ring;
 	struct io_event	*ev_page, *event;
-	unsigned tail, pos, head;
+	unsigned tail, pos, head, avail;
 	unsigned long	flags;
 
 	/*
@@ -1156,6 +1161,10 @@  static void aio_complete(struct aio_kiocb *iocb)
 	ctx->completed_events++;
 	if (ctx->completed_events > 1)
 		refill_reqs_available(ctx, head, tail);
+
+	avail = tail > head
+		? tail - head
+		: tail + ctx->nr_events - head;
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
 	pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -1176,8 +1185,18 @@  static void aio_complete(struct aio_kiocb *iocb)
 	 */
 	smp_mb();
 
-	if (waitqueue_active(&ctx->wait))
-		wake_up(&ctx->wait);
+	if (waitqueue_active(&ctx->wait)) {
+		struct aio_waiter *curr, *next;
+		unsigned long flags;
+
+		spin_lock_irqsave(&ctx->wait.lock, flags);
+		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
+			if (avail >= curr->min_nr) {
+				list_del_init_careful(&curr->w.entry);
+				wake_up_process(curr->w.private);
+			}
+		spin_unlock_irqrestore(&ctx->wait.lock, flags);
+	}
 }
 
 static inline void iocb_put(struct aio_kiocb *iocb)
@@ -1290,7 +1309,9 @@  static long read_events(struct kioctx *ctx, long min_nr, long nr,
 			struct io_event __user *event,
 			ktime_t until)
 {
-	long ret = 0;
+	struct hrtimer_sleeper	t;
+	struct aio_waiter	w;
+	long ret = 0, ret2 = 0;
 
 	/*
 	 * Note that aio_read_events() is being called as the conditional - i.e.
@@ -1306,12 +1327,37 @@  static long read_events(struct kioctx *ctx, long min_nr, long nr,
 	 * the ringbuffer empty. So in practice we should be ok, but it's
 	 * something to be aware of when touching this code.
 	 */
-	if (until == 0)
-		aio_read_events(ctx, min_nr, nr, event, &ret);
-	else
-		wait_event_interruptible_hrtimeout(ctx->wait,
-				aio_read_events(ctx, min_nr, nr, event, &ret),
-				until);
+	aio_read_events(ctx, min_nr, nr, event, &ret);
+	if (until == 0 || ret < 0 || ret >= min_nr)
+		return ret;
+
+	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	if (until != KTIME_MAX) {
+		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
+		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
+	}
+
+	init_wait(&w.w);
+
+	while (1) {
+		unsigned long nr_got = ret;
+
+		w.min_nr = min_nr - ret;
+
+		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
+			!t.task ? -ETIME : 0;
+
+		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
+			break;
+
+		if (nr_got == ret)
+			schedule();
+	}
+
+	finish_wait(&ctx->wait, &w.w);
+	hrtimer_cancel(&t.timer);
+	destroy_hrtimer_on_stack(&t.timer);
+
 	return ret;
 }