@@ -82,6 +82,7 @@ workqueue_create(
goto out_mutex;
}
wq->terminate = false;
+ wq->terminated = false;
for (i = 0; i < nr_workers; i++) {
err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
@@ -119,6 +120,8 @@ workqueue_add(
struct workqueue_item *wi;
int ret;
+ assert(!wq->terminated);
+
if (wq->thread_count == 0) {
func(wq, index, arg);
return 0;
@@ -157,22 +160,42 @@ workqueue_add(
/*
* Wait for all pending work items to be processed and tear down the
- * workqueue.
+ * workqueue thread pool.
*/
-void
-workqueue_destroy(
+int
+workqueue_terminate(
struct workqueue *wq)
{
unsigned int i;
+ int ret;
+
+ pthread_mutex_lock(&wq->lock);
+ wq->terminate = true;
+ pthread_mutex_unlock(&wq->lock);
+
+ ret = pthread_cond_broadcast(&wq->wakeup);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < wq->thread_count; i++) {
+ ret = pthread_join(wq->threads[i], NULL);
+ if (ret)
+ return ret;
+ }
pthread_mutex_lock(&wq->lock);
- wq->terminate = 1;
+ wq->terminated = true;
pthread_mutex_unlock(&wq->lock);
- pthread_cond_broadcast(&wq->wakeup);
+ return 0;
+}
- for (i = 0; i < wq->thread_count; i++)
- pthread_join(wq->threads[i], NULL);
+/* Tear down the workqueue. */
+void
+workqueue_destroy(
+ struct workqueue *wq)
+{
+ assert(wq->terminated);
free(wq->threads);
pthread_mutex_destroy(&wq->lock);
@@ -30,12 +30,14 @@ struct workqueue {
unsigned int item_count;
unsigned int thread_count;
bool terminate;
+ bool terminated;
};
int workqueue_create(struct workqueue *wq, void *wq_ctx,
unsigned int nr_workers);
int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
uint32_t index, void *arg);
+int workqueue_terminate(struct workqueue *wq);
void workqueue_destroy(struct workqueue *wq);
#endif /* __LIBFROG_WORKQUEUE_H__ */
@@ -56,5 +56,11 @@ void
destroy_work_queue(
struct workqueue *wq)
{
+ int err;
+
+ err = workqueue_terminate(wq);
+ if (err)
+ do_error(_("cannot terminate worker item, error = [%d] %s\n"),
+ err, strerror(err));
workqueue_destroy(wq);
}
@@ -102,7 +102,7 @@ xfs_count_all_inodes(
struct xfs_count_inodes *ci;
xfs_agnumber_t agno;
struct workqueue wq;
- bool moveon;
+ bool moveon = true;
int ret;
ci = calloc(1, sizeof(struct xfs_count_inodes) +
@@ -126,8 +126,17 @@ xfs_count_all_inodes(
break;
}
}
+
+ ret = workqueue_terminate(&wq);
+ if (ret) {
+ moveon = false;
+ str_liberror(ctx, ret, _("finishing icount work"));
+ }
workqueue_destroy(&wq);
+ if (!moveon)
+ goto out_free;
+
for (agno = 0; agno < ctx->mnt.fsgeom.agcount; agno++)
*count += ci->counters[agno];
moveon = ci->moveon;
@@ -256,6 +256,11 @@ xfs_scan_all_inodes(
}
}
+ ret = workqueue_terminate(&wq);
+ if (ret) {
+ si.moveon = false;
+ str_liberror(ctx, ret, _("finishing bulkstat work"));
+ }
workqueue_destroy(&wq);
return si.moveon;
@@ -161,6 +161,11 @@ xfs_scan_metadata(
}
out:
+ ret = workqueue_terminate(&wq);
+ if (ret) {
+ moveon = false;
+ str_liberror(ctx, ret, _("finishing scrub work"));
+ }
workqueue_destroy(&wq);
return moveon;
}
@@ -90,6 +90,12 @@ xfs_process_action_items(
if (!moveon)
break;
}
+
+ ret = workqueue_terminate(&wq);
+ if (ret) {
+ moveon = false;
+ str_liberror(ctx, ret, _("finishing repair work"));
+ }
workqueue_destroy(&wq);
pthread_mutex_lock(&ctx->lock);
@@ -120,6 +120,7 @@ void
read_verify_pool_flush(
struct read_verify_pool *rvp)
{
+ workqueue_terminate(&rvp->wq);
workqueue_destroy(&rvp->wq);
}
@@ -230,6 +230,11 @@ xfs_scan_all_spacemaps(
}
}
out:
+ ret = workqueue_terminate(&wq);
+ if (ret) {
+ sbx.moveon = false;
+ str_liberror(ctx, ret, _("finishing fsmap work"));
+ }
workqueue_destroy(&wq);
return sbx.moveon;
@@ -250,6 +250,11 @@ scan_fs_tree(
assert(sft.nr_dirs == 0);
pthread_mutex_unlock(&sft.lock);
+ ret = workqueue_terminate(&wq);
+ if (ret) {
+ sft.moveon = false;
+ str_liberror(ctx, ret, _("finishing directory scan work"));
+ }
out_wq:
workqueue_destroy(&wq);
return sft.moveon;