Re: [PATCH dwarves v2 10/10] dwarf_loader: multithreading with a job/worker model

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Monday, December 16th, 2024 at 4:57 PM, Eduard Zingerman <eddyz87@xxxxxxxxx> wrote:

> 
> 
> On Fri, 2024-12-13 at 22:37 +0000, Ihor Solodrai wrote:
> 
> [...]
> 
> There is no real need to use two condition variables to achieve what is done here.
> The "JOB_DECODE" item is already used as a "ticket" to do the decoding.
> So it is possible to "emit" a fixed number of tickets and alternate their state
> between "decode"/"steal", without allocating new tickets.
> This would allow removing the "job_taken" condition variable and the decode counters.
> E.g. as in the patch below, applied on top of this patch-set.

Your suggestion makes sense; I hadn't thought about using jobs as
"tickets". This simplifies the synchronization.

I'll incorporate this in the next version.

Thank you!

> 
> ---
> 
> diff --git a/dwarf_loader.c b/dwarf_loader.c
> index 6d22648..40ad27d 100644
> --- a/dwarf_loader.c
> +++ b/dwarf_loader.c
> @@ -3453,23 +3453,10 @@ struct dwarf_cus {
> static struct {
> pthread_mutex_t mutex;
> pthread_cond_t job_added;
> - pthread_cond_t job_taken;
> /* next_cu_id determines the next CU ready to be stealed
> * This enforces the order of CU stealing.
> */
> uint32_t next_cu_id;
> - /* max_decoded_cus is a soft limit on the number of JOB_STEAL
> - * jobs currently in the queue (this number is equal to the
> - * number of decoded CUs held in memory). It's soft, because a
> - * worker thread may finish decoding it's current CU after
> - * this limit has already been reached. In such situation,
> - * JOB_STEAL with this CU is still added to the queue,
> - * although a worker will not pick up a new JOB_DECODE.
> - * So the real hard limit is max_decoded_cus + nr_workers.
> - * This variable indirectly limits the memory usage.
> - */
> - uint16_t max_decoded_cus;
> - uint16_t nr_decoded_cus;
> struct list_head jobs;
> } cus_processing_queue;
> 
> @@ -3489,10 +3476,7 @@ static void cus_queue__init(uint16_t max_decoded_cus)
> {
> pthread_mutex_init(&cus_processing_queue.mutex, NULL);
> pthread_cond_init(&cus_processing_queue.job_added, NULL);
> - pthread_cond_init(&cus_processing_queue.job_taken, NULL);
> INIT_LIST_HEAD(&cus_processing_queue.jobs);
> - cus_processing_queue.max_decoded_cus = max_decoded_cus;
> - cus_processing_queue.nr_decoded_cus = 0;
> cus_processing_queue.next_cu_id = 0;
> }
> 
> @@ -3500,7 +3484,6 @@ static void cus_queue__destroy(void)
> {
> pthread_mutex_destroy(&cus_processing_queue.mutex);
> pthread_cond_destroy(&cus_processing_queue.job_added);
> - pthread_cond_destroy(&cus_processing_queue.job_taken);
> }
> 
> static inline void cus_queue__inc_next_cu_id(void)
> @@ -3520,12 +3503,10 @@ static void cus_queue__enqueue_job(struct cu_processing_job *job)
> /* JOB_STEAL have higher priority, add them to the head so
> * they can be found faster
> */
> - if (job->type == JOB_STEAL) {
> 
> + if (job->type == JOB_STEAL)
> 
> list_add(&job->node, &cus_processing_queue.jobs);
> 
> - cus_processing_queue.nr_decoded_cus++;
> - } else {
> + else
> list_add_tail(&job->node, &cus_processing_queue.jobs);
> 
> - }
> 
> pthread_cond_signal(&cus_processing_queue.job_added);
> pthread_mutex_unlock(&cus_processing_queue.mutex);
> @@ -3537,45 +3518,28 @@ static struct cu_processing_job *cus_queue__dequeue_job(void)
> struct list_head *pos, *tmp;
> 
> pthread_mutex_lock(&cus_processing_queue.mutex);
> - while (list_empty(&cus_processing_queue.jobs))
> - pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
> -
> - /* First, try to find JOB_STEAL for the next CU */
> +retry:
> list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
> job = list_entry(pos, struct cu_processing_job, node);
> if (job->type == JOB_STEAL && job->cu->id == cus_processing_queue.next_cu_id) {
> 
> - list_del(&job->node);
> 
> - cus_processing_queue.nr_decoded_cus--;
> dequeued_job = job;
> break;
> }
> - }
> -
> - /* If no JOB_STEAL is found, check if we are allowed to decode
> - * more CUs. If not, it means that the CU with next_cu_id is
> - * still being decoded while the queue is "full". Wait.
> - * job_taken will signal that another thread was able to pick
> - * up a JOB_STEAL, so we might be able to proceed with JOB_DECODE.
> - */
> - if (dequeued_job == NULL) {
> - while (cus_processing_queue.nr_decoded_cus >= cus_processing_queue.max_decoded_cus)
> 
> - pthread_cond_wait(&cus_processing_queue.job_taken, &cus_processing_queue.mutex);
> -
> - /* We can decode now. */
> - list_for_each_safe(pos, tmp, &cus_processing_queue.jobs) {
> - job = list_entry(pos, struct cu_processing_job, node);
> - if (job->type == JOB_DECODE) {
> 
> - list_del(&job->node);
> 
> - dequeued_job = job;
> - break;
> - }
> + if (job->type == JOB_DECODE) {
> 
> + /* all JOB_STEALs are added to the head, so no viable JOB_STEAL available */
> + dequeued_job = job;
> + break;
> }
> }
> -
> - pthread_cond_signal(&cus_processing_queue.job_taken);
> + /* No jobs or only steals out of order */
> + if (!dequeued_job) {
> + pthread_cond_wait(&cus_processing_queue.job_added, &cus_processing_queue.mutex);
> + goto retry;
> + }
> + list_del(&dequeued_job->node);
> 
> pthread_mutex_unlock(&cus_processing_queue.mutex);
> 
> - return dequeued_job;
> + return job;
> }
> 
> static struct dwarf_cu *dwarf_cus__create_cu(struct dwarf_cus *dcus, Dwarf_Die *cu_die, uint8_t pointer_size)
> @@ -3700,14 +3664,8 @@ static void *dwarf_loader__worker_thread(void *arg)
> break;
> }
> 
> - /* Create and enqueue a new JOB_STEAL for this decoded CU */
> - struct cu_processing_job *steal_job = calloc(1, sizeof(*steal_job));
> -
> - steal_job->type = JOB_STEAL;
> 
> - steal_job->cu = cu;
> 
> - cus_queue__enqueue_job(steal_job);
> -
> - /* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
> + job->type = JOB_STEAL;
> 
> + job->cu = cu;
> 
> cus_queue__enqueue_job(job);
> break;
> 
> @@ -3715,11 +3673,10 @@ static void *dwarf_loader__worker_thread(void *arg)
> if (cus__steal_now(dcus->cus, job->cu, dcus->conf) == LSK__STOP_LOADING)
> 
> goto out_abort;
> cus_queue__inc_next_cu_id();
> - /* Free the job struct as it's no longer
> - * needed after CU has been stolen.
> - * dwarf_loader work for this CU is done.
> - */
> - free(job);
> + /* re-enqueue JOB_DECODE so that next CU is decoded from DWARF */
> + job->type = JOB_DECODE;
> 
> + job->cu = NULL;
> 
> + cus_queue__enqueue_job(job);
> break;
> 
> default:
> @@ -3742,10 +3699,10 @@ static int dwarf_cus__process_cus(struct dwarf_cus *dcus)
> pthread_t workers[nr_workers];
> struct cu_processing_job *job;
> 
> - cus_queue__init(nr_workers * 4);
> + cus_queue__init(nr_workers);
> 
> /* fill up the queue with nr_workers JOB_DECODE jobs */
> - for (int i = 0; i < nr_workers; i++) {
> + for (int i = 0; i < nr_workers * 4; i++) {
> job = calloc(1, sizeof(*job));
> job->type = JOB_DECODE;
> 
> /* no need for locks, workers were not started yet */





[Index of Archives]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux