diff mbox series

[1/7] workqueue: bound maximum queue depth

Message ID 20201022051537.2286402-2-david@fromorbit.com (mailing list archive)
State Superseded
Headers show
Series repair: Phase 6 performance improvements | expand

Commit Message

Dave Chinner Oct. 22, 2020, 5:15 a.m. UTC
From: Dave Chinner <dchinner@redhat.com>

Existing users of workqueues have bound maximum queue depths in
their external algorithms (e.g. prefetch counts). For parallelising
work that doesn't have an external bound, allow workqueues to
throttle incoming requests at a maximum bound. bounded workqueues
also need to distribute work over all worker threads themselves as
there is no external bounding or worker function throttling
provided.

Existing callers are not throttled and retain direct control of
worker threads, only users of the new create interface will be
throttled and concurrency managed.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 libfrog/workqueue.c | 42 +++++++++++++++++++++++++++++++++++++++---
 libfrog/workqueue.h |  4 ++++
 2 files changed, 43 insertions(+), 3 deletions(-)

Comments

Darrick J. Wong Oct. 22, 2020, 5:54 a.m. UTC | #1
On Thu, Oct 22, 2020 at 04:15:31PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@redhat.com>
> 
> Existing users of workqueues have bound maximum queue depths in
> their external algorithms (e.g. prefetch counts). For parallelising
> work that doesn't have an external bound, allow workqueues to
> throttle incoming requests at a maximum bound. bounded workqueues

Nit: capitalize the 'B' in 'bounded'.

