Re: [PATCH dwarves v2 10/10] dwarf_loader: multithreading with a job/worker model

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote:

[...]

> +static void *dwarf_loader__worker_thread(void *arg)
> +{
> +	struct cu_processing_job *job;
> +	struct dwarf_cus *dcus = arg;
> +	bool stop = false;
> +	struct cu *cu;
> +
> +	while (!stop) {
> +		job = cus_queue__dequeue_job();
> +
> +		switch (job->type) {
> +
> +		case JOB_DECODE:
> +			cu = dwarf_loader__decode_next_cu(dcus);
> +
> +			if (cu == NULL) {
> +				free(job);
> +				stop = true;
> +				break;
> +			}
> +
> +			/* Create and enqueue a new JOB_STEAL for this decoded CU */
> +			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
> +
> +			steal_job->type = JOB_STEAL;
> +			steal_job->cu = cu;
> +			cus_queue__enqueue_job(steal_job);
> +
> +			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
> +			cus_queue__enqueue_job(job);
> +			break;
> +
> +		case JOB_STEAL:
> +			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
> +				goto out_abort;
> +			cus_queue__inc_next_cu_id();
> +			/* Free the job struct as it's no longer
> +			 * needed after CU has been stolen.
> +			 * dwarf_loader work for this CU is done.
> +			 */
> +			free(job);
>  			break;
>  
> -		if (dwarf_cus__process_cu(dcus, cu_die, dcu->cu, dthr->data) == DWARF_CB_ABORT)
> +		default:
> +			fprintf(stderr, "Unknown dwarf_loader job type %d\n", job->type);
>  			goto out_abort;
> +		}
>  	}
>  
> -	if (dcus->conf->thread_exit &&
> -	    dcus->conf->thread_exit(dcus->conf, dthr->data) != 0)
> +	if (dcus->error)
>  		goto out_abort;
>  
>  	return (void *)DWARF_CB_OK;
> @@ -3566,29 +3736,29 @@ out_abort:
>  	return (void *)DWARF_CB_ABORT;
>  }

There is no real need to use two condition variables to achieve what is done here.
The "JOB_DECODE" item is already used as a "ticket" to do the decoding.
So it is possible to "emit" a fixed number of tickets and alternate their state
between "decode" and "steal", without allocating new tickets.
This would allow removing the "job_taken" condition variable and the decode counters.
E.g., as in the patch below, applied on top of this patch set.

---

