On Monday, December 16th, 2024 at 4:57 PM, Eduard Zingerman <eddyz87@xxxxxxxxx> wrote: > > > On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote: > > [...] > > There is no real need to use two conditional variables to achieve what is done here. > The "JOB_DECODE" item is already used as a "ticket" to do the decoding. > So it is possible to "emit" a fixed amount of tickets and alternate their state > between "decode"/"steal", w/o allocating new tickets. > This would allow to remove "job_taken" conditional variable and decode counters. > E.g. as in the patch below applied on top of this patch-set. Your suggestion makes sense, I haven't thought about utilizing jobs as "tickets". This simplifies synchronization. I'll incorporate this in the next version. Thank you! > > --- > > diff --git a/dwarf_loader.c b/dwarf_loader.c > index 6d22648..40ad27d 100644 > --- a/dwarf_loader.c > +++ b/dwarf_loader.c > @@ -3453,23 +3453,10 @@ struct dwarf_cus { > static struct { > pthread_mutex_t mutex; > pthread_cond_t job_added; > - pthread_cond_t job_taken; > /* next_cu_id determines the next CU ready to be stealed > * This enforces the order of CU stealing. > / > uint32_t next_cu_id; > - / max_decoded_cus is a soft limit on the number of JOB_STEAL > - * jobs currently in the queue (this number is equal to the > - * number of decoded CUs held in memory). It's soft, because a > - * worker thread may finish decoding it's current CU after > - * this limit has already been reached. In such situation, > - * JOB_STEAL with this CU is still added to the queue, > - * although a worker will not pick up a new JOB_DECODE. > - * So the real hard limit is max_decoded_cus + nr_workers. > - * This variable indirectly limits the memory usage. > - */ > - uint16_t max_decoded_cus; > - uint16_t nr_decoded_cus; > struct list_head jobs; > } cus_processing_queue; > > @@ -3489,10 +3476,7 @@ static void cus_queue__init(uint16_t max_decoded_cus) > { > pthread_mutex_init(&cus_processing_queue.mutex, NULL); > pthread_cond_init(&cus_processing_queue.job_added, NULL); > - pthread_cond_init(&cus_processing_queue.job_taken, NULL); > INIT_LIST_HEAD(&cus_processing_queue.jobs); > - cus_processing_queue.max_decoded_cus = max_decoded_cus; > - cus_processing_queue.nr_decoded_cus = 0; > cus_processing_queue.next_cu_id = 0; > } > > @@ -3500,7 +3484,6 @@ static void cus_queue__destroy(void) > { > pthread_mutex_destroy(&cus_processing_queue.mutex); > pthread_cond_destroy(&cus_processing_queue.job_added); > - pthread_cond_destroy(&cus_processing_queue.job_taken); > } > > static inline void cus_queue__inc_next_cu_id(void) > @@ -3520,12 +3503,10 @@ static void cus_queue__enqueue_job(struct cu_processing_job job) > / JOB_STEAL have higher priority, add them to the head so > * they can be found faster > */ > - if (job->type == JOB_STEAL) { > > + if (job->type == JOB_STEAL) > > list_add(&job->node, &cus_processing_queue.jobs); > > - cus_processing_queue.nr_decoded_cus++; > - } else { > + else > list_add_tail(&job->node, &cus_processing_queue.jobs); > > - } > > pthread_cond_signal(&cus_processing_queue.job_added); > pthread_mutex_unlock(&cus_processing_queue.mutex); > @@ -3537,45 +3518,28 @@ static struct cu_processing_job *cus_queue__dequeue_job(void) > struct list_head *pos, tmp; > > pthread_mutex_lock(&cus_processing_queue.mutex); > - while (list_empty(&cus_processing_queue.jobs)) > - pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex); > - > - / First, try to find JOB_STEAL for the next CU */ > +retry: > list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) { > job = list_entry(pos, struct cu_processing_job, node); > if (job->type == JOB_STEAL && job->cu->id == cus_processing_queue.next_cu_id) { > > - list_del(&job->node); > > - cus_processing_queue.nr_decoded_cus--; > dequeued_job = job; > break; > } > - } > - > - /* If no JOB_STEAL is found, check if we are allowed to decode > - * more CUs. If not, it means that the CU with next_cu_id is > - * still being decoded while the queue is "full". Wait. > - * job_taken will signal that another thread was able to pick > - * up a JOB_STEAL, so we might be able to proceed with JOB_DECODE. > - */ > - if (dequeued_job == NULL) { > - while (cus_processing_queue.nr_decoded_cus >= cus_processing_queue.max_decoded_cus) > > - pthread_cond_wait(&cus_processing_queue.job_taken, &cus_processing_queue.mutex); > - > - /* We can decode now. */ > - list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) { > - job = list_entry(pos, struct cu_processing_job, node); > - if (job->type == JOB_DECODE) { > > - list_del(&job->node); > > - dequeued_job = job; > - break; > - } > + if (job->type == JOB_DECODE) { > > + /* all JOB_STEALs are added to the head, so no viable JOB_STEAL available / > + dequeued_job = job; > + break; > } > } > - > - pthread_cond_signal(&cus_processing_queue.job_taken); > + / No jobs or only steals out of order */ > + if (!dequeued_job) { > + pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex); > + goto retry; > + } > + list_del(&dequeued_job->node); > > pthread_mutex_unlock(&cus_processing_queue.mutex); > > - return dequeued_job; > + return job; > } > > static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size) > @@ -3700,14 +3664,8 @@ static void *dwarf_loader__worker_thread(void arg) > break; > } > > - / Create and enqueue a new JOB_STEAL for this decoded CU */ > - struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job)); > - > - steal_job->type = JOB_STEAL; > > - steal_job->cu = cu; > > - cus_queue__enqueue_job(steal_job); > - > - /* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */ > + job->type = JOB_STEAL; > > + job->cu = cu; > > cus_queue__enqueue_job(job); > break; > > @@ -3715,11 +3673,10 @@ static void *dwarf_loader__worker_thread(void *arg) > if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING) > > goto out_abort; > cus_queue__inc_next_cu_id(); > - /* Free the job struct as it's no longer > - * needed after CU has been stolen. > - * dwarf_loader work for this CU is done. > - / > - free(job); > + / re-enqueue JOB_DECODE so that next CU is decoded from DWARF */ > + job->type = JOB_DECODE; > > + job->cu = NULL; > > + cus_queue__enqueue_job(job); > break; > > default: > @@ -3742,10 +3699,10 @@ static int dwarf_cus__process_cus(struct dwarf_cus *dcus) > pthread_t workers[nr_workers]; > struct cu_processing_job job; > > - cus_queue__init(nr_workers * 4); > + cus_queue__init(nr_workers); > > / fill up the queue with nr_workers JOB_DECODE jobs */ > - for (int i = 0; i < nr_workers; i++) { > + for (int i = 0; i < nr_workers * 4; i++) { > job = calloc(1, sizeof(*job)); > job->type = JOB_DECODE; > > /* no need for locks, workers were not started yet */