@@ -40,13 +40,21 @@ workqueue_thread(void *arg)
}
/*
- * Dequeue work from the head of the list.
+ * Dequeue work from the head of the list. If the queue is bounded,
+ * wake up any producers that may be waiting for space to open up;
+ * they recheck the queue state once they retake the lock.
*/
assert(wq->item_count > 0);
+ if (wq->max_queued)
+ pthread_cond_broadcast(&wq->queue_full);
+
wi = wq->next_item;
wq->next_item = wi->next;
wq->item_count--;
+ if (wq->max_queued && wq->next_item) {
+ /* more work, wake up another worker */
+ pthread_cond_signal(&wq->wakeup);
+ }
pthread_mutex_unlock(&wq->lock);
(wi->function)(wi->queue, wi->index, wi->arg);
@@ -58,10 +66,11 @@ workqueue_thread(void *arg)
-/* Allocate a work queue and threads. Returns zero or negative error code. */
+/*
+ * Allocate a work queue and worker threads, limiting the queue to max_queue
+ * outstanding items (zero means no limit).  Returns zero or a negative
+ * error code.
+ */
int
-workqueue_create(
+workqueue_create_bound(
struct workqueue *wq,
void *wq_ctx,
- unsigned int nr_workers)
+ unsigned int nr_workers,
+ unsigned int max_queue)
{
unsigned int i;
int err = 0;
@@ -70,12 +79,16 @@ workqueue_create(
err = -pthread_cond_init(&wq->wakeup, NULL);
if (err)
return err;
+ err = -pthread_cond_init(&wq->queue_full, NULL);
+ if (err)
+ goto out_wake;
err = -pthread_mutex_init(&wq->lock, NULL);
if (err)
goto out_cond;
wq->wq_ctx = wq_ctx;
wq->thread_count = nr_workers;
+ wq->max_queued = max_queue;
wq->threads = malloc(nr_workers * sizeof(pthread_t));
if (!wq->threads) {
err = -errno;
@@ -102,10 +115,21 @@ workqueue_create(
out_mutex:
pthread_mutex_destroy(&wq->lock);
out_cond:
+ pthread_cond_destroy(&wq->queue_full);
+out_wake:
pthread_cond_destroy(&wq->wakeup);
return err;
}
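+
+/*
+ * Allocate an unbounded work queue and worker threads.  This is a thin
+ * wrapper around workqueue_create_bound() with no limit on the queue
+ * depth, preserving the old behaviour for existing callers.
+ */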
+int
+workqueue_create(
+ struct workqueue *wq,
+ void *wq_ctx,
+ unsigned int nr_workers)
+{
+ return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
+}
+
/*
* Create a work item consisting of a function and some arguments and schedule
* the work item to be run via the thread pool. Returns zero or a negative
@@ -140,6 +164,7 @@ workqueue_add(
/* Now queue the new work structure to the work queue. */
pthread_mutex_lock(&wq->lock);
+restart:
if (wq->next_item == NULL) {
assert(wq->item_count == 0);
ret = -pthread_cond_signal(&wq->wakeup);
@@ -150,6 +175,16 @@ workqueue_add(
}
wq->next_item = wi;
} else {
+ /* throttle on a full queue if configured */
+ if (wq->max_queued && wq->item_count == wq->max_queued) {
+ pthread_cond_wait(&wq->queue_full, &wq->lock);
+ /*
+ * Queue might be empty or even still full by the time
+ * we get the lock back, so restart the lookup so we do
+ * the right thing with the current state of the queue.
+ */
+ goto restart;
+ }
wq->last_item->next = wi;
}
wq->last_item = wi;
@@ -201,5 +236,6 @@ workqueue_destroy(
free(wq->threads);
pthread_mutex_destroy(&wq->lock);
pthread_cond_destroy(&wq->wakeup);
+ pthread_cond_destroy(&wq->queue_full);
memset(wq, 0, sizeof(*wq));
}
@@ -31,10 +31,14 @@ struct workqueue {
unsigned int thread_count;
bool terminate;
bool terminated;
+ unsigned int max_queued;	/* 0 == unlimited queue depth */
+ pthread_cond_t queue_full;
};
int workqueue_create(struct workqueue *wq, void *wq_ctx,
unsigned int nr_workers);
+int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
+ unsigned int nr_workers, unsigned int max_queue);
int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
uint32_t index, void *arg);
int workqueue_terminate(struct workqueue *wq);
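
For reference, a minimal caller of the bounded variant might look like the
sketch below.  The "workqueue.h" include, the workqueue_func_t signature, and
the assumption that workqueue_terminate() waits for queued items to finish are
inferred from the declarations and call sites in this patch, not taken from it
verbatim.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#include "workqueue.h"	/* the header patched above; name assumed */

static atomic_ulong	items_done;

/*
 * Assumed workqueue_func_t signature, matching the worker call site
 * (wi->function)(wi->queue, wi->index, wi->arg) in the patch.
 */
static void
process_item(
	struct workqueue	*wq,
	uint32_t		index,
	void			*arg)
{
	atomic_ulong		*counter = arg;

	/* stand-in for real per-item work */
	atomic_fetch_add(counter, 1);
}

int
main(void)
{
	struct workqueue	wq;
	uint32_t		i;
	int			ret;

	/* four workers; producers block once 16 items are queued */
	ret = workqueue_create_bound(&wq, NULL, 4, 16);
	if (ret)
		return 1;

	for (i = 0; i < 1024; i++) {
		ret = workqueue_add(&wq, process_item, i, &items_done);
		if (ret)
			break;
	}

	/* assumed to wait for the workers to drain the queue */
	ret = workqueue_terminate(&wq);
	workqueue_destroy(&wq);

	printf("processed %lu items\n",
			(unsigned long)atomic_load(&items_done));
	return ret ? 1 : 0;
}

With max_queue set to zero (or via the unchanged workqueue_create()),
workqueue_add() never blocks and the queue can grow without bound, which is
the pre-patch behaviour.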