diff --git a/dwarf_loader.c b/dwarf_loader.c
index 6d22648..40ad27d 100644
--- a/dwarf_loader.c
+++ b/dwarf_loader.c
@@ -3453,23 +3453,10 @@ struct dwarf_cus {
 static struct {
 	pthread_mutex_t mutex;
 	pthread_cond_t job_added;
-	pthread_cond_t job_taken;
 	/* next_cu_id determines the next CU ready to be stealed
 	 * This enforces the order of CU stealing.
 	 */
 	uint32_t next_cu_id;
-	/* max_decoded_cus is a soft limit on the number of JOB_STEAL
-	 * jobs currently in the queue (this number is equal to the
-	 * number of decoded CUs held in memory). It's soft, because a
-	 * worker thread may finish decoding it's current CU after
-	 * this limit has already been reached. In such situation,
-	 * JOB_STEAL with this CU is still added to the queue,
-	 * although a worker will not pick up a new JOB_DECODE.
-	 * So the real hard limit is max_decoded_cus + nr_workers.
-	 * This variable indirectly limits the memory usage.
-	 */
-	uint16_t max_decoded_cus;
-	uint16_t nr_decoded_cus;
 	struct list_head jobs;
 } cus_processing_queue;
 
@@ -3489,10 +3476,7 @@ static void cus_queue__init(uint16_t max_decoded_cus)
 {
 	pthread_mutex_init(&cus_processing_queue.mutex, NULL);
 	pthread_cond_init(&cus_processing_queue.job_added, NULL);
-	pthread_cond_init(&cus_processing_queue.job_taken, NULL);
 	INIT_LIST_HEAD(&cus_processing_queue.jobs);
-	cus_processing_queue.max_decoded_cus = max_decoded_cus;
-	cus_processing_queue.nr_decoded_cus = 0;
 	cus_processing_queue.next_cu_id = 0;
 }
 
@@ -3500,7 +3484,6 @@ static void cus_queue__destroy(void)
 {
 	pthread_mutex_destroy(&cus_processing_queue.mutex);
 	pthread_cond_destroy(&cus_processing_queue.job_added);
-	pthread_cond_destroy(&cus_processing_queue.job_taken);
 }
 
 static inline void cus_queue__inc_next_cu_id(void)
@@ -3520,12 +3503,10 @@ static void cus_queue__enqueue_job(struct cu_processing_job *job)
 	/* JOB_STEAL have higher priority, add them to the head so
 	 * they can be found faster
 	 */
-	if (job->type == JOB_STEAL) {
+	if (job->type == JOB_STEAL)
 		list_add(&job->node, &cus_processing_queue.jobs);
-		cus_processing_queue.nr_decoded_cus++;
-	} else {
+	else
 		list_add_tail(&job->node, &cus_processing_queue.jobs);
-	}
 
 	pthread_cond_signal(&cus_processing_queue.job_added);
 	pthread_mutex_unlock(&cus_processing_queue.mutex);
@@ -3537,45 +3518,28 @@ static struct cu_processing_job *cus_queue__dequeue_job(void)
 	struct list_head *pos, *tmp;
 
 	pthread_mutex_lock(&cus_processing_queue.mutex);
-	while (list_empty(&cus_processing_queue.jobs))
-		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
-
-	/* First, try to find JOB_STEAL for the next CU */
+retry:
 	list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
 		job = list_entry(pos, struct cu_processing_job, node);
 		if (job->type == JOB_STEAL && job->cu->id == cus_processing_queue.next_cu_id) {
-			list_del(&job->node);
-			cus_processing_queue.nr_decoded_cus--;
 			dequeued_job = job;
 			break;
 		}
-	}
-
-	/* If no JOB_STEAL is found, check if we are allowed to decode
-	 * more CUs.  If not, it means that the CU with next_cu_id is
-	 * still being decoded while the queue is "full". Wait.
-	 * job_taken will signal that another thread was able to pick
-	 * up a JOB_STEAL, so we might be able to proceed with JOB_DECODE.
-	 */
-	if (dequeued_job == NULL) {
-		while (cus_processing_queue.nr_decoded_cus >= cus_processing_queue.max_decoded_cus)
-			pthread_cond_wait(&cus_processing_queue.job_taken, &cus_processing_queue.mutex);
-
-		/* We can decode now. */
-		list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
-			job = list_entry(pos, struct cu_processing_job, node);
-			if (job->type == JOB_DECODE) {
-				list_del(&job->node);
-				dequeued_job = job;
-				break;
-			}
+		if (job->type == JOB_DECODE) {
+			/* all JOB_STEALs are added to the head, so no viable JOB_STEAL available */
+			dequeued_job = job;
+			break;
 		}
 	}
-
-	pthread_cond_signal(&cus_processing_queue.job_taken);
+	/* No jobs or only steals out of order */
+	if (!dequeued_job) {
+		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
+		goto retry;
+	}
+	list_del(&dequeued_job->node);
 	pthread_mutex_unlock(&cus_processing_queue.mutex);
 
-	return dequeued_job;
+	return job;
 }
 
 static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size)
@@ -3700,14 +3664,8 @@ static void *dwarf_loader__worker_thread(void *arg)
 				break;
 			}
 
-			/* Create and enqueue a new JOB_STEAL for this decoded CU */
-			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
-
-			steal_job->type = JOB_STEAL;
-			steal_job->cu = cu;
-			cus_queue__enqueue_job(steal_job);
-
-			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			job->type = JOB_STEAL;
+			job->cu = cu;
 			cus_queue__enqueue_job(job);
 			break;
 
@@ -3715,11 +3673,10 @@ static void *dwarf_loader__worker_thread(void *arg)
 			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
 				goto out_abort;
 			cus_queue__inc_next_cu_id();
-			/* Free the job struct as it's no longer
-			 * needed after CU has been stolen.
-			 * dwarf_loader work for this CU is done.
-			 */
-			free(job);
+			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			job->type = JOB_DECODE;
+			job->cu = NULL;
+			cus_queue__enqueue_job(job);
 			break;
 
 		default:
@@ -3742,10 +3699,10 @@ static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
 	pthread_t workers[nr_workers];
 	struct cu_processing_job *job;
 
-	cus_queue__init(nr_workers * 4);
+	cus_queue__init(nr_workers);
 
 	/* fill up the queue with nr_workers JOB_DECODE jobs */
-	for (int i = 0; i < nr_workers; i++) {
+	for (int i = 0; i < nr_workers * 4; i++) {
 		job = calloc(1, sizeof(*job));
 		job->type = JOB_DECODE;
 		/* no need for locks, workers were not started yet */






[Index of Archives]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux