
[4/7] repair: parallelise phase 6

Message ID 20201022051537.2286402-5-david@fromorbit.com (mailing list archive)
State: Superseded
Series: repair: Phase 6 performance improvements

Commit Message

Dave Chinner Oct. 22, 2020, 5:15 a.m. UTC
From: Dave Chinner <dchinner@redhat.com>

A recent metadump provided to us caused repair to take hours in
phase 6. It wasn't IO bound - it was fully CPU bound the entire time.
The only way to speed it up is to make phase 6 run multiple
concurrent processing threads.

The obvious way to do this is to spread the concurrency across AGs,
like the other phases, and while this works it is not optimal. When
a processing thread hits a really large directory, it essentially
sits CPU bound until that directory is processed. If an AG has lots
of large directories, we end up with a really long single-threaded
tail that limits concurrency.

Hence we also need to have concurrency /within/ the AG. This is
relatively easy, as the inode chunk records allow for a simple
concurrency mechanism within an AG. We can simply feed each chunk
record to a workqueue, and we get concurrency within the AG for
free. However, this allows prefetch to run way ahead of processing,
which blows out the buffer cache size and can cause OOM.

However, we can use the new workqueue depth limiting to limit the
number of inode chunks queued, and this then backs up the inode
prefetching to its maximum queue depth. Hence we prevent the
prefetch code from queueing the entire AG's inode chunks on the
workqueue and blowing out memory by throttling the prefetch consumer.
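
To illustrate the throttling mechanism, here is a minimal sketch of a
depth-bounded queue. This is not the xfsprogs workqueue implementation
and the names (struct bounded_wq, bwq_queue, bwq_dequeue) are invented,
with error handling omitted; it only shows the idea the paragraph above
relies on: the thread queueing work blocks once max_depth items are
pending, and so stops feeding readahead further.

#include <pthread.h>
#include <stdlib.h>

struct bwq_item {
	struct bwq_item	*next;
	void		(*fn)(void *arg);
	void		*arg;
};

struct bounded_wq {
	pthread_mutex_t	lock;
	pthread_cond_t	not_full;	/* producer waits here when full */
	pthread_cond_t	not_empty;	/* idle workers wait here */
	struct bwq_item	*head;
	struct bwq_item	*tail;
	unsigned int	count;
	unsigned int	max_depth;	/* e.g. the 1000 used below */
};

/*
 * Queueing work blocks the caller once max_depth items are pending.
 * In phase 6 the caller is the per-AG traversal thread, so blocking
 * it also stops it posting the prefetch semaphore, which is what
 * backs readahead up behind processing.
 */
static void
bwq_queue(
	struct bounded_wq	*wq,
	void			(*fn)(void *arg),
	void			*arg)
{
	struct bwq_item		*wi = malloc(sizeof(*wi));

	wi->next = NULL;
	wi->fn = fn;
	wi->arg = arg;

	pthread_mutex_lock(&wq->lock);
	while (wq->count >= wq->max_depth)
		pthread_cond_wait(&wq->not_full, &wq->lock);
	if (wq->tail)
		wq->tail->next = wi;
	else
		wq->head = wi;
	wq->tail = wi;
	wq->count++;
	pthread_cond_signal(&wq->not_empty);
	pthread_mutex_unlock(&wq->lock);
}

/* Worker side: take one item and let a blocked producer continue. */
static struct bwq_item *
bwq_dequeue(
	struct bounded_wq	*wq)
{
	struct bwq_item		*wi;

	pthread_mutex_lock(&wq->lock);
	while (!wq->head)
		pthread_cond_wait(&wq->not_empty, &wq->lock);
	wi = wq->head;
	wq->head = wi->next;
	if (!wq->head)
		wq->tail = NULL;
	wq->count--;
	pthread_cond_signal(&wq->not_full);
	pthread_mutex_unlock(&wq->lock);
	return wi;
}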

This takes phase 6 from taking many, many hours down to:

Phase 6:        10/30 21:12:58  10/30 21:40:48  27 minutes, 50 seconds

And burning 20-30 CPUs that entire time on my test rig.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 repair/phase6.c | 43 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 8 deletions(-)

Comments

Darrick J. Wong Oct. 22, 2020, 6:11 a.m. UTC | #1
On Thu, Oct 22, 2020 at 04:15:34PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@redhat.com>
> 
> A recent metadump provided to us caused repair to take hours in
> phase 6. It wasn't IO bound - it was fully CPU bound the entire time.
> The only way to speed it up is to make phase 6 run multiple
> concurrent processing threads.
> 
> The obvious way to do this is to spread the concurrency across AGs,
> like the other phases, and while this works it is not optimal. When
> a processing thread hits a really large directory, it essentially
> sits CPU bound until that directory is processed. If an AG has lots
> of large directories, we end up with a really long single-threaded
> tail that limits concurrency.
> 
> Hence we also need to have concurrency /within/ the AG. This is
> relatively easy, as the inode chunk records allow for a simple
> concurrency mechanism within an AG. We can simply feed each chunk
> record to a workqueue, and we get concurrency within the AG for
> free. However, this allows prefetch to run way ahead of processing,
> which blows out the buffer cache size and can cause OOM.
> 
> However, we can use the new workqueue depth limiting to limit the
> number of inode chunks queued, and this then backs up the inode
> prefetching to its maximum queue depth.

I'm interested in (some day) hooking up xfs_scrub to max_queued, since
it has the same concurrency problem when one of the AGs has a number of
hugely fragmented files.

> Hence we prevent the
> prefetch code from queueing the entire AG's inode chunks on the
> workqueue and blowing out memory by throttling the prefetch consumer.
> 
> This takes phase 6 from taking many, many hours down to:
> 
> Phase 6:        10/30 21:12:58  10/30 21:40:48  27 minutes, 50 seconds
> 
> And burning 20-30 CPUs that entire time on my test rig.

Yay!

> Signed-off-by: Dave Chinner <dchinner@redhat.com>
> ---
>  repair/phase6.c | 43 +++++++++++++++++++++++++++++++++++--------
>  1 file changed, 35 insertions(+), 8 deletions(-)
> 
> diff --git a/repair/phase6.c b/repair/phase6.c
> index 70d32089bb57..bf0719c186fb 100644
> --- a/repair/phase6.c
> +++ b/repair/phase6.c
> @@ -6,6 +6,7 @@
>  
>  #include "libxfs.h"
>  #include "threads.h"
> +#include "threads.h"
>  #include "prefetch.h"
>  #include "avl.h"
>  #include "globals.h"
> @@ -3109,20 +3110,45 @@ check_for_orphaned_inodes(
>  }
>  
>  static void
> -traverse_function(
> +do_dir_inode(
>  	struct workqueue	*wq,
> -	xfs_agnumber_t 		agno,
> +	xfs_agnumber_t		agno,
>  	void			*arg)
>  {
> -	ino_tree_node_t 	*irec;
> +	struct ino_tree_node	*irec = arg;
>  	int			i;
> +
> +	for (i = 0; i < XFS_INODES_PER_CHUNK; i++)  {
> +		if (inode_isadir(irec, i))
> +			process_dir_inode(wq->wq_ctx, agno, irec, i);
> +	}
> +}
> +
> +static void
> +traverse_function(
> +	struct workqueue	*wq,
> +	xfs_agnumber_t		agno,
> +	void			*arg)
> +{
> +	struct ino_tree_node	*irec;
>  	prefetch_args_t		*pf_args = arg;
> +	struct workqueue	lwq;
> +	struct xfs_mount	*mp = wq->wq_ctx;
> +
>  
>  	wait_for_inode_prefetch(pf_args);
>  
>  	if (verbose)
>  		do_log(_("        - agno = %d\n"), agno);
>  
> +	/*
> +	 * The more AGs we have in flight at once, the fewer processing threads
> +	 * per AG. This means we don't overwhelm the machine with hundreds of
> +	 * threads when we start acting on lots of AGs at once. We just want
> +	 * enough that we can keep multiple CPUs busy across multiple AGs.
> +	 */
> +	workqueue_create_bound(&lwq, mp, ag_stride, 1000);

Eeeeee, magic number! :)

/me tosses in obligatory hand-wringing about 2000 CPU systems running
out of work.  How about ag_stride * 50 or something? :P

(Aside from that this all looks ok to me)

--D

> +
>  	for (irec = findfirst_inode_rec(agno); irec; irec = next_ino_rec(irec)) {
>  		if (irec->ino_isa_dir == 0)
>  			continue;
> @@ -3130,18 +3156,19 @@ traverse_function(
>  		if (pf_args) {
>  			sem_post(&pf_args->ra_count);
>  #ifdef XR_PF_TRACE
> +			{
> +			int	i;
>  			sem_getvalue(&pf_args->ra_count, &i);
>  			pftrace(
>  		"processing inode chunk %p in AG %d (sem count = %d)",
>  				irec, agno, i);
> +			}
>  #endif
>  		}
>  
> -		for (i = 0; i < XFS_INODES_PER_CHUNK; i++)  {
> -			if (inode_isadir(irec, i))
> -				process_dir_inode(wq->wq_ctx, agno, irec, i);
> -		}
> +		queue_work(&lwq, do_dir_inode, agno, irec);
>  	}
> +	destroy_work_queue(&lwq);
>  	cleanup_inode_prefetch(pf_args);
>  }
>  
> @@ -3169,7 +3196,7 @@ static void
>  traverse_ags(
>  	struct xfs_mount	*mp)
>  {
> -	do_inode_prefetch(mp, 0, traverse_function, false, true);
> +	do_inode_prefetch(mp, ag_stride, traverse_function, false, true);
>  }
>  
>  void
> -- 
> 2.28.0
>
Dave Chinner Oct. 27, 2020, 5:10 a.m. UTC | #2
On Wed, Oct 21, 2020 at 11:11:00PM -0700, Darrick J. Wong wrote:
> On Thu, Oct 22, 2020 at 04:15:34PM +1100, Dave Chinner wrote:
> > +static void
> > +traverse_function(
> > +	struct workqueue	*wq,
> > +	xfs_agnumber_t		agno,
> > +	void			*arg)
> > +{
> > +	struct ino_tree_node	*irec;
> >  	prefetch_args_t		*pf_args = arg;
> > +	struct workqueue	lwq;
> > +	struct xfs_mount	*mp = wq->wq_ctx;
> > +
> >  
> >  	wait_for_inode_prefetch(pf_args);
> >  
> >  	if (verbose)
> >  		do_log(_("        - agno = %d\n"), agno);
> >  
> > +	/*
> > +	 * The more AGs we have in flight at once, the fewer processing threads
> > +	 * per AG. This means we don't overwhelm the machine with hundreds of
> > +	 * threads when we start acting on lots of AGs at once. We just want
> > +	 * enough that we can keep multiple CPUs busy across multiple AGs.
> > +	 */
> > +	workqueue_create_bound(&lwq, mp, ag_stride, 1000);
> 
> Eeeeee, magic number! :)
> 
> /me tosses in obligatory hand-wringing about 2000 CPU systems running
> out of work.  How about ag_stride * 50 or something? :P

ag_stride already determines concurrency via how many AGs are being
scanned at once. However, it provides no insight into the depth of
the queue we need to use per AG.

What this magic number does is bound how deep the work queue gets
before we ask another worker thread to start also processing the
queue. We've already got async threads doing inode prefetch, so the
bound here throttles the rate at which inodes are
prefetched into the buffer cache. In general, we're going to be IO
bound and waiting on readahead rather than throttling on processing
the inodes, so all this bound is doing is preventing readahead from
running too far ahead of processing and potentially causing cache
thrashing.

However, we don't want to go using lots of threads to parallelise
the work within the AG when we have already parallelised across AGs.
We want the initial worker thread per AG to just keep working away
burning CPU while the prefetch code is blocking waiting for more
inodes from disk. Then we get another burst of work being queued,
and so on.

Hence the queue needs to be quite deep so that we can soak up the
bursts of processing that readahead triggers without asking lots of
worker threads to do work. However, if the worker thread hits some
big directories and starts falling behind readahead, that's when it
will hit the maximum queue depth and kick another thread to do work.

IOWs, the queue depth needs to be deep enough to prevent bursts from
triggering extra workers, but shallow enough that extra
workers will be scheduled when processing falls behind readahead.

I really don't have a good way to automatically calculate this
depth. I just figure that if we have 1000 inodes queued up for
processing, we really should kick another thread to start working on
them. It's a simple solution, so I'd like to see if we have problems
with this simple threshold before we try to replace the magic number
with a magic heuristic....
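
To make the two options in this sub-thread concrete, they differ only
in the final argument to the call in the patch (no claim here about
which ends up merged):

	/* fixed threshold, as posted */
	workqueue_create_bound(&lwq, mp, ag_stride, 1000);

	/* scale the threshold with AG-level concurrency, as suggested */
	workqueue_create_bound(&lwq, mp, ag_stride, ag_stride * 50);

Either way, the final argument is only the queue depth bound being
debated here; ag_stride (the third argument) appears to set the worker
thread count, as the comment in the patch implies.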

Cheers,

Dave.
Darrick J. Wong Oct. 29, 2020, 5:20 p.m. UTC | #3
On Tue, Oct 27, 2020 at 04:10:44PM +1100, Dave Chinner wrote:
> On Wed, Oct 21, 2020 at 11:11:00PM -0700, Darrick J. Wong wrote:
> > On Thu, Oct 22, 2020 at 04:15:34PM +1100, Dave Chinner wrote:
> > > +static void
> > > +traverse_function(
> > > +	struct workqueue	*wq,
> > > +	xfs_agnumber_t		agno,
> > > +	void			*arg)
> > > +{
> > > +	struct ino_tree_node	*irec;
> > >  	prefetch_args_t		*pf_args = arg;
> > > +	struct workqueue	lwq;
> > > +	struct xfs_mount	*mp = wq->wq_ctx;
> > > +
> > >  
> > >  	wait_for_inode_prefetch(pf_args);
> > >  
> > >  	if (verbose)
> > >  		do_log(_("        - agno = %d\n"), agno);
> > >  
> > > +	/*
> > > +	 * The more AGs we have in flight at once, the fewer processing threads
> > > +	 * per AG. This means we don't overwhelm the machine with hundreds of
> > > +	 * threads when we start acting on lots of AGs at once. We just want
> > > +	 * enough that we can keep multiple CPUs busy across multiple AGs.
> > > +	 */
> > > +	workqueue_create_bound(&lwq, mp, ag_stride, 1000);
> > 
> > Eeeeee, magic number! :)
> > 
> > /me tosses in obligatory hand-wringing about 2000 CPU systems running
> > out of work.  How about ag_stride * 50 or something? :P
> 
> ag_stride already determines concurrency via how many AGs are being
> scanned at once. However, it provides no insight into the depth of
> the queue we need to use per AG.
> 
> What this magic number does is bound how deep the work queue gets
> before we ask another worker thread to start also processing the
> queue.

It does?  I didn't think we'd wake up extra worker threads when the
queue depth reached max_queued:

	if (wq->max_queued && wq->next_item) {
		/* more work, wake up another worker */
		pthread_cond_signal(&wq->wakeup);
	}

AFAICT, any time a worker dequeues a work item, observes that we have
a max queue depth, and sees that there's still more work to do, it'll
wake up another worker.

TBH I think this is better for CPU utilization because we should never
have idle workers while there's more work to do.  At least for the scrub
case...

<shrug> Either that or I think I've misunderstood something?

> We've already got async threads doing inode prefetch, so the
> bound here throttles the rate at which inodes are
> prefetched into the buffer cache. In general, we're going to be IO
> bound and waiting on readahead rather than throttling on processing
> the inodes, so all this bound is doing is preventing readahead from
> running too far ahead of processing and potentially causing cache
> thrashing.
> 
> However, we don't want to go using lots of threads to parallelise
> the work within the AG when we have already parallelised across AGs.
> We want the initial worker thread per AG to just keep working away
> burning CPU while the prefetch code is blocking waiting for more
> inodes from disk. Then we get another burst of work being queued,
> and so on.
> Hence the queue needs to be quite deep so that we can soak up the
> bursts of processing that readahead triggers without asking lots of
> worker threads to do work. However, if the worker thread hits some
> big directories and starts falling behind readahead, that's when it
> will hit the maximum queue depth and kick another thread to do work.

...ah, I think I grok the differences between what repair and scrub are
trying to do with the workqueue.  Repair readahead is driving
workqueue_add() calls, so you don't really want to increase parallelism
of the workqueue until (a) you can be reasonably certain that the
workers won't block on IO and (b) the workers have bogged down on huge
directories such that the buffer cache is filling up with memory that
has no immediate consumer.

It's a little different from what I'm doing with scrub, which
effectively reads inobt records and queues each of them separately for
processing.  In this case, the queue depth merely restrains our work
item memory allocations.

> IOWs, the queue depth needs to be deep enough to prevent bursts from
> triggering extra workers, but shallow enough that extra
> workers will be scheduled when processing falls behind readahead.
> 
> I really don't have a good way to automatically calculate this
> depth. I just figure that if we have 1000 inodes queued up for
> processing, we really should kick another thread to start working on
> them. It's a simple solution, so I'd like to see if we have problems
> with this simple threshold before we try to replace the magic number
> with a magic heuristic....

Hm, in that case, shouldn't that code snippet above read:

	if (wq->max_queued && wq->item_count == wq->max_queued - 1) {
		/* more work, wake up another worker */
		pthread_cond_signal(&wq->wakeup);
	}

That would seem to wake up another worker, but only after 1000 inodes
have been added to the queue.
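
Putting the two wakeup policies side by side for clarity (field names
follow the snippets quoted above; no claim about which was finally
committed):

	/* (1) wake another worker whenever a bounded queue still has work */
	if (wq->max_queued && wq->next_item)
		pthread_cond_signal(&wq->wakeup);

	/* (2) wake another worker only once the high-water mark is hit */
	if (wq->max_queued && wq->item_count == wq->max_queued - 1)
		pthread_cond_signal(&wq->wakeup);

Policy (1) maximises CPU utilisation, which suits the scrub usage
described above; policy (2) keeps each AG effectively single-threaded
until processing falls behind readahead, which is the phase 6 intent
Dave describes earlier in the thread.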

--D

> Cheers,
> 
> Dave.
> -- 
> Dave Chinner
> david@fromorbit.com

Patch

diff --git a/repair/phase6.c b/repair/phase6.c
index 70d32089bb57..bf0719c186fb 100644
--- a/repair/phase6.c
+++ b/repair/phase6.c
@@ -6,6 +6,7 @@ 
 
 #include "libxfs.h"
 #include "threads.h"
+#include "threads.h"
 #include "prefetch.h"
 #include "avl.h"
 #include "globals.h"
@@ -3109,20 +3110,45 @@  check_for_orphaned_inodes(
 }
 
 static void
-traverse_function(
+do_dir_inode(
 	struct workqueue	*wq,
-	xfs_agnumber_t 		agno,
+	xfs_agnumber_t		agno,
 	void			*arg)
 {
-	ino_tree_node_t 	*irec;
+	struct ino_tree_node	*irec = arg;
 	int			i;
+
+	for (i = 0; i < XFS_INODES_PER_CHUNK; i++)  {
+		if (inode_isadir(irec, i))
+			process_dir_inode(wq->wq_ctx, agno, irec, i);
+	}
+}
+
+static void
+traverse_function(
+	struct workqueue	*wq,
+	xfs_agnumber_t		agno,
+	void			*arg)
+{
+	struct ino_tree_node	*irec;
 	prefetch_args_t		*pf_args = arg;
+	struct workqueue	lwq;
+	struct xfs_mount	*mp = wq->wq_ctx;
+
 
 	wait_for_inode_prefetch(pf_args);
 
 	if (verbose)
 		do_log(_("        - agno = %d\n"), agno);
 
+	/*
+	 * The more AGs we have in flight at once, the fewer processing threads
+	 * per AG. This means we don't overwhelm the machine with hundreds of
+	 * threads when we start acting on lots of AGs at once. We just want
+	 * enough that we can keep multiple CPUs busy across multiple AGs.
+	 */
+	workqueue_create_bound(&lwq, mp, ag_stride, 1000);
+
 	for (irec = findfirst_inode_rec(agno); irec; irec = next_ino_rec(irec)) {
 		if (irec->ino_isa_dir == 0)
 			continue;
@@ -3130,18 +3156,19 @@  traverse_function(
 		if (pf_args) {
 			sem_post(&pf_args->ra_count);
 #ifdef XR_PF_TRACE
+			{
+			int	i;
 			sem_getvalue(&pf_args->ra_count, &i);
 			pftrace(
 		"processing inode chunk %p in AG %d (sem count = %d)",
 				irec, agno, i);
+			}
 #endif
 		}
 
-		for (i = 0; i < XFS_INODES_PER_CHUNK; i++)  {
-			if (inode_isadir(irec, i))
-				process_dir_inode(wq->wq_ctx, agno, irec, i);
-		}
+		queue_work(&lwq, do_dir_inode, agno, irec);
 	}
+	destroy_work_queue(&lwq);
 	cleanup_inode_prefetch(pf_args);
 }
 
@@ -3169,7 +3196,7 @@  static void
 traverse_ags(
 	struct xfs_mount	*mp)
 {
-	do_inode_prefetch(mp, 0, traverse_function, false, true);
+	do_inode_prefetch(mp, ag_stride, traverse_function, false, true);
 }
 
 void