diff mbox series

[4/7] workqueue: bound maximum queue depth

Message ID 20181030112043.6034-5-david@fromorbit.com
State Superseded
Headers show
Series xfs_repair: scale to 150,000 iops | expand

Commit Message

Dave Chinner Oct. 30, 2018, 11:20 a.m. UTC
From: Dave Chinner <dchinner@redhat.com>

Existing users of workqueues have bound maximum queue depths in
their external algorithms (e.g. prefetch counts). For parallelising
work that doesn't have an external bound, allow workqueues to
throttle incoming requests at a maximum bound. bounded workqueues
also need to distribute work over all worker threads themselves as
there is no external bounding or worker function throttling
provided.

Existing callers are not throttled and retain direct control of
worker threads, only users of the new create interface will be
throttled and concurrency managed.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 include/workqueue.h |  4 ++++
 libfrog/workqueue.c | 30 +++++++++++++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)

Comments

Darrick J. Wong Oct. 30, 2018, 5:58 p.m. UTC | #1
On Tue, Oct 30, 2018 at 10:20:40PM +1100, Dave Chinner wrote:
> From: Dave Chinner <dchinner@redhat.com>
> 
> Existing users of workqueues have bound maximum queue depths in
> their external algorithms (e.g. prefetch counts). For parallelising
> work that doesn't have an external bound, allow workqueues to
> throttle incoming requests at a maximum bound. bounded workqueues
> also need to distribute work over all worker threads themselves as
> there is no external bounding or worker function throttling
> provided.
> 
> Existing callers are not throttled and retain direct control of
> worker threads, only users of the new create interface will be
> throttled and concurrency managed.
> 
> Signed-off-by: Dave Chinner <dchinner@redhat.com>
> ---
>  include/workqueue.h |  4 ++++
>  libfrog/workqueue.c | 30 +++++++++++++++++++++++++++---
>  2 files changed, 31 insertions(+), 3 deletions(-)
> 
> diff --git a/include/workqueue.h b/include/workqueue.h
> index c45dc4fbcf64..504da9403b85 100644
> --- a/include/workqueue.h
> +++ b/include/workqueue.h
> @@ -30,10 +30,14 @@ struct workqueue {
>  	unsigned int		item_count;
>  	unsigned int		thread_count;
>  	bool			terminate;
> +	int			max_queued;
> +	pthread_cond_t		queue_full;
>  };
>  
>  int workqueue_create(struct workqueue *wq, void *wq_ctx,
>  		unsigned int nr_workers);
> +int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
> +		unsigned int nr_workers, int max_queue);

What does negative max_queue mean?

>  int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
>  		uint32_t index, void *arg);
>  void workqueue_destroy(struct workqueue *wq);
> diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> index 7311477374b4..8fe0dc7249f5 100644
> --- a/libfrog/workqueue.c
> +++ b/libfrog/workqueue.c
> @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
>  		}
>  
>  		/*
> -		 *  Dequeue work from the head of the list.
> +		 *  Dequeue work from the head of the list. If the queue was
> +		 *  full then send a wakeup if we're configured to do so.
>  		 */
>  		assert(wq->item_count > 0);
> +		if (wq->max_queued && wq->item_count == wq->max_queued)
> +			pthread_cond_signal(&wq->queue_full);
> +
>  		wi = wq->next_item;
>  		wq->next_item = wi->next;
>  		wq->item_count--;
>  
> +		if (wq->max_queued && wq->next_item) {
> +			/* more work, wake up another worker */
> +			pthread_cond_signal(&wq->wakeup);
> +		}

It seems a little funny to me that the worker thread wakes up other
worker threads when there is more work to do (vs. workqueue_add which
actually added more work)...

--D