> also need to distribute work over all worker threads themselves as
> there is no external bounding or worker function throttling
> provided.
> 
> Existing callers are not throttled and retain direct control of
> worker threads, only users of the new create interface will be
> throttled and concurrency managed.
> 
> Signed-off-by: Dave Chinner <dchinner@redhat.com>
> ---
>  libfrog/workqueue.c | 42 +++++++++++++++++++++++++++++++++++++++---
>  libfrog/workqueue.h |  4 ++++
>  2 files changed, 43 insertions(+), 3 deletions(-)
> 
> diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> index fe3de4289379..e42b2a2f678b 100644
> --- a/libfrog/workqueue.c
> +++ b/libfrog/workqueue.c
> @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
>  		}
>  
>  		/*
> -		 *  Dequeue work from the head of the list.
> +		 *  Dequeue work from the head of the list. If the queue was
> +		 *  full then send a wakeup if we're configured to do so.
>  		 */
>  		assert(wq->item_count > 0);
> +		if (wq->max_queued && wq->item_count == wq->max_queued)
> +			pthread_cond_signal(&wq->queue_full);
> +
>  		wi = wq->next_item;
>  		wq->next_item = wi->next;
>  		wq->item_count--;
>  
> +		if (wq->max_queued && wq->next_item) {
> +			/* more work, wake up another worker */
> +			pthread_cond_signal(&wq->wakeup);

Hmm.  The net effect of this is that we wake up workers faster when a
ton of work comes in, right?  And I bet none of the current workqueue
users have suffered from this because they queue a bunch of work and
then call workqueue_terminate, which wakes all the threads, and they
never go to sleep again.

Does it make sense to simplify the test to "if (wq->next_item) {"?

Other than that, looks good!

--D

> +		}
>  		pthread_mutex_unlock(&wq->lock);
>  
>  		(wi->function)(wi->queue, wi->index, wi->arg);
> @@ -58,10 +66,11 @@ workqueue_thread(void *arg)
>  
>  /* Allocate a work queue and threads.  Returns zero or negative error code. */
>  int
> -workqueue_create(
> +workqueue_create_bound(
>  	struct workqueue	*wq,
>  	void			*wq_ctx,
> -	unsigned int		nr_workers)
> +	unsigned int		nr_workers,
> +	unsigned int		max_queue)
>  {
>  	unsigned int		i;
>  	int			err = 0;
> @@ -70,12 +79,16 @@ workqueue_create(
>  	err = -pthread_cond_init(&wq->wakeup, NULL);
>  	if (err)
>  		return err;
> +	err = -pthread_cond_init(&wq->queue_full, NULL);
> +	if (err)
> +		goto out_wake;
>  	err = -pthread_mutex_init(&wq->lock, NULL);
>  	if (err)
>  		goto out_cond;
>  
>  	wq->wq_ctx = wq_ctx;
>  	wq->thread_count = nr_workers;
> +	wq->max_queued = max_queue;
>  	wq->threads = malloc(nr_workers * sizeof(pthread_t));
>  	if (!wq->threads) {
>  		err = -errno;
> @@ -102,10 +115,21 @@ workqueue_create(
>  out_mutex:
>  	pthread_mutex_destroy(&wq->lock);
>  out_cond:
> +	pthread_cond_destroy(&wq->queue_full);
> +out_wake:
>  	pthread_cond_destroy(&wq->wakeup);
>  	return err;
>  }
>  
> +int
> +workqueue_create(
> +	struct workqueue	*wq,
> +	void			*wq_ctx,
> +	unsigned int		nr_workers)
> +{
> +	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
> +}
> +
>  /*
>   * Create a work item consisting of a function and some arguments and schedule
>   * the work item to be run via the thread pool.  Returns zero or a negative
> @@ -140,6 +164,7 @@ workqueue_add(
>  
>  	/* Now queue the new work structure to the work queue. */
>  	pthread_mutex_lock(&wq->lock);
> +restart:
>  	if (wq->next_item == NULL) {
>  		assert(wq->item_count == 0);
>  		ret = -pthread_cond_signal(&wq->wakeup);
> @@ -150,6 +175,16 @@ workqueue_add(
>  		}
>  		wq->next_item = wi;
>  	} else {
> +		/* throttle on a full queue if configured */
> +		if (wq->max_queued && wq->item_count == wq->max_queued) {
> +			pthread_cond_wait(&wq->queue_full, &wq->lock);
> +			/*
> +			 * Queue might be empty or even still full by the time
> +			 * we get the lock back, so restart the lookup so we do
> +			 * the right thing with the current state of the queue.
> +			 */
> +			goto restart;
> +		}
>  		wq->last_item->next = wi;
>  	}
>  	wq->last_item = wi;
> @@ -201,5 +236,6 @@ workqueue_destroy(
>  	free(wq->threads);
>  	pthread_mutex_destroy(&wq->lock);
>  	pthread_cond_destroy(&wq->wakeup);
> +	pthread_cond_destroy(&wq->queue_full);
>  	memset(wq, 0, sizeof(*wq));
>  }
> diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h
> index a56d1cf14081..a9c108d0e66a 100644
> --- a/libfrog/workqueue.h
> +++ b/libfrog/workqueue.h
> @@ -31,10 +31,14 @@ struct workqueue {
>  	unsigned int		thread_count;
>  	bool			terminate;
>  	bool			terminated;
> +	int			max_queued;
> +	pthread_cond_t		queue_full;
>  };
>  
>  int workqueue_create(struct workqueue *wq, void *wq_ctx,
>  		unsigned int nr_workers);
> +int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
> +		unsigned int nr_workers, unsigned int max_queue);
>  int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
>  		uint32_t index, void *arg);
>  int workqueue_terminate(struct workqueue *wq);
> -- 
> 2.28.0
>
Dave Chinner Oct. 22, 2020, 8:11 a.m. UTC | #2
On Wed, Oct 21, 2020 at 10:54:25PM -0700, Darrick J. Wong wrote:
> On Thu, Oct 22, 2020 at 04:15:31PM +1100, Dave Chinner wrote:
> > From: Dave Chinner <dchinner@redhat.com>
> > 
> > Existing users of workqueues have bound maximum queue depths in
> > their external algorithms (e.g. prefetch counts). For parallelising
> > work that doesn't have an external bound, allow workqueues to
> > throttle incoming requests at a maximum bound. bounded workqueues
> 
> Nit: capitalize the 'B' in 'bounded'.
> 
> > also need to distribute work over all worker threads themselves as
> > there is no external bounding or worker function throttling
> > provided.
> > 
> > Existing callers are not throttled and retain direct control of
> > worker threads, only users of the new create interface will be
> > throttled and concurrency managed.
> > 
> > Signed-off-by: Dave Chinner <dchinner@redhat.com>
> > ---
> >  libfrog/workqueue.c | 42 +++++++++++++++++++++++++++++++++++++++---
> >  libfrog/workqueue.h |  4 ++++
> >  2 files changed, 43 insertions(+), 3 deletions(-)
> > 
> > diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> > index fe3de4289379..e42b2a2f678b 100644
> > --- a/libfrog/workqueue.c
> > +++ b/libfrog/workqueue.c
> > @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
> >  		}
> >  
> >  		/*
> > -		 *  Dequeue work from the head of the list.
> > +		 *  Dequeue work from the head of the list. If the queue was
> > +		 *  full then send a wakeup if we're configured to do so.
> >  		 */
> >  		assert(wq->item_count > 0);
> > +		if (wq->max_queued && wq->item_count == wq->max_queued)
> > +			pthread_cond_signal(&wq->queue_full);
> > +
> >  		wi = wq->next_item;
> >  		wq->next_item = wi->next;
> >  		wq->item_count--;
> >  
> > +		if (wq->max_queued && wq->next_item) {
> > +			/* more work, wake up another worker */
> > +			pthread_cond_signal(&wq->wakeup);
> 
> Hmm.  The net effect of this is that we wake up workers faster when a
> ton of work comes in, right?

Effectively. What it does is increase the concurrency of processing
only when the current worker threads cannot keep up with the
incoming work....

> And I bet none of the current workqueue
> users have suffered from this because they queue a bunch of work and
> then call workqueue_terminate, which wakes all the threads, and they
> never go to sleep again.
> 
> Does it make sense to simplify the test to "if (wq->next_item) {"?

Perhaps so, but I didn't want to make subtle changes to the way the
prefetch stuff works - that's a tangled ball of string that is easy
to deadlock and really hard to debug....

> Other than that, looks good!

Ta!

CHeers,

Dave.
Darrick J. Wong Oct. 25, 2020, 4:41 a.m. UTC | #3
On Thu, Oct 22, 2020 at 04:15:31PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@redhat.com>
> 
> Existing users of workqueues have bound maximum queue depths in
> their external algorithms (e.g. prefetch counts). For parallelising
> work that doesn't have an external bound, allow workqueues to
> throttle incoming requests at a maximum bound. bounded workqueues
> also need to distribute work over all worker threads themselves as
> there is no external bounding or worker function throttling
> provided.
> 
> Existing callers are not throttled and retain direct control of
> worker threads, only users of the new create interface will be
> throttled and concurrency managed.
> 
> Signed-off-by: Dave Chinner <dchinner@redhat.com>
> ---
>  libfrog/workqueue.c | 42 +++++++++++++++++++++++++++++++++++++++---
>  libfrog/workqueue.h |  4 ++++
>  2 files changed, 43 insertions(+), 3 deletions(-)
> 
> diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> index fe3de4289379..e42b2a2f678b 100644
> --- a/libfrog/workqueue.c
> +++ b/libfrog/workqueue.c
> @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
>  		}
>  
>  		/*
> -		 *  Dequeue work from the head of the list.
> +		 *  Dequeue work from the head of the list. If the queue was
> +		 *  full then send a wakeup if we're configured to do so.
>  		 */
>  		assert(wq->item_count > 0);
> +		if (wq->max_queued && wq->item_count == wq->max_queued)
> +			pthread_cond_signal(&wq->queue_full);
> +
>  		wi = wq->next_item;
>  		wq->next_item = wi->next;
>  		wq->item_count--;
>  
> +		if (wq->max_queued && wq->next_item) {
> +			/* more work, wake up another worker */
> +			pthread_cond_signal(&wq->wakeup);
> +		}
>  		pthread_mutex_unlock(&wq->lock);
>  
>  		(wi->function)(wi->queue, wi->index, wi->arg);
> @@ -58,10 +66,11 @@ workqueue_thread(void *arg)
>  
>  /* Allocate a work queue and threads.  Returns zero or negative error code. */
>  int
> -workqueue_create(
> +workqueue_create_bound(
>  	struct workqueue	*wq,
>  	void			*wq_ctx,
> -	unsigned int		nr_workers)
> +	unsigned int		nr_workers,
> +	unsigned int		max_queue)
>  {
>  	unsigned int		i;
>  	int			err = 0;
> @@ -70,12 +79,16 @@ workqueue_create(
>  	err = -pthread_cond_init(&wq->wakeup, NULL);
>  	if (err)
>  		return err;
> +	err = -pthread_cond_init(&wq->queue_full, NULL);
> +	if (err)
> +		goto out_wake;
>  	err = -pthread_mutex_init(&wq->lock, NULL);
>  	if (err)
>  		goto out_cond;
>  
>  	wq->wq_ctx = wq_ctx;
>  	wq->thread_count = nr_workers;
> +	wq->max_queued = max_queue;
>  	wq->threads = malloc(nr_workers * sizeof(pthread_t));
>  	if (!wq->threads) {
>  		err = -errno;
> @@ -102,10 +115,21 @@ workqueue_create(
>  out_mutex:
>  	pthread_mutex_destroy(&wq->lock);
>  out_cond:
> +	pthread_cond_destroy(&wq->queue_full);
> +out_wake:
>  	pthread_cond_destroy(&wq->wakeup);
>  	return err;
>  }
>  
> +int
> +workqueue_create(
> +	struct workqueue	*wq,
> +	void			*wq_ctx,
> +	unsigned int		nr_workers)
> +{
> +	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
> +}
> +
>  /*
>   * Create a work item consisting of a function and some arguments and schedule
>   * the work item to be run via the thread pool.  Returns zero or a negative
> @@ -140,6 +164,7 @@ workqueue_add(
>  
>  	/* Now queue the new work structure to the work queue. */
>  	pthread_mutex_lock(&wq->lock);
> +restart:
>  	if (wq->next_item == NULL) {
>  		assert(wq->item_count == 0);
>  		ret = -pthread_cond_signal(&wq->wakeup);
> @@ -150,6 +175,16 @@ workqueue_add(
>  		}
>  		wq->next_item = wi;
>  	} else {
> +		/* throttle on a full queue if configured */
> +		if (wq->max_queued && wq->item_count == wq->max_queued) {
> +			pthread_cond_wait(&wq->queue_full, &wq->lock);

I ported xfs_scrub to use max_queued for the inode scanner, and got a
hang here.  It uses two workqueues -- the first is an unbouned workqueue
that receives one work item per AG in which each work item calls
INUMBERS, creates a work item for the returned inode chunk, and throws
it at the second workqueue.  The second workqueue is a bounded workqueue
that calls BULKSTAT on the INUMBERS work item and then calls the
iteration function on each bulkstat record returned.

The hang happens when the inumbers workqueue has more than one thread
running.  Both* threads notice the full workqueue and wait on
queue_full.  One of the workers in the second workqueue goes to pull off
the next work item, ends up in this if body, signals one of the sleeping
threads, and starts calling bulkstat.

In the time it takes to wake up the sleeping thread from wq 1, the
second workqueue pulls far enough ahead that the single thread from wq1
never manages to fill wq2 again.  Often, the wq1 thread was sleeping so
that it could add the last inode chunk of that AG to wq2.  We therefore
never wake up the *other* sleeping thread from wq1, and the whole app
stalls.

I dunno if that's a sane way to structure an inumbers/bulkstat scan, but
it seemed reasonable to me.  I can envision two possible fixes here: (1)
use pthread_cond_broadcast to wake everything up; or (2) always call
pthread_cond_wait when we pull a work item off the queue.  Thoughts?

--D

*by "both threads" I really mean "the other 31 threads that are asleep
trying to add things to wq2" but afaict this is a general problem.

> +			/*
> +			 * Queue might be empty or even still full by the time
> +			 * we get the lock back, so restart the lookup so we do
> +			 * the right thing with the current state of the queue.
> +			 */
> +			goto restart;
> +		}
>  		wq->last_item->next = wi;
>  	}
>  	wq->last_item = wi;
> @@ -201,5 +236,6 @@ workqueue_destroy(
>  	free(wq->threads);
>  	pthread_mutex_destroy(&wq->lock);
>  	pthread_cond_destroy(&wq->wakeup);
> +	pthread_cond_destroy(&wq->queue_full);
>  	memset(wq, 0, sizeof(*wq));
>  }
> diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h
> index a56d1cf14081..a9c108d0e66a 100644
> --- a/libfrog/workqueue.h
> +++ b/libfrog/workqueue.h
> @@ -31,10 +31,14 @@ struct workqueue {
>  	unsigned int		thread_count;
>  	bool			terminate;
>  	bool			terminated;
> +	int			max_queued;
> +	pthread_cond_t		queue_full;
>  };
>  
>  int workqueue_create(struct workqueue *wq, void *wq_ctx,
>  		unsigned int nr_workers);
> +int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
> +		unsigned int nr_workers, unsigned int max_queue);
>  int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
>  		uint32_t index, void *arg);
>  int workqueue_terminate(struct workqueue *wq);
> -- 
> 2.28.0
>
Dave Chinner Oct. 26, 2020, 10:29 p.m. UTC | #4
On Sat, Oct 24, 2020 at 09:41:14PM -0700, Darrick J. Wong wrote:
> On Thu, Oct 22, 2020 at 04:15:31PM +1100, Dave Chinner wrote:
> > @@ -140,6 +164,7 @@ workqueue_add(
> >  
> >  	/* Now queue the new work structure to the work queue. */
> >  	pthread_mutex_lock(&wq->lock);
> > +restart:
> >  	if (wq->next_item == NULL) {
> >  		assert(wq->item_count == 0);
> >  		ret = -pthread_cond_signal(&wq->wakeup);
> > @@ -150,6 +175,16 @@ workqueue_add(
> >  		}
> >  		wq->next_item = wi;
> >  	} else {
> > +		/* throttle on a full queue if configured */
> > +		if (wq->max_queued && wq->item_count == wq->max_queued) {
> > +			pthread_cond_wait(&wq->queue_full, &wq->lock);
> 
> I ported xfs_scrub to use max_queued for the inode scanner, and got a
> hang here.  It uses two workqueues -- the first is an unbouned workqueue
> that receives one work item per AG in which each work item calls
> INUMBERS, creates a work item for the returned inode chunk, and throws
> it at the second workqueue.  The second workqueue is a bounded workqueue
> that calls BULKSTAT on the INUMBERS work item and then calls the
> iteration function on each bulkstat record returned.
> 
> The hang happens when the inumbers workqueue has more than one thread
> running.

IIUC, that means you have multiple producer threads? IIRC, he usage
in this patchset is single producer, so it won't hit this problem...

> Both* threads notice the full workqueue and wait on
> queue_full.  One of the workers in the second workqueue goes to pull off
> the next work item, ends up in this if body, signals one of the sleeping
> threads, and starts calling bulkstat.
> 
> In the time it takes to wake up the sleeping thread from wq 1, the
> second workqueue pulls far enough ahead that the single thread from wq1
> never manages to fill wq2 again.  Often, the wq1 thread was sleeping so
> that it could add the last inode chunk of that AG to wq2.  We therefore
> never wake up the *other* sleeping thread from wq1, and the whole app
> stalls.
> 
> I dunno if that's a sane way to structure an inumbers/bulkstat scan, but
> it seemed reasonable to me.  I can envision two possible fixes here: (1)
> use pthread_cond_broadcast to wake everything up; or (2) always call
> pthread_cond_wait when we pull a work item off the queue.  Thoughts?

pthread_cond_broadcast() makes more sense, but I suspect there will
be other issues with multiple producers that render the throttling
ineffective. I suspect supporting multiple producers should be a
separate patchset...

Cheers,

Dave.
Darrick J. Wong Oct. 26, 2020, 10:40 p.m. UTC | #5
On Tue, Oct 27, 2020 at 09:29:43AM +1100, Dave Chinner wrote:
> On Sat, Oct 24, 2020 at 09:41:14PM -0700, Darrick J. Wong wrote:
> > On Thu, Oct 22, 2020 at 04:15:31PM +1100, Dave Chinner wrote:
> > > @@ -140,6 +164,7 @@ workqueue_add(
> > >  
> > >  	/* Now queue the new work structure to the work queue. */
> > >  	pthread_mutex_lock(&wq->lock);
> > > +restart:
> > >  	if (wq->next_item == NULL) {
> > >  		assert(wq->item_count == 0);
> > >  		ret = -pthread_cond_signal(&wq->wakeup);
> > > @@ -150,6 +175,16 @@ workqueue_add(
> > >  		}
> > >  		wq->next_item = wi;
> > >  	} else {
> > > +		/* throttle on a full queue if configured */
> > > +		if (wq->max_queued && wq->item_count == wq->max_queued) {
> > > +			pthread_cond_wait(&wq->queue_full, &wq->lock);
> > 
> > I ported xfs_scrub to use max_queued for the inode scanner, and got a
> > hang here.  It uses two workqueues -- the first is an unbouned workqueue
> > that receives one work item per AG in which each work item calls
> > INUMBERS, creates a work item for the returned inode chunk, and throws
> > it at the second workqueue.  The second workqueue is a bounded workqueue
> > that calls BULKSTAT on the INUMBERS work item and then calls the
> > iteration function on each bulkstat record returned.
> > 
> > The hang happens when the inumbers workqueue has more than one thread
> > running.
> 
> IIUC, that means you have multiple producer threads? IIRC, he usage
> in this patchset is single producer, so it won't hit this problem...

Right, there are multiple producers, because that seemed like fun.
At this stage, I'm merely using this patch in anger (as it were) to
prototype a fix to scrub.

I'm not even sure it's a reasonable enhancement for xfs_scrub since each
of those bulkstat workers will then fight with the inumbers workers for
the AGI, but so it goes.  It does seem to eliminate the problem of one
thread working hard on an AG that has one huge fragmented file while the
other threads sit idle, but otherwise I mostly just see more buffer lock
contention.  The total runtime doesn't change on more balanced
filesystems, at least. :P

> > Both* threads notice the full workqueue and wait on
> > queue_full.  One of the workers in the second workqueue goes to pull off
> > the next work item, ends up in this if body, signals one of the sleeping
> > threads, and starts calling bulkstat.
> > 
> > In the time it takes to wake up the sleeping thread from wq 1, the
> > second workqueue pulls far enough ahead that the single thread from wq1
> > never manages to fill wq2 again.  Often, the wq1 thread was sleeping so
> > that it could add the last inode chunk of that AG to wq2.  We therefore
> > never wake up the *other* sleeping thread from wq1, and the whole app
> > stalls.
> > 
> > I dunno if that's a sane way to structure an inumbers/bulkstat scan, but
> > it seemed reasonable to me.  I can envision two possible fixes here: (1)
> > use pthread_cond_broadcast to wake everything up; or (2) always call
> > pthread_cond_wait when we pull a work item off the queue.  Thoughts?
> 
> pthread_cond_broadcast() makes more sense, but I suspect there will
> be other issues with multiple producers that render the throttling
> ineffective. I suspect supporting multiple producers should be a
> separate patchset...

<nod> Making change (2) seems to work for multiple producers, but I
guess I'll keep poking at perf to see if I discover anything exciting.

--D

> 
> Cheers,
> 
> Dave.
> -- 
> Dave Chinner
> david@fromorbit.com
Dave Chinner Oct. 26, 2020, 10:57 p.m. UTC | #6
On Mon, Oct 26, 2020 at 03:40:51PM -0700, Darrick J. Wong wrote:
> On Tue, Oct 27, 2020 at 09:29:43AM +1100, Dave Chinner wrote:
> > On Sat, Oct 24, 2020 at 09:41:14PM -0700, Darrick J. Wong wrote:
> > > On Thu, Oct 22, 2020 at 04:15:31PM +1100, Dave Chinner wrote:
> > > > @@ -140,6 +164,7 @@ workqueue_add(
> > > >  
> > > >  	/* Now queue the new work structure to the work queue. */
> > > >  	pthread_mutex_lock(&wq->lock);
> > > > +restart:
> > > >  	if (wq->next_item == NULL) {
> > > >  		assert(wq->item_count == 0);
> > > >  		ret = -pthread_cond_signal(&wq->wakeup);
> > > > @@ -150,6 +175,16 @@ workqueue_add(
> > > >  		}
> > > >  		wq->next_item = wi;
> > > >  	} else {
> > > > +		/* throttle on a full queue if configured */
> > > > +		if (wq->max_queued && wq->item_count == wq->max_queued) {
> > > > +			pthread_cond_wait(&wq->queue_full, &wq->lock);
> > > 
> > > I ported xfs_scrub to use max_queued for the inode scanner, and got a
> > > hang here.  It uses two workqueues -- the first is an unbouned workqueue
> > > that receives one work item per AG in which each work item calls
> > > INUMBERS, creates a work item for the returned inode chunk, and throws
> > > it at the second workqueue.  The second workqueue is a bounded workqueue
> > > that calls BULKSTAT on the INUMBERS work item and then calls the
> > > iteration function on each bulkstat record returned.
> > > 
> > > The hang happens when the inumbers workqueue has more than one thread
> > > running.
> > 
> > IIUC, that means you have multiple producer threads? IIRC, he usage
> > in this patchset is single producer, so it won't hit this problem...
> 
> Right, there are multiple producers, because that seemed like fun.
> At this stage, I'm merely using this patch in anger (as it were) to
> prototype a fix to scrub.
> 
> I'm not even sure it's a reasonable enhancement for xfs_scrub since each
> of those bulkstat workers will then fight with the inumbers workers for
> the AGI, but so it goes.  It does seem to eliminate the problem of one
> thread working hard on an AG that has one huge fragmented file while the
> other threads sit idle, but otherwise I mostly just see more buffer lock
> contention.

Yeah, that's exactly the problem it solves for phase6, too. i.e. it
stops a single huge directory from preventing other directories in
that AG from being processed even when everything else is done.

> The total runtime doesn't change on more balanced
> filesystems, at least. :P

That's a start :)

> > > I dunno if that's a sane way to structure an inumbers/bulkstat scan, but
> > > it seemed reasonable to me.  I can envision two possible fixes here: (1)
> > > use pthread_cond_broadcast to wake everything up; or (2) always call
> > > pthread_cond_wait when we pull a work item off the queue.  Thoughts?
> > 
> > pthread_cond_broadcast() makes more sense, but I suspect there will
> > be other issues with multiple producers that render the throttling
> > ineffective. I suspect supporting multiple producers should be a
> > separate patchset...
> 
> <nod> Making change (2) seems to work for multiple producers, but I
> guess I'll keep poking at perf to see if I discover anything exciting.

I'll look at the multiple producer thing in a bit more detail later
today...

Cheers,

Dave.
diff mbox series

Patch

diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
index fe3de4289379..e42b2a2f678b 100644
--- a/libfrog/workqueue.c
+++ b/libfrog/workqueue.c
@@ -40,13 +40,21 @@  workqueue_thread(void *arg)
 		}
 
 		/*
-		 *  Dequeue work from the head of the list.
+		 *  Dequeue work from the head of the list. If the queue was
+		 *  full then send a wakeup if we're configured to do so.
 		 */
 		assert(wq->item_count > 0);
+		if (wq->max_queued && wq->item_count == wq->max_queued)
+			pthread_cond_signal(&wq->queue_full);
+
 		wi = wq->next_item;
 		wq->next_item = wi->next;
 		wq->item_count--;
 
+		if (wq->max_queued && wq->next_item) {
+			/* more work, wake up another worker */
+			pthread_cond_signal(&wq->wakeup);
+		}
 		pthread_mutex_unlock(&wq->lock);
 
 		(wi->function)(wi->queue, wi->index, wi->arg);
@@ -58,10 +66,11 @@  workqueue_thread(void *arg)
 
 /* Allocate a work queue and threads.  Returns zero or negative error code. */
 int
-workqueue_create(
+workqueue_create_bound(
 	struct workqueue	*wq,
 	void			*wq_ctx,
-	unsigned int		nr_workers)
+	unsigned int		nr_workers,
+	unsigned int		max_queue)
 {
 	unsigned int		i;
 	int			err = 0;
@@ -70,12 +79,16 @@  workqueue_create(
 	err = -pthread_cond_init(&wq->wakeup, NULL);
 	if (err)
 		return err;
+	err = -pthread_cond_init(&wq->queue_full, NULL);
+	if (err)
+		goto out_wake;
 	err = -pthread_mutex_init(&wq->lock, NULL);
 	if (err)
 		goto out_cond;
 
 	wq->wq_ctx = wq_ctx;
 	wq->thread_count = nr_workers;
+	wq->max_queued = max_queue;
 	wq->threads = malloc(nr_workers * sizeof(pthread_t));
 	if (!wq->threads) {
 		err = -errno;
@@ -102,10 +115,21 @@  workqueue_create(
 out_mutex:
 	pthread_mutex_destroy(&wq->lock);
 out_cond:
+	pthread_cond_destroy(&wq->queue_full);
+out_wake:
 	pthread_cond_destroy(&wq->wakeup);
 	return err;
 }
 
+int
+workqueue_create(
+	struct workqueue	*wq,
+	void			*wq_ctx,
+	unsigned int		nr_workers)
+{
+	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
+}
+
 /*
  * Create a work item consisting of a function and some arguments and schedule
  * the work item to be run via the thread pool.  Returns zero or a negative
@@ -140,6 +164,7 @@  workqueue_add(
 
 	/* Now queue the new work structure to the work queue. */
 	pthread_mutex_lock(&wq->lock);
+restart:
 	if (wq->next_item == NULL) {
 		assert(wq->item_count == 0);
 		ret = -pthread_cond_signal(&wq->wakeup);
@@ -150,6 +175,16 @@  workqueue_add(
 		}
 		wq->next_item = wi;
 	} else {
+		/* throttle on a full queue if configured */
+		if (wq->max_queued && wq->item_count == wq->max_queued) {
+			pthread_cond_wait(&wq->queue_full, &wq->lock);
+			/*
+			 * Queue might be empty or even still full by the time
+			 * we get the lock back, so restart the lookup so we do
+			 * the right thing with the current state of the queue.
+			 */
+			goto restart;
+		}
 		wq->last_item->next = wi;
 	}
 	wq->last_item = wi;
@@ -201,5 +236,6 @@  workqueue_destroy(
 	free(wq->threads);
 	pthread_mutex_destroy(&wq->lock);
 	pthread_cond_destroy(&wq->wakeup);
+	pthread_cond_destroy(&wq->queue_full);
 	memset(wq, 0, sizeof(*wq));
 }
diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h
index a56d1cf14081..a9c108d0e66a 100644
--- a/libfrog/workqueue.h
+++ b/libfrog/workqueue.h
@@ -31,10 +31,14 @@  struct workqueue {
 	unsigned int		thread_count;
 	bool			terminate;
 	bool			terminated;
+	int			max_queued;
+	pthread_cond_t		queue_full;
 };
 
 int workqueue_create(struct workqueue *wq, void *wq_ctx,
 		unsigned int nr_workers);
+int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
+		unsigned int nr_workers, unsigned int max_queue);
 int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
 		uint32_t index, void *arg);
 int workqueue_terminate(struct workqueue *wq);