diff mbox series

[dwarves,v2,10/10] dwarf_loader: multithreading with a job/worker model

Message ID 20241213223641.564002-11-ihor.solodrai@pm.me (mailing list archive)
State Not Applicable
Headers show
Series pahole: shared ELF and faster reproducible BTF encoding | expand

Checks

Context Check Description
netdev/tree_selection success Not a local patch

Commit Message

Ihor Solodrai Dec. 13, 2024, 10:37 p.m. UTC
This is a re-implementation of an idea described in an RFC [1] and
tried in a patch there [2].

The gist of this patch is that multithreading is now contained in
dwarf_loader.c and is implemented using a job queue and a pool of
worker threads. As a consequence, multithreading-related code is
removed from pahole.c.

The single-thread special case is removed: the queueing setup works
fine with a single worker, which switches between jobs as appropriate.

Code supporting the previous version of multithreading, such as
cu_state, thread_data and related functions, is also removed.

The reproducible_build flag is now moot: with these changes, BTF
encoding is always reproducible.

The goal outlined in the RFC [1] - making parallel reproducible BTF
generation as fast as non-reproducible generation - is achieved by
implementing the requirement of ordered CU encoding (stealing) directly
in the job queue in dwarf_loader.c.

Synchronization in the queue is implemented with a mutex (which
ensures consistency of the queue state) and two condition variables:
job_added and job_taken. The motivation for using condition variables
is the classic one: we want to avoid threads polling the queue state
in a busy loop while competing for a single mutex.
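
To illustrate the pattern (a minimal sketch, not the exact code in this
patch; job_available() and take_job() are hypothetical helpers standing
in for the queue-scanning logic):

	pthread_mutex_lock(&q.mutex);
	while (!job_available(&q))	/* re-check: wakeups can be spurious */
		pthread_cond_wait(&q.job_added, &q.mutex);
	job = take_job(&q);		/* queue state only touched under the mutex */
	pthread_cond_signal(&q.job_taken);
	pthread_mutex_unlock(&q.mutex);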

[1] https://lore.kernel.org/dwarves/20241128012341.4081072-1-ihor.solodrai@pm.me/
[2] https://lore.kernel.org/dwarves/20241128012341.4081072-10-ihor.solodrai@pm.me/

Signed-off-by: Ihor Solodrai <ihor.solodrai@pm.me>
---
 btf_encoder.c               |   8 +-
 btf_encoder.h               |   6 +-
 btf_loader.c                |   2 +-
 ctf_loader.c                |   2 +-
 dwarf_loader.c              | 342 +++++++++++++++++++++++++-----------
 dwarves.c                   |  44 -----
 dwarves.h                   |  21 +--
 pahole.c                    | 236 +++----------------------
 pdwtags.c                   |   3 +-
 pfunct.c                    |   3 +-
 tests/reproducible_build.sh |   5 +-
 11 files changed, 281 insertions(+), 391 deletions(-)

Comments

Eduard Zingerman Dec. 17, 2024, 12:57 a.m. UTC | #1
On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote:

[...]

> +static void *dwarf_loader__worker_thread(void *arg)
> +{
> +	struct cu_processing_job *job;
> +	struct dwarf_cus *dcus = arg;
> +	bool stop = false;
> +	struct cu *cu;
> +
> +	while (!stop) {
> +		job = cus_queue__dequeue_job();
> +
> +		switch (job->type) {
> +
> +		case JOB_DECODE:
> +			cu = dwarf_loader__decode_next_cu(dcus);
> +
> +			if (cu == NULL) {
> +				free(job);
> +				stop = true;
> +				break;
> +			}
> +
> +			/* Create and enqueue a new JOB_STEAL for this decoded CU */
> +			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
> +
> +			steal_job->type = JOB_STEAL;
> +			steal_job->cu = cu;
> +			cus_queue__enqueue_job(steal_job);
> +
> +			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
> +			cus_queue__enqueue_job(job);
> +			break;
> +
> +		case JOB_STEAL:
> +			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
> +				goto out_abort;
> +			cus_queue__inc_next_cu_id();
> +			/* Free the job struct as it's no longer
> +			 * needed after CU has been stolen.
> +			 * dwarf_loader work for this CU is done.
> +			 */
> +			free(job);
>  			break;
>  
> -		if (dwarf_cus__process_cu(dcus, cu_die, dcu->cu, dthr->data) == DWARF_CB_ABORT)
> +		default:
> +			fprintf(stderr, "Unknown dwarf_loader job type %d\n", job->type);
>  			goto out_abort;
> +		}
>  	}
>  
> -	if (dcus->conf->thread_exit &&
> -	    dcus->conf->thread_exit(dcus->conf, dthr->data) != 0)
> +	if (dcus->error)
>  		goto out_abort;
>  
>  	return (void *)DWARF_CB_OK;
> @@ -3566,29 +3736,29 @@ out_abort:
>  	return (void *)DWARF_CB_ABORT;
>  }

There is no real need to use two condition variables to achieve what is done here.
The "JOB_DECODE" item is already used as a "ticket" to do the decoding.
So it is possible to "emit" a fixed number of tickets and alternate their state
between "decode"/"steal", w/o allocating new tickets.
This would allow removing the "job_taken" condition variable and the decode counters.
E.g. as in the patch below, applied on top of this patch-set.

---

diff --git a/dwarf_loader.c b/dwarf_loader.c
index 6d22648..40ad27d 100644
--- a/dwarf_loader.c
+++ b/dwarf_loader.c
@@ -3453,23 +3453,10 @@ struct dwarf_cus {
 static struct {
 	pthread_mutex_t mutex;
 	pthread_cond_t job_added;
-	pthread_cond_t job_taken;
 	/* next_cu_id determines the next CU ready to be stealed
 	 * This enforces the order of CU stealing.
 	 */
 	uint32_t next_cu_id;
-	/* max_decoded_cus is a soft limit on the number of JOB_STEAL
-	 * jobs currently in the queue (this number is equal to the
-	 * number of decoded CUs held in memory). It's soft, because a
-	 * worker thread may finish decoding it's current CU after
-	 * this limit has already been reached. In such situation,
-	 * JOB_STEAL with this CU is still added to the queue,
-	 * although a worker will not pick up a new JOB_DECODE.
-	 * So the real hard limit is max_decoded_cus + nr_workers.
-	 * This variable indirectly limits the memory usage.
-	 */
-	uint16_t max_decoded_cus;
-	uint16_t nr_decoded_cus;
 	struct list_head jobs;
 } cus_processing_queue;
 
@@ -3489,10 +3476,7 @@ static void cus_queue__init(uint16_t max_decoded_cus)
 {
 	pthread_mutex_init(&cus_processing_queue.mutex, NULL);
 	pthread_cond_init(&cus_processing_queue.job_added, NULL);
-	pthread_cond_init(&cus_processing_queue.job_taken, NULL);
 	INIT_LIST_HEAD(&cus_processing_queue.jobs);
-	cus_processing_queue.max_decoded_cus = max_decoded_cus;
-	cus_processing_queue.nr_decoded_cus = 0;
 	cus_processing_queue.next_cu_id = 0;
 }
 
@@ -3500,7 +3484,6 @@ static void cus_queue__destroy(void)
 {
 	pthread_mutex_destroy(&cus_processing_queue.mutex);
 	pthread_cond_destroy(&cus_processing_queue.job_added);
-	pthread_cond_destroy(&cus_processing_queue.job_taken);
 }
 
 static inline void cus_queue__inc_next_cu_id(void)
@@ -3520,12 +3503,10 @@ static void cus_queue__enqueue_job(struct cu_processing_job *job)
 	/* JOB_STEAL have higher priority, add them to the head so
 	 * they can be found faster
 	 */
-	if (job->type == JOB_STEAL) {
+	if (job->type == JOB_STEAL)
 		list_add(&job->node, &cus_processing_queue.jobs);
-		cus_processing_queue.nr_decoded_cus++;
-	} else {
+	else
 		list_add_tail(&job->node, &cus_processing_queue.jobs);
-	}
 
 	pthread_cond_signal(&cus_processing_queue.job_added);
 	pthread_mutex_unlock(&cus_processing_queue.mutex);
@@ -3537,45 +3518,28 @@ static struct cu_processing_job *cus_queue__dequeue_job(void)
 	struct list_head *pos, *tmp;
 
 	pthread_mutex_lock(&cus_processing_queue.mutex);
-	while (list_empty(&cus_processing_queue.jobs))
-		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
-
-	/* First, try to find JOB_STEAL for the next CU */
+retry:
 	list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
 		job = list_entry(pos, struct cu_processing_job, node);
 		if (job->type == JOB_STEAL && job->cu->id == cus_processing_queue.next_cu_id) {
-			list_del(&job->node);
-			cus_processing_queue.nr_decoded_cus--;
 			dequeued_job = job;
 			break;
 		}
-	}
-
-	/* If no JOB_STEAL is found, check if we are allowed to decode
-	 * more CUs.  If not, it means that the CU with next_cu_id is
-	 * still being decoded while the queue is "full". Wait.
-	 * job_taken will signal that another thread was able to pick
-	 * up a JOB_STEAL, so we might be able to proceed with JOB_DECODE.
-	 */
-	if (dequeued_job == NULL) {
-		while (cus_processing_queue.nr_decoded_cus >= cus_processing_queue.max_decoded_cus)
-			pthread_cond_wait(&cus_processing_queue.job_taken, &cus_processing_queue.mutex);
-
-		/* We can decode now. */
-		list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
-			job = list_entry(pos, struct cu_processing_job, node);
-			if (job->type == JOB_DECODE) {
-				list_del(&job->node);
-				dequeued_job = job;
-				break;
-			}
+		if (job->type == JOB_DECODE) {
+			/* all JOB_STEALs are added to the head, so no viable JOB_STEAL available */
+			dequeued_job = job;
+			break;
 		}
 	}
-
-	pthread_cond_signal(&cus_processing_queue.job_taken);
+	/* No jobs or only steals out of order */
+	if (!dequeued_job) {
+		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
+		goto retry;
+	}
+	list_del(&dequeued_job->node);
 	pthread_mutex_unlock(&cus_processing_queue.mutex);
 
-	return dequeued_job;
+	return job;
 }
 
 static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size)
@@ -3700,14 +3664,8 @@ static void *dwarf_loader__worker_thread(void *arg)
 				break;
 			}
 
-			/* Create and enqueue a new JOB_STEAL for this decoded CU */
-			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
-
-			steal_job->type = JOB_STEAL;
-			steal_job->cu = cu;
-			cus_queue__enqueue_job(steal_job);
-
-			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			job->type = JOB_STEAL;
+			job->cu = cu;
 			cus_queue__enqueue_job(job);
 			break;
 
@@ -3715,11 +3673,10 @@ static void *dwarf_loader__worker_thread(void *arg)
 			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
 				goto out_abort;
 			cus_queue__inc_next_cu_id();
-			/* Free the job struct as it's no longer
-			 * needed after CU has been stolen.
-			 * dwarf_loader work for this CU is done.
-			 */
-			free(job);
+			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			job->type = JOB_DECODE;
+			job->cu = NULL;
+			cus_queue__enqueue_job(job);
 			break;
 
 		default:
@@ -3742,10 +3699,10 @@ static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
 	pthread_t workers[nr_workers];
 	struct cu_processing_job *job;
 
-	cus_queue__init(nr_workers * 4);
+	cus_queue__init(nr_workers);
 
 	/* fill up the queue with nr_workers JOB_DECODE jobs */
-	for (int i = 0; i < nr_workers; i++) {
+	for (int i = 0; i < nr_workers * 4; i++) {
 		job = calloc(1, sizeof(*job));
 		job->type = JOB_DECODE;
 		/* no need for locks, workers were not started yet */
Eduard Zingerman Dec. 17, 2024, 2:14 a.m. UTC | #2
On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote:

Also a small nit.
Aside from my previous email, I think this is a good simplification.

[...]

> @@ -3250,24 +3250,20 @@ static void cu__sort_types_by_offset(struct cu *cu, struct conf_load *conf)
>  	cu__for_all_tags(cu, type__sort_by_offset, conf);
>  }
>  
> -static int cu__finalize(struct cu *cu, struct cus *cus, struct conf_load *conf, void *thr_data)
> +static void cu__finalize(struct cu *cu, struct cus *cus, struct conf_load *conf)
>  {
>  	cu__for_all_tags(cu, class_member__cache_byte_size, conf);
>  
>  	if (cu__language_reorders_offsets(cu))
>  		cu__sort_types_by_offset(cu, conf);
> -
> -	cus__set_cu_state(cus, cu, CU__LOADED);
> -
> -	if (conf && conf->steal) {
> -		return conf->steal(cu, conf, thr_data);
> -	}
> -	return LSK__KEEPIT;
>  }
>  
> -static int cus__finalize(struct cus *cus, struct cu *cu, struct conf_load *conf, void *thr_data)
> +static int cus__steal_now(struct cus *cus, struct cu *cu, struct conf_load *conf)
>  {
> -	int lsk = cu__finalize(cu, cus, conf, thr_data);
> +	if (!conf || !conf->steal)
> +		return 0;

Nit: the function returns either 0 or an enum literal,
     but 0 is a valid literal value for that enum.
     This is a bit confusing.

> +
> +	int lsk = conf->steal(cu, conf);
>  	switch (lsk) {
>  	case LSK__DELETE:
>  		cus__remove(cus, cu);

[...]
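
For concreteness, the nit boils down to the following (the enumerator
order here is an assumption based on how the literals are used; see
dwarves.h):

	enum load_steal_kind {
		LSK__KEEPIT,	/* first enumerator, so == 0 */
		LSK__DELETE,
		LSK__STOP_LOADING,
	};

	/* "return 0" in cus__steal_now() therefore reads as
	 * "return LSK__KEEPIT" at the call site; returning the
	 * enumerator explicitly would be clearer.
	 */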
Ihor Solodrai Dec. 17, 2024, 6:12 p.m. UTC | #3
On Monday, December 16th, 2024 at 4:57 PM, Eduard Zingerman <eddyz87@gmail.com> wrote:

> 
> 
> On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote:
> 
> [...]
> 
> There is no real need to use two condition variables to achieve what is done here.
> The "JOB_DECODE" item is already used as a "ticket" to do the decoding.
> So it is possible to "emit" a fixed number of tickets and alternate their state
> between "decode"/"steal", w/o allocating new tickets.
> This would allow removing the "job_taken" condition variable and the decode counters.
> E.g. as in the patch below, applied on top of this patch-set.

Your suggestion makes sense; I hadn't thought about utilizing jobs as
"tickets". This simplifies the synchronization.

I'll incorporate this in the next version.

Thank you!

> 
> [...]
Jiri Olsa Dec. 19, 2024, 2:59 p.m. UTC | #4
On Fri, Dec 13, 2024 at 10:37:34PM +0000, Ihor Solodrai wrote:

SNIP

> +static void *dwarf_loader__worker_thread(void *arg)
> +{
> +	struct cu_processing_job *job;
> +	struct dwarf_cus *dcus = arg;
> +	bool stop = false;
> +	struct cu *cu;
> +
> +	while (!stop) {
> +		job = cus_queue__dequeue_job();
> +
> +		switch (job->type) {
> +
> +		case JOB_DECODE:
> +			cu = dwarf_loader__decode_next_cu(dcus);
> +
> +			if (cu == NULL) {
> +				free(job);
> +				stop = true;
> +				break;
> +			}
> +
> +			/* Create and enqueue a new JOB_STEAL for this decoded CU */
> +			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));

missing steal_job != NULL check

SNIP

> -static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
> +static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
>  {
> -	pthread_t threads[dcus->conf->nr_jobs];
> -	struct dwarf_thread dthr[dcus->conf->nr_jobs];
> -	void *thread_data[dcus->conf->nr_jobs];
> -	int res;
> -	int i;
> +	int nr_workers = dcus->conf->nr_jobs > 0 ? dcus->conf->nr_jobs : 1;
> +	pthread_t workers[nr_workers];
> +	struct cu_processing_job *job;
>  
> -	if (dcus->conf->threads_prepare) {
> -		res = dcus->conf->threads_prepare(dcus->conf, dcus->conf->nr_jobs, thread_data);
> -		if (res != 0)
> -			return res;
> -	} else {
> -		memset(thread_data, 0, sizeof(void *) * dcus->conf->nr_jobs);
> +	cus_queue__init(nr_workers * 4);

why '* 4' ?

> +
> +	/* fill up the queue with nr_workers JOB_DECODE jobs */
> +	for (int i = 0; i < nr_workers; i++) {
> +		job = calloc(1, sizeof(*job));

missing job != NULL check

> +		job->type = JOB_DECODE;
> +		/* no need for locks, workers were not started yet */
> +		list_add(&job->node, &cus_processing_queue.jobs);
>  	}
>  
> -	for (i = 0; i < dcus->conf->nr_jobs; ++i) {
> -		dthr[i].dcus = dcus;
> -		dthr[i].data = thread_data[i];
> +	if (dcus->error)
> +		return dcus->error;
>  
> -		dcus->error = pthread_create(&threads[i], NULL,
> -					     dwarf_cus__process_cu_thread,
> -					     &dthr[i]);
> +	for (int i = 0; i < nr_workers; ++i) {
> +		dcus->error = pthread_create(&workers[i], NULL,
> +					     dwarf_loader__worker_thread,
> +					     dcus);
>  		if (dcus->error)
>  			goto out_join;
>  	}
> @@ -3596,54 +3766,19 @@ static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
>  	dcus->error = 0;
>  
>  out_join:
> -	while (--i >= 0) {
> +	for (int i = 0; i < nr_workers; ++i) {

I think you should keep the original while loop to clean up/wait only for
the threads that were actually created

>  		void *res;
> -		int err = pthread_join(threads[i], &res);
> +		int err = pthread_join(workers[i], &res);
>  
>  		if (err == 0 && res != NULL)
>  			dcus->error = (long)res;
>  	}
>  
> -	if (dcus->conf->threads_collect) {
> -		res = dcus->conf->threads_collect(dcus->conf, dcus->conf->nr_jobs,
> -						  thread_data, dcus->error);
> -		if (dcus->error == 0)
> -			dcus->error = res;
> -	}
> +	cus_queue__destroy();
>  
>  	return dcus->error;
>  }
>  

SNIP
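
For reference, the pre-patch cleanup being suggested here joins only the
threads that were actually created (a sketch; i is the index left over
from the creation loop, so a pthread_create() failure at iteration i
joins threads 0..i-1 and never touches uninitialized pthread_t slots):

out_join:
	while (--i >= 0) {
		void *res;
		int err = pthread_join(workers[i], &res);

		if (err == 0 && res != NULL)
			dcus->error = (long)res;
	}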
Jiri Olsa Dec. 19, 2024, 2:59 p.m. UTC | #5
On Mon, Dec 16, 2024 at 04:57:33PM -0800, Eduard Zingerman wrote:

SNIP

> >  	}
> >  
> > -	if (dcus->conf->thread_exit &&
> > -	    dcus->conf->thread_exit(dcus->conf, dthr->data) != 0)
> > +	if (dcus->error)
> >  		goto out_abort;
> >  
> >  	return (void *)DWARF_CB_OK;
> > @@ -3566,29 +3736,29 @@ out_abort:
> >  	return (void *)DWARF_CB_ABORT;
> >  }
> 
> There is no real need to use two condition variables to achieve what is done here.
> The "JOB_DECODE" item is already used as a "ticket" to do the decoding.
> So it is possible to "emit" a fixed number of tickets and alternate their state
> between "decode"/"steal", w/o allocating new tickets.
> This would allow removing the "job_taken" condition variable and the decode counters.
> E.g. as in the patch below, applied on top of this patch-set.

+1, looks much easier

jirka

> 
> [...]
Ihor Solodrai Dec. 19, 2024, 7:31 p.m. UTC | #6
On Thursday, December 19th, 2024 at 6:59 AM, Jiri Olsa <olsajiri@gmail.com> wrote:

> 
> 
> On Fri, Dec 13, 2024 at 10:37:34PM +0000, Ihor Solodrai wrote:
> 
> SNIP
> 
> > +static void *dwarf_loader__worker_thread(void *arg)
> > +{
> > + struct cu_processing_job *job;
> > + struct dwarf_cus *dcus = arg;
> > + bool stop = false;
> > + struct cu *cu;
> > +
> > + while (!stop) {
> > + job = cus_queue__dequeue_job();
> > +
> > + switch (job->type) {
> > +
> > + case JOB_DECODE:
> > + cu = dwarf_loader__decode_next_cu(dcus);
> > +
> > + if (cu == NULL) {
> > + free(job);
> > + stop = true;
> > + break;
> > + }
> > +
> > + /* Create and enqueue a new JOB_STEAL for this decoded CU */
> > + struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
> 
> 
> missing steal_job != NULL check

In the next version, job objects are allocated only by the main thread
and are reused when enqueued [1].

> 
> SNIP
> 
> > -static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
> > +static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
> > {
> > - pthread_t threads[dcus->conf->nr_jobs];
> > - struct dwarf_thread dthr[dcus->conf->nr_jobs];
> > - void *thread_data[dcus->conf->nr_jobs];
> > - int res;
> > - int i;
> > + int nr_workers = dcus->conf->nr_jobs > 0 ? dcus->conf->nr_jobs : 1;
> > + pthread_t workers[nr_workers];
> > + struct cu_processing_job *job;
> > 
> > - if (dcus->conf->threads_prepare) {
> > - res = dcus->conf->threads_prepare(dcus->conf, dcus->conf->nr_jobs, thread_data);
> > - if (res != 0)
> > - return res;
> > - } else {
> > - memset(thread_data, 0, sizeof(void *) * dcus->conf->nr_jobs);
> > + cus_queue__init(nr_workers * 4);
> 
> 
> why '* 4' ?

This is an arbitrary limit, described in comments.

If we allow the workers to pick up the next CU for decoding as soon as
they are ready, then memory usage may greatly increase if the stealer
can't keep up with the incoming work.

If we want to avoid this, there needs to be a limit on how many
decoded, but not yet stolen, CUs we allow to hold in memory. When
this limit is reached, the workers wait for more CUs to get stolen.

N x 4 is a number I picked after trying various values and looking at
the resulting memory usage.

We could make it configurable, but this value doesn't look to me like a
reasonable user-facing option. Maybe we could add an "I don't care about
memory usage" flag to pahole? wdyt?
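
For concreteness (illustrative numbers only): with nr_jobs = 8 workers,
max_decoded_cus = 8 * 4 = 32, so normally at most 32 decoded CUs are held
in memory at once, and the hard ceiling is max_decoded_cus + nr_workers =
40 if every worker finishes its in-flight decode right at the limit.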

> 
> > +
> > + /* fill up the queue with nr_workers JOB_DECODE jobs */
> > + for (int i = 0; i < nr_workers; i++) {
> > + job = calloc(1, sizeof(*job));
> 
> 
> missing job != NULL check
> 
> > + job->type = JOB_DECODE;
> > + /* no need for locks, workers were not started yet */
> > + list_add(&job->node, &cus_processing_queue.jobs);
> > }
> > 
> > - for (i = 0; i < dcus->conf->nr_jobs; ++i) {
> > - dthr[i].dcus = dcus;
> > - dthr[i].data = thread_data[i];
> > + if (dcus->error)
> > + return dcus->error;
> > 
> > - dcus->error = pthread_create(&threads[i], NULL,
> > - dwarf_cus__process_cu_thread,
> > - &dthr[i]);
> > + for (int i = 0; i < nr_workers; ++i) {
> > + dcus->error = pthread_create(&workers[i], NULL,
> > + dwarf_loader__worker_thread,
> > + dcus);
> > if (dcus->error)
> > goto out_join;
> > }
> > @@ -3596,54 +3766,19 @@ static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
> > dcus->error = 0;
> > 
> > out_join:
> > - while (--i >= 0) {
> > + for (int i = 0; i < nr_workers; ++i) {
> 
> 
> I think you should keep the original while loop to clean up/wait only for
> the threads that were actually created

Do you mean in case of an error from pthread_create? Ok.

> 
> > void *res;
> > - int err = pthread_join(threads[i], &res);
> > + int err = pthread_join(workers[i], &res);
> > 
> > if (err == 0 && res != NULL)
> > dcus->error = (long)res;
> > }
> > 
> > - if (dcus->conf->threads_collect) {
> > - res = dcus->conf->threads_collect(dcus->conf, dcus->conf->nr_jobs,
> > - thread_data, dcus->error);
> > - if (dcus->error == 0)
> > - dcus->error = res;
> > - }
> > + cus_queue__destroy();
> > 
> > return dcus->error;
> > }
> 
> 
> SNIP

[1] https://github.com/acmel/dwarves/commit/5278adbe5cb796c7baafb110d8c5cda107ec9d68#diff-77fe7eedce76594a6c71363b22832723fc086a8e72debd0370763e4193704b1eR3706-R3708
Jiri Olsa Dec. 20, 2024, 9:25 a.m. UTC | #7
On Thu, Dec 19, 2024 at 07:31:35PM +0000, Ihor Solodrai wrote:

SNIP

> > 
> > 
> > why '* 4' ?
> 
> This is an arbitrary limit, described in comments.
> 
> If we allow the workers to pick up next cu for decoding as soon as
> it's ready, then the memory usage may greatly increase, if the stealer
> can't keep up with incoming work.
> 
> If we want to avoid this there needs to be a limit on how many
> decoded, but not yet stolen, CUs we allow to hold in memory. When
> this limit is reached the workers will wait for more CUs to get
> stolen.
> 
> N x 4 is a number I picked after trying various values and looking at
> the resulting memory usage.

I think we can pick some number and add the reasoning to the comment.

> 
> We could make it configurable, but this value doesn't look to me as a
> reasonable user-facing option. Maybe we could add "I don't care about
> memory usage" flag to pahole? wdyt?

--I-don-t-care-about-memory-usage sounds great :-) but I think a constant with
some comment will be enough for now, and we'll see if we need it in the future.


> 
> > 
> > > +
> > > + /* fill up the queue with nr_workers JOB_DECODE jobs */
> > > + for (int i = 0; i < nr_workers; i++) {
> > > + job = calloc(1, sizeof(*job));
> > 
> > 
> > missing job != NULL check
> > 
> > > + job->type = JOB_DECODE;
> > > + /* no need for locks, workers were not started yet */
> > > + list_add(&job->node, &cus_processing_queue.jobs);
> > > }
> > > 
> > > - for (i = 0; i < dcus->conf->nr_jobs; ++i) {
> > > - dthr[i].dcus = dcus;
> > > - dthr[i].data = thread_data[i];
> > > + if (dcus->error)
> > > + return dcus->error;
> > > 
> > > - dcus->error = pthread_create(&threads[i], NULL,
> > > - dwarf_cus__process_cu_thread,
> > > - &dthr[i]);
> > > + for (int i = 0; i < nr_workers; ++i) {
> > > + dcus->error = pthread_create(&workers[i], NULL,
> > > + dwarf_loader__worker_thread,
> > > + dcus);
> > > if (dcus->error)
> > > goto out_join;
> > > }
> > > @@ -3596,54 +3766,19 @@ static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
> > > dcus->error = 0;
> > > 
> > > out_join:
> > > - while (--i >= 0) {
> > > + for (int i = 0; i < nr_workers; ++i) {
> > 
> > 
> > I think you should keep the original while loop to clean up/wait only for
> > the threads that were actually created
> 
> Do you mean in case of an error from pthread_create? Ok.

yes, thanks

jirka
diff mbox series

Patch

diff --git a/btf_encoder.c b/btf_encoder.c
index 0b71498..20befd6 100644
--- a/btf_encoder.c
+++ b/btf_encoder.c
@@ -163,8 +163,6 @@  static struct {
 	/* The mutex only needed for add/delete, as this can happen in
 	 * multiple encoding threads.  A btf_encoder is added to this
 	 * list in btf_encoder__new(), and removed in btf_encoder__delete().
-	 * All encoders except the main one (`btf_encoder` in pahole.c)
-	 * are deleted in pahole_threads_collect().
 	 */
 	pthread_mutex_t btf_encoder_list_lock;
 	struct list_head btf_encoder_list;
@@ -1378,7 +1376,7 @@  static void btf_encoder__delete_saved_funcs(struct btf_encoder *encoder)
 	}
 }
 
-int btf_encoder__add_saved_funcs(bool skip_encoding_inconsistent_proto)
+static int btf_encoder__add_saved_funcs(bool skip_encoding_inconsistent_proto)
 {
 	struct btf_encoder_func_state **saved_fns, *s;
 	struct btf_encoder *e = NULL;
@@ -2170,12 +2168,14 @@  out:
 	return err;
 }
 
-int btf_encoder__encode(struct btf_encoder *encoder)
+int btf_encoder__encode(struct btf_encoder *encoder, struct conf_load *conf)
 {
 	bool should_tag_kfuncs;
 	int err;
 	size_t shndx;
 
+	btf_encoder__add_saved_funcs(conf->skip_encoding_btf_inconsistent_proto);
+
 	for (shndx = 1; shndx < encoder->seccnt; shndx++)
 		if (gobuffer__size(&encoder->secinfo[shndx].secinfo))
 			btf_encoder__add_datasec(encoder, shndx);
diff --git a/btf_encoder.h b/btf_encoder.h
index 421cde1..1575b61 100644
--- a/btf_encoder.h
+++ b/btf_encoder.h
@@ -29,15 +29,11 @@  void btf_encoding_context__exit(void);
 struct btf_encoder *btf_encoder__new(struct cu *cu, const char *detached_filename, struct btf *base_btf, bool verbose, struct conf_load *conf_load);
 void btf_encoder__delete(struct btf_encoder *encoder);
 
-int btf_encoder__encode(struct btf_encoder *encoder);
-
+int btf_encoder__encode(struct btf_encoder *encoder, struct conf_load *conf);
 int btf_encoder__encode_cu(struct btf_encoder *encoder, struct cu *cu, struct conf_load *conf_load);
 
 struct btf *btf_encoder__btf(struct btf_encoder *encoder);
 
-int btf_encoder__add_encoder(struct btf_encoder *encoder, struct btf_encoder *other);
-int btf_encoder__add_saved_funcs(bool skip_encoding_inconsistent_proto);
-
 int btf_encoder__pre_load_module(Dwfl_Module *mod, Elf *elf);
 
 #endif /* _BTF_ENCODER_H_ */
diff --git a/btf_loader.c b/btf_loader.c
index 4814f29..cff0011 100644
--- a/btf_loader.c
+++ b/btf_loader.c
@@ -730,7 +730,7 @@  static int cus__load_btf(struct cus *cus, struct conf_load *conf, const char *fi
 	 * The app stole this cu, possibly deleting it,
 	 * so forget about it
 	 */
-	if (conf && conf->steal && conf->steal(cu, conf, NULL))
+	if (conf && conf->steal && conf->steal(cu, conf))
 		return 0;
 
 	cus__add(cus, cu);
diff --git a/ctf_loader.c b/ctf_loader.c
index 944bf6e..501c4ab 100644
--- a/ctf_loader.c
+++ b/ctf_loader.c
@@ -728,7 +728,7 @@  int ctf__load_file(struct cus *cus, struct conf_load *conf,
 	 * The app stole this cu, possibly deleting it,
 	 * so forget about it
 	 */
-	if (conf && conf->steal && conf->steal(cu, conf, NULL))
+	if (conf && conf->steal && conf->steal(cu, conf))
 		return 0;
 
 	cus__add(cus, cu);
diff --git a/dwarf_loader.c b/dwarf_loader.c
index 58b165d..6d22648 100644
--- a/dwarf_loader.c
+++ b/dwarf_loader.c
@@ -3250,24 +3250,20 @@  static void cu__sort_types_by_offset(struct cu *cu, struct conf_load *conf)
 	cu__for_all_tags(cu, type__sort_by_offset, conf);
 }
 
-static int cu__finalize(struct cu *cu, struct cus *cus, struct conf_load *conf, void *thr_data)
+static void cu__finalize(struct cu *cu, struct cus *cus, struct conf_load *conf)
 {
 	cu__for_all_tags(cu, class_member__cache_byte_size, conf);
 
 	if (cu__language_reorders_offsets(cu))
 		cu__sort_types_by_offset(cu, conf);
-
-	cus__set_cu_state(cus, cu, CU__LOADED);
-
-	if (conf && conf->steal) {
-		return conf->steal(cu, conf, thr_data);
-	}
-	return LSK__KEEPIT;
 }
 
-static int cus__finalize(struct cus *cus, struct cu *cu, struct conf_load *conf, void *thr_data)
+static int cus__steal_now(struct cus *cus, struct cu *cu, struct conf_load *conf)
 {
-	int lsk = cu__finalize(cu, cus, conf, thr_data);
+	if (!conf || !conf->steal)
+		return 0;
+
+	int lsk = conf->steal(cu, conf);
 	switch (lsk) {
 	case LSK__DELETE:
 		cus__remove(cus, cu);
@@ -3443,11 +3439,145 @@  struct dwarf_cus {
 	uint32_t	nr_cus_created;
 };
 
-struct dwarf_thread {
-	struct dwarf_cus	*dcus;
-	void			*data;
+/* Multithreading is implemented using a job/worker model.
+ * cus_processing_queue represents a collection of jobs to be
+ * completed by workers.
+ * dwarf_loader__worker_thread is the worker loop, taking jobs from
+ * the queue and executing them.
+ * Implementation of this queue ensures two constraints:
+ *   * JOB_STEAL jobs for a CU are executed in the order of cu->id, as
+ *     a consequence JOB_STEAL jobs always run one at a time.
+ *   * Workers are limited by max_decoded_cus: a worker will not pick
+ *     up a new JOB_DECODE if this limit is exceeded. It'll wait.
+ */
+static struct {
+	pthread_mutex_t mutex;
+	pthread_cond_t job_added;
+	pthread_cond_t job_taken;
+	/* next_cu_id determines the next CU ready to be stealed
+	 * This enforces the order of CU stealing.
+	 */
+	uint32_t next_cu_id;
+	/* max_decoded_cus is a soft limit on the number of JOB_STEAL
+	 * jobs currently in the queue (this number is equal to the
+	 * number of decoded CUs held in memory). It's soft, because a
+	 * worker thread may finish decoding it's current CU after
+	 * this limit has already been reached. In such situation,
+	 * JOB_STEAL with this CU is still added to the queue,
+	 * although a worker will not pick up a new JOB_DECODE.
+	 * So the real hard limit is max_decoded_cus + nr_workers.
+	 * This variable indirectly limits the memory usage.
+	 */
+	uint16_t max_decoded_cus;
+	uint16_t nr_decoded_cus;
+	struct list_head jobs;
+} cus_processing_queue;
+
+enum job_type {
+	JOB_NONE = 0,
+	JOB_DECODE = 1,
+	JOB_STEAL = 2,
 };
 
+struct cu_processing_job {
+	struct list_head node;
+	enum job_type type;
+	struct cu *cu; /* for stealing jobs */
+};
+
+static void cus_queue__init(uint16_t max_decoded_cus)
+{
+	pthread_mutex_init(&cus_processing_queue.mutex, NULL);
+	pthread_cond_init(&cus_processing_queue.job_added, NULL);
+	pthread_cond_init(&cus_processing_queue.job_taken, NULL);
+	INIT_LIST_HEAD(&cus_processing_queue.jobs);
+	cus_processing_queue.max_decoded_cus = max_decoded_cus;
+	cus_processing_queue.nr_decoded_cus = 0;
+	cus_processing_queue.next_cu_id = 0;
+}
+
+static void cus_queue__destroy(void)
+{
+	pthread_mutex_destroy(&cus_processing_queue.mutex);
+	pthread_cond_destroy(&cus_processing_queue.job_added);
+	pthread_cond_destroy(&cus_processing_queue.job_taken);
+}
+
+static inline void cus_queue__inc_next_cu_id(void)
+{
+	pthread_mutex_lock(&cus_processing_queue.mutex);
+	cus_processing_queue.next_cu_id++;
+	pthread_mutex_unlock(&cus_processing_queue.mutex);
+}
+
+static void cus_queue__enqueue_job(struct cu_processing_job *job)
+{
+	if (job == NULL)
+		return;
+
+	pthread_mutex_lock(&cus_processing_queue.mutex);
+
+	/* JOB_STEAL have higher priority, add them to the head so
+	 * they can be found faster
+	 */
+	if (job->type == JOB_STEAL) {
+		list_add(&job->node, &cus_processing_queue.jobs);
+		cus_processing_queue.nr_decoded_cus++;
+	} else {
+		list_add_tail(&job->node, &cus_processing_queue.jobs);
+	}
+
+	pthread_cond_signal(&cus_processing_queue.job_added);
+	pthread_mutex_unlock(&cus_processing_queue.mutex);
+}
+
+static struct cu_processing_job *cus_queue__dequeue_job(void)
+{
+	struct cu_processing_job *job, *dequeued_job = NULL;
+	struct list_head *pos, *tmp;
+
+	pthread_mutex_lock(&cus_processing_queue.mutex);
+	while (list_empty(&cus_processing_queue.jobs))
+		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
+
+	/* First, try to find JOB_STEAL for the next CU */
+	list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
+		job = list_entry(pos, struct cu_processing_job, node);
+		if (job->type == JOB_STEAL && job->cu->id == cus_processing_queue.next_cu_id) {
+			list_del(&job->node);
+			cus_processing_queue.nr_decoded_cus--;
+			dequeued_job = job;
+			break;
+		}
+	}
+
+	/* If no JOB_STEAL is found, check if we are allowed to decode
+	 * more CUs.  If not, it means that the CU with next_cu_id is
+	 * still being decoded while the queue is "full". Wait.
+	 * job_taken will signal that another thread was able to pick
+	 * up a JOB_STEAL, so we might be able to proceed with JOB_DECODE.
+	 */
+	if (dequeued_job == NULL) {
+		while (cus_processing_queue.nr_decoded_cus >= cus_processing_queue.max_decoded_cus)
+			pthread_cond_wait(&cus_processing_queue.job_taken, &cus_processing_queue.mutex);
+
+		/* We can decode now. */
+		list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
+			job = list_entry(pos, struct cu_processing_job, node);
+			if (job->type == JOB_DECODE) {
+				list_del(&job->node);
+				dequeued_job = job;
+				break;
+			}
+		}
+	}
+
+	pthread_cond_signal(&cus_processing_queue.job_taken);
+	pthread_mutex_unlock(&cus_processing_queue.mutex);
+
+	return dequeued_job;
+}
+
 static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size)
 {
 	/*
@@ -3479,28 +3609,6 @@  static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *
 	return dcu;
 }
 
-static int dwarf_cus__process_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die,
-				 struct cu *cu, void *thr_data)
-{
-	if (die__process_and_recode(cu_die, cu, dcus->conf) != 0 ||
-	    cus__finalize(dcus->cus, cu, dcus->conf, thr_data) == LSK__STOP_LOADING)
-		return DWARF_CB_ABORT;
-
-       return DWARF_CB_OK;
-}
-
-static int dwarf_cus__create_and_process_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size)
-{
-	struct dwarf_cu *dcu = dwarf_cus__create_cu(dcus, cu_die, pointer_size);
-
-	if (dcu == NULL)
-		return DWARF_CB_ABORT;
-
-	cus__add(dcus->cus, dcu->cu);
-
-	return dwarf_cus__process_cu(dcus, cu_die, dcu->cu, NULL);
-}
-
 static int dwarf_cus__nextcu(struct dwarf_cus *dcus, struct dwarf_cu **dcu,
 			     Dwarf_Die *die_mem, Dwarf_Die **cu_die,
 			     uint8_t *pointer_size, uint8_t *offset_size)
@@ -3541,24 +3649,86 @@  out_unlock:
 	return ret;
 }
 
-static void *dwarf_cus__process_cu_thread(void *arg)
+static struct cu *dwarf_loader__decode_next_cu(struct dwarf_cus *dcus)
 {
-	struct dwarf_thread *dthr = arg;
-	struct dwarf_cus *dcus = dthr->dcus;
 	uint8_t pointer_size, offset_size;
+	struct dwarf_cu *dcu = NULL;
 	Dwarf_Die die_mem, *cu_die;
-	struct dwarf_cu *dcu;
+	int err;
 
-	while (dwarf_cus__nextcu(dcus, &dcu, &die_mem, &cu_die, &pointer_size, &offset_size) == 0) {
-		if (cu_die == NULL)
+	err = dwarf_cus__nextcu(dcus, &dcu, &die_mem, &cu_die, &pointer_size, &offset_size);
+
+	if (err < 0)
+		goto out_error;
+	else if (err == 1) /* no more CUs */
+		return NULL;
+
+	err = die__process_and_recode(cu_die, dcu->cu, dcus->conf);
+	if (err)
+		goto out_error;
+	if (cu_die == NULL)
+		return NULL;
+
+	cu__finalize(dcu->cu, dcus->cus, dcus->conf);
+
+	return dcu->cu;
+
+out_error:
+	dcus->error = err;
+	fprintf(stderr, "error decoding cu %s\n", dcu && dcu->cu ? dcu->cu->name : "");
+	return NULL;
+}
+
+static void *dwarf_loader__worker_thread(void *arg)
+{
+	struct cu_processing_job *job;
+	struct dwarf_cus *dcus = arg;
+	bool stop = false;
+	struct cu *cu;
+
+	while (!stop) {
+		job = cus_queue__dequeue_job();
+
+		switch (job->type) {
+
+		case JOB_DECODE:
+			cu = dwarf_loader__decode_next_cu(dcus);
+
+			if (cu == NULL) {
+				free(job);
+				stop = true;
+				break;
+			}
+
+			/* Create and enqueue a new JOB_STEAL for this decoded CU */
+			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
+
+			steal_job->type = JOB_STEAL;
+			steal_job->cu = cu;
+			cus_queue__enqueue_job(steal_job);
+
+			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			cus_queue__enqueue_job(job);
+			break;
+
+		case JOB_STEAL:
+			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
+				goto out_abort;
+			cus_queue__inc_next_cu_id();
+			/* Free the job struct as it's no longer
+			 * needed after CU has been stolen.
+			 * dwarf_loader work for this CU is done.
+			 */
+			free(job);
 			break;
 
-		if (dwarf_cus__process_cu(dcus, cu_die, dcu->cu, dthr->data) == DWARF_CB_ABORT)
+		default:
+			fprintf(stderr, "Unknown dwarf_loader job type %d\n", job->type);
 			goto out_abort;
+		}
 	}
 
-	if (dcus->conf->thread_exit &&
-	    dcus->conf->thread_exit(dcus->conf, dthr->data) != 0)
+	if (dcus->error)
 		goto out_abort;
 
 	return (void *)DWARF_CB_OK;
@@ -3566,29 +3736,29 @@  out_abort:
 	return (void *)DWARF_CB_ABORT;
 }
 
-static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
+static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
 {
-	pthread_t threads[dcus->conf->nr_jobs];
-	struct dwarf_thread dthr[dcus->conf->nr_jobs];
-	void *thread_data[dcus->conf->nr_jobs];
-	int res;
-	int i;
+	int nr_workers = dcus->conf->nr_jobs > 0 ? dcus->conf->nr_jobs : 1;
+	pthread_t workers[nr_workers];
+	struct cu_processing_job *job;
 
-	if (dcus->conf->threads_prepare) {
-		res = dcus->conf->threads_prepare(dcus->conf, dcus->conf->nr_jobs, thread_data);
-		if (res != 0)
-			return res;
-	} else {
-		memset(thread_data, 0, sizeof(void *) * dcus->conf->nr_jobs);
+	cus_queue__init(nr_workers * 4);
+
+	/* fill up the queue with nr_workers JOB_DECODE jobs */
+	for (int i = 0; i < nr_workers; i++) {
+		job = calloc(1, sizeof(*job));
+		job->type = JOB_DECODE;
+		/* no need for locks, workers were not started yet */
+		list_add(&job->node, &cus_processing_queue.jobs);
 	}
 
-	for (i = 0; i < dcus->conf->nr_jobs; ++i) {
-		dthr[i].dcus = dcus;
-		dthr[i].data = thread_data[i];
+	if (dcus->error)
+		return dcus->error;
 
-		dcus->error = pthread_create(&threads[i], NULL,
-					     dwarf_cus__process_cu_thread,
-					     &dthr[i]);
+	for (int i = 0; i < nr_workers; ++i) {
+		dcus->error = pthread_create(&workers[i], NULL,
+					     dwarf_loader__worker_thread,
+					     dcus);
 		if (dcus->error)
 			goto out_join;
 	}
@@ -3596,54 +3766,19 @@  static int dwarf_cus__threaded_process_cus(struct dwarf_cus *dcus)
 	dcus->error = 0;
 
 out_join:
-	while (--i >= 0) {
+	for (int i = 0; i < nr_workers; ++i) {
 		void *res;
-		int err = pthread_join(threads[i], &res);
+		int err = pthread_join(workers[i], &res);
 
 		if (err == 0 && res != NULL)
 			dcus->error = (long)res;
 	}
 
-	if (dcus->conf->threads_collect) {
-		res = dcus->conf->threads_collect(dcus->conf, dcus->conf->nr_jobs,
-						  thread_data, dcus->error);
-		if (dcus->error == 0)
-			dcus->error = res;
-	}
+	cus_queue__destroy();
 
 	return dcus->error;
 }
 
-static int __dwarf_cus__process_cus(struct dwarf_cus *dcus)
-{
-	uint8_t pointer_size, offset_size;
-	Dwarf_Off noff;
-	size_t cuhl;
-
-	while (dwarf_nextcu(dcus->dw, dcus->off, &noff, &cuhl, NULL, &pointer_size, &offset_size) == 0) {
-		Dwarf_Die die_mem;
-		Dwarf_Die *cu_die = dwarf_offdie(dcus->dw, dcus->off + cuhl, &die_mem);
-
-		if (cu_die == NULL)
-			break;
-
-		if (dwarf_cus__create_and_process_cu(dcus, cu_die, pointer_size) == DWARF_CB_ABORT)
-			return DWARF_CB_ABORT;
-
-		dcus->off = noff;
-	}
-
-	return 0;
-}
-
-static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
-{
-	if (dcus->conf->nr_jobs > 1)
-		return dwarf_cus__threaded_process_cus(dcus);
-
-	return __dwarf_cus__process_cus(dcus);
-}
-
 static int cus__merge_and_process_cu(struct cus *cus, struct conf_load *conf,
 				     Dwfl_Module *mod, Dwarf *dw, Elf *elf,
 				     const char *filename,
@@ -3733,7 +3868,8 @@  static int cus__merge_and_process_cu(struct cus *cus, struct conf_load *conf,
 	if (cu__resolve_func_ret_types_optimized(cu) != LSK__KEEPIT)
 		goto out_abort;
 
-	if (cus__finalize(cus, cu, conf, NULL) == LSK__STOP_LOADING)
+	cu__finalize(cu, cus, conf);
+	if (cus__steal_now(cus, cu, conf) == LSK__STOP_LOADING)
 		goto out_abort;
 
 	return 0;
@@ -3765,7 +3901,8 @@  static int cus__load_module(struct cus *cus, struct conf_load *conf,
 	}
 
 	if (type_cu != NULL) {
-		type_lsk = cu__finalize(type_cu, cus, conf, NULL);
+		cu__finalize(type_cu, cus, conf);
+		type_lsk = cus__steal_now(cus, type_cu, conf);
 		if (type_lsk == LSK__DELETE) {
 			cus__remove(cus, type_cu);
 		}
@@ -3787,6 +3924,7 @@  static int cus__load_module(struct cus *cus, struct conf_load *conf,
 			.type_dcu = type_cu ? &type_dcu : NULL,
 			.build_id = build_id,
 			.build_id_len = build_id_len,
+			.nr_cus_created = 0,
 		};
 		res = dwarf_cus__process_cus(&dcus);
 	}
diff --git a/dwarves.c b/dwarves.c
index ae512b9..7c3e878 100644
--- a/dwarves.c
+++ b/dwarves.c
@@ -530,48 +530,6 @@  void cus__unlock(struct cus *cus)
 	pthread_mutex_unlock(&cus->mutex);
 }
 
-void cus__set_cu_state(struct cus *cus, struct cu *cu, enum cu_state state)
-{
-	cus__lock(cus);
-	cu->state = state;
-	cus__unlock(cus);
-}
-
-// Used only when reproducible builds are desired
-struct cu *cus__get_next_processable_cu(struct cus *cus)
-{
-	struct cu *cu;
-
-	cus__lock(cus);
-
-	list_for_each_entry(cu, &cus->cus, node) {
-		switch (cu->state) {
-		case CU__LOADED:
-			cu->state = CU__PROCESSING;
-			goto found;
-		case CU__PROCESSING:
-			// This will happen when we get to parallel
-			// reproducible BTF encoding, libbpf dedup work needed
-			// here. The other possibility is when we're flushing
-			// the DWARF processed CUs when the parallel DWARF
-			// loading stoped and we still have CUs to encode to
-			// BTF because of ordering requirements.
-			continue;
-		case CU__UNPROCESSED:
-			// The first entry isn't loaded, signal the
-			// caller to return and try another day, as we
-			// need to respect the original DWARF CU ordering.
-			goto out;
-		}
-	}
-out:
-	cu = NULL;
-found:
-	cus__unlock(cus);
-
-	return cu;
-}
-
 bool cus__empty(const struct cus *cus)
 {
 	return list_empty(&cus->cus);
@@ -808,8 +766,6 @@  struct cu *cu__new(const char *name, uint8_t addr_size,
 		cu->addr_size = addr_size;
 		cu->extra_dbg_info = 0;
 
-		cu->state = CU__UNPROCESSED;
-
 		cu->nr_inline_expansions   = 0;
 		cu->size_inline_expansions = 0;
 		cu->nr_structures_changed  = 0;
diff --git a/dwarves.h b/dwarves.h
index 7c80b18..70b4ddf 100644
--- a/dwarves.h
+++ b/dwarves.h
@@ -44,12 +44,6 @@  enum load_steal_kind {
 	LSK__STOP_LOADING,
 };
 
-enum cu_state {
-	CU__UNPROCESSED,
-	CU__LOADED,
-	CU__PROCESSING,
-};
-
 /*
  * BTF combines all the types into one big CU using btf_dedup(), so for something
  * like a allyesconfig vmlinux kernel we can get over 65535 types.
@@ -60,7 +54,6 @@  struct btf;
 struct conf_fprintf;
 
 /** struct conf_load - load configuration
- * @thread_exit - called at the end of a thread, 1st user: BTF encoder dedup
  * @extra_dbg_info - keep original debugging format extra info
  *		     (e.g. DWARF's decl_{line,file}, id, etc)
  * @fixup_silly_bitfields - Fixup silly things such as "int foo:32;"
@@ -70,11 +63,9 @@  struct conf_fprintf;
  * @skip_missing - skip missing types rather than bailing out.
  */
 struct conf_load {
-	enum load_steal_kind	(*steal)(struct cu *cu,
-					 struct conf_load *conf,
-					 void *thr_data);
+	enum load_steal_kind	(*steal)(struct cu *cu, struct conf_load *conf);
 	struct cu *		(*early_cu_filter)(struct cu *cu);
-	int			(*thread_exit)(struct conf_load *conf, void *thr_data);
+	int			(*pre_load_module)(Dwfl_Module *mod, Elf *elf);
 	void			*cookie;
 	char			*format_path;
 	int			nr_jobs;
@@ -105,9 +96,6 @@  struct conf_load {
 	const char		*kabi_prefix;
 	struct btf		*base_btf;
 	struct conf_fprintf	*conf_fprintf;
-	int			(*threads_prepare)(struct conf_load *conf, int nr_threads, void **thr_data);
-	int			(*threads_collect)(struct conf_load *conf, int nr_threads, void **thr_data, int error);
-	int			(*pre_load_module)(Dwfl_Module *mod, Elf *elf);
 };
 
 /** struct conf_fprintf - hints to the __fprintf routines
@@ -189,10 +177,6 @@  void cus__add(struct cus *cus, struct cu *cu);
 void __cus__remove(struct cus *cus, struct cu *cu);
 void cus__remove(struct cus *cus, struct cu *cu);
 
-struct cu *cus__get_next_processable_cu(struct cus *cus);
-
-void cus__set_cu_state(struct cus *cus, struct cu *cu, enum cu_state state);
-
 void cus__print_error_msg(const char *progname, const struct cus *cus,
 			  const char *filename, const int err);
 struct cu *cus__find_pair(struct cus *cus, const char *name);
@@ -309,7 +293,6 @@  struct cu {
 	uint8_t		 nr_register_params;
 	int		 register_params[ARCH_MAX_REGISTER_PARAMS];
 	int		 functions_saved;
-	enum cu_state	 state;
 	uint16_t	 language;
 	unsigned long	 nr_inline_expansions;
 	size_t		 size_inline_expansions;
diff --git a/pahole.c b/pahole.c
index 7964a03..4148d7a 100644
--- a/pahole.c
+++ b/pahole.c
@@ -3118,6 +3118,32 @@  out:
 	return ret;
 }
 
+static enum load_steal_kind pahole_stealer__btf_encode(struct cu *cu, struct conf_load *conf_load)
+{
+	int err;
+
+	if (!btf_encoder)
+		btf_encoder = btf_encoder__new(cu,
+				       detached_btf_filename,
+				       conf_load->base_btf,
+				       global_verbose,
+				       conf_load);
+
+	if (!btf_encoder) {
+		fprintf(stderr, "Error creating BTF encoder.\n");
+		return LSK__STOP_LOADING;
+	}
+
+	err = btf_encoder__encode_cu(btf_encoder, cu, conf_load);
+	if (err < 0) {
+		fprintf(stderr, "Error while encoding BTF.\n");
+		return LSK__STOP_LOADING;
+	}
+
+	return LSK__DELETE;
+}
+
 static struct type_instance *header;
 
 static bool print_enumeration_with_enumerator(struct cu *cu, const char *name)
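
One behavioural change in the hunk above is easy to miss: on a failed
btf_encoder__encode_cu() the helper reports the error and returns
LSK__STOP_LOADING, rather than calling exit(1) the way the code removed
further down did, so shutdown is left to the loader. Seen from the callback
side, the contract is roughly this (an illustrative sketch, not the loader's
actual dispatch code):

	switch (conf->steal(cu, conf)) {
	case LSK__DELETE:	/* CU fully consumed; loader may free it */
		break;
	case LSK__KEEPIT:	/* caller keeps references into the CU */
		break;
	case LSK__STOP_LOADING:	/* error: stop handing out jobs */
		break;
	}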
@@ -3136,87 +3162,7 @@  static bool print_enumeration_with_enumerator(struct cu *cu, const char *name)
 	return false;
 }
 
-struct thread_data {
-	struct btf *btf;
-	struct btf_encoder *encoder;
-};
-
-static int pahole_threads_prepare_reproducible_build(struct conf_load *conf, int nr_threads, void **thr_data)
-{
-	for (int i = 0; i < nr_threads; i++)
-		thr_data[i] = NULL;
-
-	return 0;
-}
-
-static int pahole_threads_prepare(struct conf_load *conf, int nr_threads, void **thr_data)
-{
-	int i;
-	struct thread_data *threads = calloc(sizeof(struct thread_data), nr_threads);
-
-	for (i = 0; i < nr_threads; i++)
-		thr_data[i] = threads + i;
-
-	return 0;
-}
-
-static int pahole_thread_exit(struct conf_load *conf, void *thr_data)
-{
-	struct thread_data *thread = thr_data;
-
-	if (thread == NULL)
-		return 0;
-
-	/*
-	 * Here we will call btf__dedup() here once we extend
-	 * btf__dedup().
-	 */
-
-	return 0;
-}
-
-static int pahole_threads_collect(struct conf_load *conf, int nr_threads, void **thr_data,
-				  int error)
-{
-	struct thread_data **threads = (struct thread_data **)thr_data;
-	int i;
-	int err = 0;
-
-	if (error)
-		goto out;
-
-	err = btf_encoder__add_saved_funcs(conf_load.skip_encoding_btf_inconsistent_proto);
-	if (err < 0)
-		goto out;
-
-	for (i = 0; i < nr_threads; i++) {
-		/*
-		 * Merge content of the btf instances of worker threads to the btf
-		 * instance of the primary btf_encoder.
-                */
-		if (!threads[i]->encoder || !threads[i]->btf)
-			continue;
-		err = btf_encoder__add_encoder(btf_encoder, threads[i]->encoder);
-		if (err < 0)
-			goto out;
-	}
-	err = 0;
-
-out:
-	for (i = 0; i < nr_threads; i++) {
-		if (threads[i]->encoder && threads[i]->encoder != btf_encoder) {
-			btf_encoder__delete(threads[i]->encoder);
-			threads[i]->encoder = NULL;
-		}
-	}
-	free(threads[0]);
-
-	return err;
-}
-
-static enum load_steal_kind pahole_stealer(struct cu *cu,
-					   struct conf_load *conf_load,
-					   void *thr_data)
+static enum load_steal_kind pahole_stealer(struct cu *cu, struct conf_load *conf_load)
 {
 	int ret = LSK__DELETE;
 
@@ -3238,94 +3184,7 @@  static enum load_steal_kind pahole_stealer(struct cu *cu,
 		return LSK__DELETE; // Maybe we can find this in several CUs, so don't stop it
 
 	if (btf_encode) {
-		static pthread_mutex_t btf_lock = PTHREAD_MUTEX_INITIALIZER;
-		struct btf_encoder *encoder;
-
-		pthread_mutex_lock(&btf_lock);
-		/*
-		 * FIXME:
-		 *
-		 * This should be really done at main(), but since in the current codebase only at this
-		 * point we'll have cu->elf setup...
-		 */
-		if (!btf_encoder) {
-			/*
-			 * btf_encoder is the primary encoder.
-			 * And, it is used by the thread
-			 * create it.
-			 */
-			btf_encoder = btf_encoder__new(cu, detached_btf_filename, conf_load->base_btf,
-						       global_verbose, conf_load);
-			if (btf_encoder && thr_data) {
-				struct thread_data *thread = thr_data;
-
-				thread->encoder = btf_encoder;
-				thread->btf = btf_encoder__btf(btf_encoder);
-			}
-		}
-
-		// Reproducible builds don't have multiple btf_encoders, so we need to keep the lock until we encode BTF for this CU.
-		if (thr_data)
-			pthread_mutex_unlock(&btf_lock);
-
-		if (!btf_encoder) {
-			ret = LSK__STOP_LOADING;
-			goto out_btf;
-		}
-
-		/*
-		 * thr_data keeps per-thread data for worker threads.  Each worker thread
-		 * has an encoder.  The main thread will merge the data collected by all
-		 * these encoders to btf_encoder.  However, the first thread reaching this
-		 * function creates btf_encoder and reuses it as its local encoder.  It
-		 * avoids copying the data collected by the first thread.
-		 */
-		if (thr_data) {
-			struct thread_data *thread = thr_data;
-
-			if (thread->encoder == NULL) {
-				thread->encoder =
-					btf_encoder__new(cu, detached_btf_filename,
-							 NULL,
-							 global_verbose,
-							 conf_load);
-				thread->btf = btf_encoder__btf(thread->encoder);
-			}
-			encoder = thread->encoder;
-		} else {
-			encoder = btf_encoder;
-		}
-
-		// Since we don't have yet a way to parallelize the BTF encoding, we
-		// need to ask the loader for the next CU that we can process, one
-		// that is loaded and is in order, if the next one isn't yet loaded,
-		// then return to let the DWARF loader thread to load the next one,
-		// eventually all will get processed, even if when all DWARF loading
-		// threads finish.
-		if (conf_load->reproducible_build) {
-			ret = LSK__KEEPIT; // we're not processing the cu passed to this
-					  // function, so keep it.
-			cu = cus__get_next_processable_cu(cus);
-			if (cu == NULL)
-				goto out_btf;
-		}
-
-		ret = btf_encoder__encode_cu(encoder, cu, conf_load);
-		if (ret < 0) {
-			fprintf(stderr, "Encountered error while encoding BTF.\n");
-			exit(1);
-		}
-
-		if (conf_load->reproducible_build) {
-			ret = LSK__KEEPIT; // we're not processing the cu passed to this function, so keep it.
-			// Kinda equivalent to LSK__DELETE since we processed this, but we can't delete it
-			// as we stash references to entries in CUs for 'struct function' in btf_encoder__add_saved_funcs()
-			// and btf_encoder__save_func(), so we can't delete them here. - Alan Maguire
-		}
-out_btf:
-		if (!thr_data) // See comment about reproducibe_build above
-			pthread_mutex_unlock(&btf_lock);
-		return ret;
+		return pahole_stealer__btf_encode(cu, conf_load);
 	}
 #if 0
 	if (ctf_encode) {
@@ -3625,24 +3484,6 @@  out_free:
 	return ret;
 }
 
-static int cus__flush_reproducible_build(struct cus *cus, struct btf_encoder *encoder, struct conf_load *conf_load)
-{
-	int err = 0;
-
-	while (true) {
-		struct cu *cu = cus__get_next_processable_cu(cus);
-
-		if (cu == NULL)
-			break;
-
-		err = btf_encoder__encode_cu(encoder, cu, conf_load);
-		if (err < 0)
-			break;
-	}
-
-	return err;
-}
-
 int main(int argc, char *argv[])
 {
 	int err, remaining, rc = EXIT_FAILURE;
@@ -3731,16 +3572,6 @@  int main(int argc, char *argv[])
 	if (languages.exclude)
 		conf_load.early_cu_filter = cu__filter;
 
-	conf_load.thread_exit = pahole_thread_exit;
-
-	if (conf_load.reproducible_build) {
-		conf_load.threads_prepare = pahole_threads_prepare_reproducible_build;
-		conf_load.threads_collect = NULL;
-	} else {
-		conf_load.threads_prepare = pahole_threads_prepare;
-		conf_load.threads_collect = pahole_threads_collect;
-	}
-
 	if (btf_encode) {
 		conf_load.pre_load_module = btf_encoder__pre_load_module;
 		err = btf_encoding_context__init();
@@ -3847,16 +3678,7 @@  try_sole_arg_as_class_names:
 	header = NULL;
 
 	if (btf_encode && btf_encoder) { // maybe all CUs were filtered out and thus we don't have an encoder?
-		if (conf_load.reproducible_build &&
-		    cus__flush_reproducible_build(cus, btf_encoder, &conf_load) < 0) {
-			fprintf(stderr, "Encountered error while encoding BTF.\n");
-			exit(1);
-		}
-
-		if (conf_load.nr_jobs <= 1 || conf_load.reproducible_build)
-			btf_encoder__add_saved_funcs(conf_load.skip_encoding_btf_inconsistent_proto);
-
-		err = btf_encoder__encode(btf_encoder);
+		err = btf_encoder__encode(btf_encoder, &conf_load);
 		btf_encoder__delete(btf_encoder);
 		if (err) {
 			fputs("Failed to encode BTF\n", stderr);
diff --git a/pdwtags.c b/pdwtags.c
index 67982af..962883d 100644
--- a/pdwtags.c
+++ b/pdwtags.c
@@ -91,8 +91,7 @@  static int cu__emit_tags(struct cu *cu)
 }
 
 static enum load_steal_kind pdwtags_stealer(struct cu *cu,
-					    struct conf_load *conf_load __maybe_unused,
-					    void *thr_data __maybe_unused)
+					    struct conf_load *conf_load __maybe_unused)
 {
 	cu__emit_tags(cu);
 	return LSK__DELETE;
diff --git a/pfunct.c b/pfunct.c
index 1d74ece..55eafe8 100644
--- a/pfunct.c
+++ b/pfunct.c
@@ -510,8 +510,7 @@  int elf_symtabs__show(char *filenames[])
 }
 
 static enum load_steal_kind pfunct_stealer(struct cu *cu,
-					   struct conf_load *conf_load __maybe_unused,
-					   void *thr_data __maybe_unused)
+					   struct conf_load *conf_load __maybe_unused)
 {
 
 	if (function_name) {
diff --git a/tests/reproducible_build.sh b/tests/reproducible_build.sh
index f10f834..a940d93 100755
--- a/tests/reproducible_build.sh
+++ b/tests/reproducible_build.sh
@@ -37,10 +37,7 @@  for threads in $(seq $nr_proc) ; do
 	sleep 0.3s
 	# PID part to remove ps output headers
 	nr_threads_started=$(ps -L -C pahole | grep -v PID | wc -l)
-
-	if [ $threads -gt 1 ] ; then
-		((nr_threads_started -= 1))
-	fi
+	((nr_threads_started -= 1)) # the main thread doesn't count: it just waits to join the workers
 
 	if [ $threads != $nr_threads_started ] ; then
 		echo "ERROR: pahole asked to start $threads encoding threads, started $nr_threads_started"