>  		pthread_mutex_unlock(&wq->lock);
>  
>  		(wi->function)(wi->queue, wi->index, wi->arg);
> @@ -58,22 +66,25 @@ workqueue_thread(void *arg)
>  
>  /* Allocate a work queue and threads. */
>  int
> -workqueue_create(
> +workqueue_create_bound(
>  	struct workqueue	*wq,
>  	void			*wq_ctx,
> -	unsigned int		nr_workers)
> +	unsigned int		nr_workers,
> +	int			max_queue)
>  {
>  	unsigned int		i;
>  	int			err = 0;
>  
>  	memset(wq, 0, sizeof(*wq));
>  	pthread_cond_init(&wq->wakeup, NULL);
> +	pthread_cond_init(&wq->queue_full, NULL);
>  	pthread_mutex_init(&wq->lock, NULL);
>  
>  	wq->wq_ctx = wq_ctx;
>  	wq->thread_count = nr_workers;
>  	wq->threads = malloc(nr_workers * sizeof(pthread_t));
>  	wq->terminate = false;
> +	wq->max_queued = max_queue;
>  
>  	for (i = 0; i < nr_workers; i++) {
>  		err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
> @@ -87,6 +98,15 @@ workqueue_create(
>  	return err;
>  }
>  
> +int
> +workqueue_create(
> +	struct workqueue	*wq,
> +	void			*wq_ctx,
> +	unsigned int		nr_workers)
> +{
> +	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
> +}
> +
>  /*
>   * Create a work item consisting of a function and some arguments and
>   * schedule the work item to be run via the thread pool.
> @@ -122,6 +142,9 @@ workqueue_add(
>  		assert(wq->item_count == 0);
>  		pthread_cond_signal(&wq->wakeup);
>  	} else {
> +		/* throttle on a full queue if configured */
> +		if (wq->max_queued && wq->item_count == wq->max_queued)
> +			pthread_cond_wait(&wq->queue_full, &wq->lock);
>  		wq->last_item->next = wi;
>  	}
>  	wq->last_item = wi;
> @@ -153,5 +176,6 @@ workqueue_destroy(
>  	free(wq->threads);
>  	pthread_mutex_destroy(&wq->lock);
>  	pthread_cond_destroy(&wq->wakeup);
> +	pthread_cond_destroy(&wq->queue_full);
>  	memset(wq, 0, sizeof(*wq));
>  }
> -- 
> 2.19.1
>
Dave Chinner Oct. 30, 2018, 8:53 p.m. UTC | #2
On Tue, Oct 30, 2018 at 10:58:39AM -0700, Darrick J. Wong wrote:
> On Tue, Oct 30, 2018 at 10:20:40PM +1100, Dave Chinner wrote:
> > From: Dave Chinner <dchinner@redhat.com>
> > 
> > Existing users of workqueues have bound maximum queue depths in
> > their external algorithms (e.g. prefetch counts). For parallelising
> > work that doesn't have an external bound, allow workqueues to
> > throttle incoming requests at a maximum bound. bounded workqueues
> > also need to distribute work over all worker threads themselves as
> > there is no external bounding or worker function throttling
> > provided.
> > 
> > Existing callers are not throttled and retain direct control of
> > worker threads, only users of the new create interface will be
> > throttled and concurrency managed.
> > 
> > Signed-off-by: Dave Chinner <dchinner@redhat.com>
> > ---
> >  include/workqueue.h |  4 ++++
> >  libfrog/workqueue.c | 30 +++++++++++++++++++++++++++---
> >  2 files changed, 31 insertions(+), 3 deletions(-)
> > 
> > diff --git a/include/workqueue.h b/include/workqueue.h
> > index c45dc4fbcf64..504da9403b85 100644
> > --- a/include/workqueue.h
> > +++ b/include/workqueue.h
> > @@ -30,10 +30,14 @@ struct workqueue {
> >  	unsigned int		item_count;
> >  	unsigned int		thread_count;
> >  	bool			terminate;
> > +	int			max_queued;
> > +	pthread_cond_t		queue_full;
> >  };
> >  
> >  int workqueue_create(struct workqueue *wq, void *wq_ctx,
> >  		unsigned int nr_workers);
> > +int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
> > +		unsigned int nr_workers, int max_queue);
> 
> What does negative max_queue mean?

Nothing. it can be made unsigned.

> 
> >  int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
> >  		uint32_t index, void *arg);
> >  void workqueue_destroy(struct workqueue *wq);
> > diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> > index 7311477374b4..8fe0dc7249f5 100644
> > --- a/libfrog/workqueue.c
> > +++ b/libfrog/workqueue.c
> > @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
> >  		}
> >  
> >  		/*
> > -		 *  Dequeue work from the head of the list.
> > +		 *  Dequeue work from the head of the list. If the queue was
> > +		 *  full then send a wakeup if we're configured to do so.
> >  		 */
> >  		assert(wq->item_count > 0);
> > +		if (wq->max_queued && wq->item_count == wq->max_queued)
> > +			pthread_cond_signal(&wq->queue_full);
> > +
> >  		wi = wq->next_item;
> >  		wq->next_item = wi->next;
> >  		wq->item_count--;
> >  
> > +		if (wq->max_queued && wq->next_item) {
> > +			/* more work, wake up another worker */
> > +			pthread_cond_signal(&wq->wakeup);
> > +		}
> 
> It seems a little funny to me that the worker thread wakes up other
> worker threads when there is more work to do (vs. workqueue_add which
> actually added more work)...

The problem is that workqueue_add() delegates all concurrency and
queue throttling to the worker thread callback function. The work
queue doesn't function as a "queue" at all - it functions as a
method of starting long running functions that do there own work
queuing and throttling.  Hence these externally co-ordinated worker
threads only require kicking off when the first work item is queued,
otherwise they completely manage themselves and never return to the
worker thread itself until they are done.

This is one of the reasons the prefetch code is so damn complex - it
has to do all this queue throttling and worker thread co-ordination
itself with it's own infrastructure, rather than just having a
thread walking the block maps calling "queue_work" on each object it
needs read. Instead it's got counting semaphores,
start/done/restart/maybe start/maybe stop logic to manage the queue
depth, etc.

What the above change does is enable us to use workqueues for
queuing small pieces of work that need to be processed, and allows
them to be processed concurrently without the caller having to do
anything to manage that concurrency. This way the concurrency will
grow automatically to the maximum bound of the workqueue and we
don't have to worry about doing any extra wakeups or tracking
anything in workqueue_add...

Cheers,

Dave.
Brian Foster Oct. 31, 2018, 5:14 p.m. UTC | #3
On Wed, Oct 31, 2018 at 07:53:20AM +1100, Dave Chinner wrote:
> On Tue, Oct 30, 2018 at 10:58:39AM -0700, Darrick J. Wong wrote:
> > On Tue, Oct 30, 2018 at 10:20:40PM +1100, Dave Chinner wrote:
> > > From: Dave Chinner <dchinner@redhat.com>
> > > 
> > > Existing users of workqueues have bound maximum queue depths in
> > > their external algorithms (e.g. prefetch counts). For parallelising
> > > work that doesn't have an external bound, allow workqueues to
> > > throttle incoming requests at a maximum bound. bounded workqueues
> > > also need to distribute work over all worker threads themselves as
> > > there is no external bounding or worker function throttling
> > > provided.
> > > 
> > > Existing callers are not throttled and retain direct control of
> > > worker threads, only users of the new create interface will be
> > > throttled and concurrency managed.
> > > 

Might be helpful to add a sentence or two on what this is for.

> > > Signed-off-by: Dave Chinner <dchinner@redhat.com>
> > > ---
> > >  include/workqueue.h |  4 ++++
> > >  libfrog/workqueue.c | 30 +++++++++++++++++++++++++++---
> > >  2 files changed, 31 insertions(+), 3 deletions(-)
> > > 
> > > diff --git a/include/workqueue.h b/include/workqueue.h
> > > index c45dc4fbcf64..504da9403b85 100644
> > > --- a/include/workqueue.h
> > > +++ b/include/workqueue.h
> > > @@ -30,10 +30,14 @@ struct workqueue {
> > >  	unsigned int		item_count;
> > >  	unsigned int		thread_count;
> > >  	bool			terminate;
> > > +	int			max_queued;
> > > +	pthread_cond_t		queue_full;
> > >  };
> > >  
> > >  int workqueue_create(struct workqueue *wq, void *wq_ctx,
> > >  		unsigned int nr_workers);
> > > +int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
> > > +		unsigned int nr_workers, int max_queue);
> > 
> > What does negative max_queue mean?
> 
> Nothing. it can be made unsigned.
> 
> > 
> > >  int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
> > >  		uint32_t index, void *arg);
> > >  void workqueue_destroy(struct workqueue *wq);
> > > diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
> > > index 7311477374b4..8fe0dc7249f5 100644
> > > --- a/libfrog/workqueue.c
> > > +++ b/libfrog/workqueue.c
> > > @@ -40,13 +40,21 @@ workqueue_thread(void *arg)
> > >  		}
> > >  
> > >  		/*
> > > -		 *  Dequeue work from the head of the list.
> > > +		 *  Dequeue work from the head of the list. If the queue was
> > > +		 *  full then send a wakeup if we're configured to do so.
> > >  		 */
> > >  		assert(wq->item_count > 0);
> > > +		if (wq->max_queued && wq->item_count == wq->max_queued)
> > > +			pthread_cond_signal(&wq->queue_full);
> > > +
> > >  		wi = wq->next_item;
> > >  		wq->next_item = wi->next;
> > >  		wq->item_count--;
> > >  
> > > +		if (wq->max_queued && wq->next_item) {
> > > +			/* more work, wake up another worker */
> > > +			pthread_cond_signal(&wq->wakeup);
> > > +		}
> > 
> > It seems a little funny to me that the worker thread wakes up other
> > worker threads when there is more work to do (vs. workqueue_add which
> > actually added more work)...
> 

FWIW, I had the same thought when looking at this patch..

> The problem is that workqueue_add() delegates all concurrency and
> queue throttling to the worker thread callback function. The work
> queue doesn't function as a "queue" at all - it functions as a
> method of starting long running functions that do there own work
> queuing and throttling.  Hence these externally co-ordinated worker
> threads only require kicking off when the first work item is queued,
> otherwise they completely manage themselves and never return to the
> worker thread itself until they are done.
> 

Ok, so the existing workqueue client code doesn't need additional
wakeups..

> This is one of the reasons the prefetch code is so damn complex - it
> has to do all this queue throttling and worker thread co-ordination
> itself with it's own infrastructure, rather than just having a
> thread walking the block maps calling "queue_work" on each object it
> needs read. Instead it's got counting semaphores,
> start/done/restart/maybe start/maybe stop logic to manage the queue
> depth, etc.
> 

It's been a while since I've looked at that cache prefetch code, but
that makes sense..

> What the above change does is enable us to use workqueues for
> queuing small pieces of work that need to be processed, and allows
> them to be processed concurrently without the caller having to do
> anything to manage that concurrency. This way the concurrency will
> grow automatically to the maximum bound of the workqueue and we
> don't have to worry about doing any extra wakeups or tracking
> anything in workqueue_add...
> 

Also makes sense, but I'm not sure this answers the original question:
why not do this in workqueue_add()? It looks like the current
workqueue_add() does exactly what you describe here in that it only
kicks the worker thread when the initial item is added. The above
explains why that's a problem, but why can't workqueue_add() kick the
worker on every add to a bounded queue (or any queue, if that doesn't
cause problems for !bounded)?

Also, have you considered whether pthread_cond_broadcast() may be more
appropriate than pthread_cond_signal() in the described use case with
multiple workers? The man page for the latter says it "unblocks at least
one of the threads that are blocked on the specified condition
variable," but that isn't exactly the most helpful description. :P

Brian

> Cheers,
> 
> Dave.
> -- 
> Dave Chinner
> david@fromorbit.com
diff mbox series

Patch

diff --git a/include/workqueue.h b/include/workqueue.h
index c45dc4fbcf64..504da9403b85 100644
--- a/include/workqueue.h
+++ b/include/workqueue.h
@@ -30,10 +30,14 @@  struct workqueue {
 	unsigned int		item_count;
 	unsigned int		thread_count;
 	bool			terminate;
+	int			max_queued;
+	pthread_cond_t		queue_full;
 };
 
 int workqueue_create(struct workqueue *wq, void *wq_ctx,
 		unsigned int nr_workers);
+int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
+		unsigned int nr_workers, int max_queue);
 int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
 		uint32_t index, void *arg);
 void workqueue_destroy(struct workqueue *wq);
diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
index 7311477374b4..8fe0dc7249f5 100644
--- a/libfrog/workqueue.c
+++ b/libfrog/workqueue.c
@@ -40,13 +40,21 @@  workqueue_thread(void *arg)
 		}
 
 		/*
-		 *  Dequeue work from the head of the list.
+		 *  Dequeue work from the head of the list. If the queue was
+		 *  full then send a wakeup if we're configured to do so.
 		 */
 		assert(wq->item_count > 0);
+		if (wq->max_queued && wq->item_count == wq->max_queued)
+			pthread_cond_signal(&wq->queue_full);
+
 		wi = wq->next_item;
 		wq->next_item = wi->next;
 		wq->item_count--;
 
+		if (wq->max_queued && wq->next_item) {
+			/* more work, wake up another worker */
+			pthread_cond_signal(&wq->wakeup);
+		}
 		pthread_mutex_unlock(&wq->lock);
 
 		(wi->function)(wi->queue, wi->index, wi->arg);
@@ -58,22 +66,25 @@  workqueue_thread(void *arg)
 
 /* Allocate a work queue and threads. */
 int
-workqueue_create(
+workqueue_create_bound(
 	struct workqueue	*wq,
 	void			*wq_ctx,
-	unsigned int		nr_workers)
+	unsigned int		nr_workers,
+	int			max_queue)
 {
 	unsigned int		i;
 	int			err = 0;
 
 	memset(wq, 0, sizeof(*wq));
 	pthread_cond_init(&wq->wakeup, NULL);
+	pthread_cond_init(&wq->queue_full, NULL);
 	pthread_mutex_init(&wq->lock, NULL);
 
 	wq->wq_ctx = wq_ctx;
 	wq->thread_count = nr_workers;
 	wq->threads = malloc(nr_workers * sizeof(pthread_t));
 	wq->terminate = false;
+	wq->max_queued = max_queue;
 
 	for (i = 0; i < nr_workers; i++) {
 		err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
@@ -87,6 +98,15 @@  workqueue_create(
 	return err;
 }
 
+int
+workqueue_create(
+	struct workqueue	*wq,
+	void			*wq_ctx,
+	unsigned int		nr_workers)
+{
+	return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
+}
+
 /*
  * Create a work item consisting of a function and some arguments and
  * schedule the work item to be run via the thread pool.
@@ -122,6 +142,9 @@  workqueue_add(
 		assert(wq->item_count == 0);
 		pthread_cond_signal(&wq->wakeup);
 	} else {
+		/* throttle on a full queue if configured */
+		if (wq->max_queued && wq->item_count == wq->max_queued)
+			pthread_cond_wait(&wq->queue_full, &wq->lock);
 		wq->last_item->next = wi;
 	}
 	wq->last_item = wi;
@@ -153,5 +176,6 @@  workqueue_destroy(
 	free(wq->threads);
 	pthread_mutex_destroy(&wq->lock);
 	pthread_cond_destroy(&wq->wakeup);
+	pthread_cond_destroy(&wq->queue_full);
 	memset(wq, 0, sizeof(*wq));
 }