On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote:

[...]

> +static void *dwarf_loader__worker_thread(void *arg)
> +{
> +	struct cu_processing_job *job;
> +	struct dwarf_cus *dcus = arg;
> +	bool stop = false;
> +	struct cu *cu;
> +
> +	while (!stop) {
> +		job = cus_queue__dequeue_job();
> +
> +		switch (job->type) {
> +
> +		case JOB_DECODE:
> +			cu = dwarf_loader__decode_next_cu(dcus);
> +
> +			if (cu == NULL) {
> +				free(job);
> +				stop = true;
> +				break;
> +			}
> +
> +			/* Create and enqueue a new JOB_STEAL for this decoded CU */
> +			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
> +
> +			steal_job->type = JOB_STEAL;
> +			steal_job->cu = cu;
> +			cus_queue__enqueue_job(steal_job);
> +
> +			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
> +			cus_queue__enqueue_job(job);
> +			break;
> +
> +		case JOB_STEAL:
> +			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
> +				goto out_abort;
> +			cus_queue__inc_next_cu_id();
> +			/* Free the job struct as it's no longer
> +			 * needed after CU has been stolen.
> +			 * dwarf_loader work for this CU is done.
> +			 */
> +			free(job);
>  			break;
> 
> -		if (dwarf_cus__process_cu(dcus, cu_die, dcu->cu, dthr->data) == DWARF_CB_ABORT)
> +		default:
> +			fprintf(stderr, "Unknown dwarf_loader job type %d\n", job->type);
>  			goto out_abort;
> +		}
>  	}
> 
> -	if (dcus->conf->thread_exit &&
> -	    dcus->conf->thread_exit(dcus->conf, dthr->data) != 0)
> +	if (dcus->error)
>  		goto out_abort;
> 
>  	return (void *)DWARF_CB_OK;
> @@ -3566,29 +3736,29 @@ out_abort:
>  	return (void *)DWARF_CB_ABORT;
>  }

There is no real need for two condition variables to achieve what is
done here. The JOB_DECODE item is already used as a "ticket" to do the
decoding. So it is possible to "emit" a fixed number of tickets and
alternate each ticket's state between "decode" and "steal", without
allocating new tickets. This would allow removing the "job_taken"
condition variable and the decode counters, e.g. as in the patch below,
applied on top of this patch-set. (A standalone sketch of the same
ticket pattern follows after the diff, at the end of this message.)

---

diff --git a/dwarf_loader.c b/dwarf_loader.c
index 6d22648..40ad27d 100644
--- a/dwarf_loader.c
+++ b/dwarf_loader.c
@@ -3453,23 +3453,10 @@ struct dwarf_cus {
 static struct {
 	pthread_mutex_t mutex;
 	pthread_cond_t job_added;
-	pthread_cond_t job_taken;
 	/* next_cu_id determines the next CU ready to be stealed
 	 * This enforces the order of CU stealing.
 	 */
 	uint32_t next_cu_id;
-	/* max_decoded_cus is a soft limit on the number of JOB_STEAL
-	 * jobs currently in the queue (this number is equal to the
-	 * number of decoded CUs held in memory). It's soft, because a
-	 * worker thread may finish decoding it's current CU after
-	 * this limit has already been reached. In such situation,
-	 * JOB_STEAL with this CU is still added to the queue,
-	 * although a worker will not pick up a new JOB_DECODE.
-	 * So the real hard limit is max_decoded_cus + nr_workers.
-	 * This variable indirectly limits the memory usage.
-	 */
-	uint16_t max_decoded_cus;
-	uint16_t nr_decoded_cus;
 	struct list_head jobs;
 } cus_processing_queue;

@@ -3489,10 +3476,7 @@ static void cus_queue__init(uint16_t max_decoded_cus)
 {
 	pthread_mutex_init(&cus_processing_queue.mutex, NULL);
 	pthread_cond_init(&cus_processing_queue.job_added, NULL);
-	pthread_cond_init(&cus_processing_queue.job_taken, NULL);
 	INIT_LIST_HEAD(&cus_processing_queue.jobs);
-	cus_processing_queue.max_decoded_cus = max_decoded_cus;
-	cus_processing_queue.nr_decoded_cus = 0;
 	cus_processing_queue.next_cu_id = 0;
 }

@@ -3500,7 +3484,6 @@ static void cus_queue__destroy(void)
 {
 	pthread_mutex_destroy(&cus_processing_queue.mutex);
 	pthread_cond_destroy(&cus_processing_queue.job_added);
-	pthread_cond_destroy(&cus_processing_queue.job_taken);
 }

 static inline void cus_queue__inc_next_cu_id(void)
@@ -3520,12 +3503,10 @@ static void cus_queue__enqueue_job(struct cu_processing_job *job)
 	/* JOB_STEAL have higher priority, add them to the head so
 	 * they can be found faster
 	 */
-	if (job->type == JOB_STEAL) {
+	if (job->type == JOB_STEAL)
 		list_add(&job->node, &cus_processing_queue.jobs);
-		cus_processing_queue.nr_decoded_cus++;
-	} else {
+	else
 		list_add_tail(&job->node, &cus_processing_queue.jobs);
-	}

 	pthread_cond_signal(&cus_processing_queue.job_added);
 	pthread_mutex_unlock(&cus_processing_queue.mutex);
@@ -3537,45 +3518,28 @@ static struct cu_processing_job *cus_queue__dequeue_job(void)
 	struct list_head *pos, *tmp;

 	pthread_mutex_lock(&cus_processing_queue.mutex);
-	while (list_empty(&cus_processing_queue.jobs))
-		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
-
-	/* First, try to find JOB_STEAL for the next CU */
+retry:
 	list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
 		job = list_entry(pos, struct cu_processing_job, node);
 		if (job->type == JOB_STEAL && job->cu->id == cus_processing_queue.next_cu_id) {
-			list_del(&job->node);
-			cus_processing_queue.nr_decoded_cus--;
 			dequeued_job = job;
 			break;
 		}
-	}
-
-	/* If no JOB_STEAL is found, check if we are allowed to decode
-	 * more CUs. If not, it means that the CU with next_cu_id is
-	 * still being decoded while the queue is "full". Wait.
-	 * job_taken will signal that another thread was able to pick
-	 * up a JOB_STEAL, so we might be able to proceed with JOB_DECODE.
-	 */
-	if (dequeued_job == NULL) {
-		while (cus_processing_queue.nr_decoded_cus >= cus_processing_queue.max_decoded_cus)
-			pthread_cond_wait(&cus_processing_queue.job_taken, &cus_processing_queue.mutex);
-
-		/* We can decode now. */
-		list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
-			job = list_entry(pos, struct cu_processing_job, node);
-			if (job->type == JOB_DECODE) {
-				list_del(&job->node);
-				dequeued_job = job;
-				break;
-			}
+		if (job->type == JOB_DECODE) {
+			/* all JOB_STEALs are added to the head, so no viable JOB_STEAL available */
+			dequeued_job = job;
+			break;
 		}
 	}
-
-	pthread_cond_signal(&cus_processing_queue.job_taken);
+	/* No jobs or only steals out of order */
+	if (!dequeued_job) {
+		pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
+		goto retry;
+	}
+	list_del(&dequeued_job->node);

 	pthread_mutex_unlock(&cus_processing_queue.mutex);

-	return dequeued_job;
+	return job;
 }

 static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size)
@@ -3700,14 +3664,8 @@ static void *dwarf_loader__worker_thread(void *arg)
 				break;
 			}

-			/* Create and enqueue a new JOB_STEAL for this decoded CU */
-			struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
-
-			steal_job->type = JOB_STEAL;
-			steal_job->cu = cu;
-			cus_queue__enqueue_job(steal_job);
-
-			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			job->type = JOB_STEAL;
+			job->cu = cu;
 			cus_queue__enqueue_job(job);
 			break;

@@ -3715,11 +3673,10 @@ static void *dwarf_loader__worker_thread(void *arg)
 			if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
 				goto out_abort;
 			cus_queue__inc_next_cu_id();
-			/* Free the job struct as it's no longer
-			 * needed after CU has been stolen.
-			 * dwarf_loader work for this CU is done.
-			 */
-			free(job);
+			/* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
+			job->type = JOB_DECODE;
+			job->cu = NULL;
+			cus_queue__enqueue_job(job);
 			break;

 		default:
@@ -3742,10 +3699,10 @@ static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
 	pthread_t workers[nr_workers];
 	struct cu_processing_job *job;

-	cus_queue__init(nr_workers * 4);
+	cus_queue__init(nr_workers);

 	/* fill up the queue with nr_workers JOB_DECODE jobs */
-	for (int i = 0; i < nr_workers; i++) {
+	for (int i = 0; i < nr_workers * 4; i++) {
 		job = calloc(1, sizeof(*job));
 		job->type = JOB_DECODE;
 		/* no need for locks, workers were not started yet */
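
To see the ticket pattern in isolation, here is a minimal compilable
sketch (build with `cc -pthread`). This is not pahole code: all names
are hypothetical and shutdown handling is simplified. A fixed pool of
tickets alternates between "decode" and "steal" state, steals go to the
head of the list, and the pool size alone bounds how many decoded CUs
exist at a time, which is what max_decoded_cus/nr_decoded_cus tracked
explicitly:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NR_CUS 20	/* size of the fake "DWARF stream" */

enum ticket_type { TICKET_DECODE, TICKET_STEAL };
struct ticket { enum ticket_type type; int id; struct ticket *next; };

static struct {
	pthread_mutex_t mutex;
	pthread_cond_t added;	/* the single condition variable */
	struct ticket *head;	/* steals at the head, decodes at the tail */
	int next_id;		/* only this CU id may be "stolen" */
	int decoded;		/* ids handed out by the fake decoder */
} q = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, 0, 0 };

static void ticket__enqueue(struct ticket *t)
{
	pthread_mutex_lock(&q.mutex);
	if (t->type == TICKET_STEAL) {
		t->next = q.head;
		q.head = t;
	} else {
		struct ticket **pp = &q.head;
		while (*pp)
			pp = &(*pp)->next;
		t->next = NULL;
		*pp = t;
	}
	pthread_cond_signal(&q.added);
	pthread_mutex_unlock(&q.mutex);
}

static struct ticket *ticket__dequeue(void)	/* NULL: all CUs stolen */
{
	struct ticket *t, **pp;

	pthread_mutex_lock(&q.mutex);
retry:
	if (q.next_id >= NR_CUS) {
		pthread_mutex_unlock(&q.mutex);
		return NULL;
	}
	for (pp = &q.head; (t = *pp); pp = &t->next) {
		/* in-order steal, or any decode ticket: steals sit at the
		 * head, so reaching a decode means no steal is viable */
		if (t->type == TICKET_DECODE ||
		    (t->type == TICKET_STEAL && t->id == q.next_id)) {
			*pp = t->next;
			pthread_mutex_unlock(&q.mutex);
			return t;
		}
	}
	pthread_cond_wait(&q.added, &q.mutex);	/* only out-of-order steals */
	goto retry;
}

static void *worker(void *arg)
{
	struct ticket *t;

	while ((t = ticket__dequeue()) != NULL) {
		if (t->type == TICKET_DECODE) {
			pthread_mutex_lock(&q.mutex);
			int id = q.decoded < NR_CUS ? q.decoded++ : -1;
			pthread_mutex_unlock(&q.mutex);
			if (id < 0) {	/* stream exhausted: retire the ticket */
				free(t);
				continue;
			}
			t->id = id;	/* the expensive decoding would go here */
			t->type = TICKET_STEAL;	/* flip the ticket's state */
			ticket__enqueue(t);
		} else {
			printf("stole cu %d\n", t->id);	/* always in id order */
			t->type = TICKET_DECODE;	/* recycle the same ticket */
			ticket__enqueue(t);
			pthread_mutex_lock(&q.mutex);
			q.next_id++;	/* the next steal may be viable now */
			pthread_cond_broadcast(&q.added);
			pthread_mutex_unlock(&q.mutex);
		}
	}
	return arg;
}

int main(void)
{
	enum { NR_WORKERS = 4 };
	pthread_t tid[NR_WORKERS];

	/* fixed ticket pool: this alone caps decoded-but-not-stolen CUs */
	for (int i = 0; i < NR_WORKERS * 2; i++) {
		struct ticket *t = calloc(1, sizeof(*t));
		t->type = TICKET_DECODE;
		ticket__enqueue(t);
	}
	for (int i = 0; i < NR_WORKERS; i++)
		pthread_create(&tid[i], NULL, worker, NULL);
	for (int i = 0; i < NR_WORKERS; i++)
		pthread_join(tid[i], NULL);
	return 0;	/* tickets left in the queue leak; fine for a sketch */
}

With NR_WORKERS * 2 tickets, at most that many CUs can sit in the
decoded-but-not-stolen state at any moment, so the memory bound falls
out of the ticket count for free instead of being enforced by a counter
and a second condition variable.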