Re: [RFC PATCH 2/3] run-commands: add an async queue processor

Stefan Beller <sbeller@xxxxxxxxxx> writes:

> diff --git a/builtin/index-pack.c b/builtin/index-pack.c
> index 3f10840..159ee36 100644
> --- a/builtin/index-pack.c
> +++ b/builtin/index-pack.c
> @@ -11,6 +11,7 @@
>  #include "exec_cmd.h"
>  #include "streaming.h"
>  #include "thread-utils.h"
> +#include "run-command.h"
>
>  static const char index_pack_usage[] =
>  "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
> @@ -1075,7 +1076,7 @@ static void resolve_base(struct object_entry *obj)
>  }
>
>  #ifndef NO_PTHREADS
> -static void *threaded_second_pass(void *data)
> +static int threaded_second_pass(struct task_queue *tq, void *data)
>  {
>         set_thread_data(data);
>         for (;;) {
> @@ -1096,7 +1097,7 @@ static void *threaded_second_pass(void *data)
>
>                 resolve_base(&objects[i]);
>         }
> -       return NULL;
> +       return 0;
>  }
>  #endif
>
> @@ -1195,18 +1196,18 @@ static void resolve_deltas(void)
>                                           nr_ref_deltas + nr_ofs_deltas);
>
>  #ifndef NO_PTHREADS
> -       nr_dispatched = 0;
>         if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
> +               nr_dispatched = 0;
>                 init_thread();
> -               for (i = 0; i < nr_threads; i++) {
> -                       int ret = pthread_create(&thread_data[i].thread, NULL,
> -                                                threaded_second_pass, thread_data + i);
> -                       if (ret)
> -                               die(_("unable to create thread: %s"),
> -                                   strerror(ret));
> -               }
> +
> +               tq = create_task_queue(nr_threads);
> +
>                 for (i = 0; i < nr_threads; i++)
> -                       pthread_join(thread_data[i].thread, NULL);
> +                       add_task(tq, threaded_second_pass, thread_data + i);
> +
> +               if (finish_task_queue(tq))
> +                       die("Not all threads have finished");
> +
>                 cleanup_thread();
>                 return;
>         }

This looks quite straightforward, which is not too surprising, as
the "dispatcher" side naturally needs similar logic to manage
threads by creating and joining them ;-)
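
For readers who do not have the rest of the series at hand, here is a
minimal sketch of how such a queue could be built on plain pthreads.
Only the three entry points (create_task_queue, add_task,
finish_task_queue) and the callback signature are taken from the hunks
quoted above; the struct layout, the worker loop, the "failed" counter
and the square_task demo callback are assumptions made up for
illustration, not what the patch actually does.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

struct task_queue;
typedef int (*task_fn)(struct task_queue *tq, void *data);

struct task {
	task_fn fn;
	void *data;
	struct task *next;
};

/* Hypothetical layout; the real struct in the series may differ. */
struct task_queue {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	struct task *head, *tail;
	int closed;	/* set once no more tasks will be added */
	int failed;	/* number of tasks that returned non-zero */
	int nr_threads;
	pthread_t *threads;
};

static void *worker(void *arg)
{
	struct task_queue *tq = arg;
	for (;;) {
		struct task *t;
		pthread_mutex_lock(&tq->lock);
		while (!tq->head && !tq->closed)
			pthread_cond_wait(&tq->cond, &tq->lock);
		t = tq->head;
		if (!t) {	/* queue is closed and drained */
			pthread_mutex_unlock(&tq->lock);
			return NULL;
		}
		tq->head = t->next;
		if (!tq->head)
			tq->tail = NULL;
		pthread_mutex_unlock(&tq->lock);

		if (t->fn(tq, t->data)) {
			pthread_mutex_lock(&tq->lock);
			tq->failed++;
			pthread_mutex_unlock(&tq->lock);
		}
		free(t);
	}
}

struct task_queue *create_task_queue(int nr_threads)
{
	struct task_queue *tq = calloc(1, sizeof(*tq));
	int i;
	assert(nr_threads > 0);
	if (!tq || !(tq->threads = calloc(nr_threads, sizeof(*tq->threads))))
		abort();
	pthread_mutex_init(&tq->lock, NULL);
	pthread_cond_init(&tq->cond, NULL);
	tq->nr_threads = nr_threads;
	for (i = 0; i < nr_threads; i++)
		if (pthread_create(&tq->threads[i], NULL, worker, tq))
			abort();
	return tq;
}

void add_task(struct task_queue *tq, task_fn fn, void *data)
{
	struct task *t = calloc(1, sizeof(*t));
	if (!t)
		abort();
	t->fn = fn;
	t->data = data;
	pthread_mutex_lock(&tq->lock);
	if (tq->tail)
		tq->tail->next = t;
	else
		tq->head = t;
	tq->tail = t;
	pthread_cond_signal(&tq->cond);
	pthread_mutex_unlock(&tq->lock);
}

/* Waits for all queued tasks; returns how many of them failed. */
int finish_task_queue(struct task_queue *tq)
{
	int i, failed;
	pthread_mutex_lock(&tq->lock);
	tq->closed = 1;
	pthread_cond_broadcast(&tq->cond);
	pthread_mutex_unlock(&tq->lock);
	for (i = 0; i < tq->nr_threads; i++)
		pthread_join(tq->threads[i], NULL);
	failed = tq->failed;
	pthread_cond_destroy(&tq->cond);
	pthread_mutex_destroy(&tq->lock);
	free(tq->threads);
	free(tq);
	return failed;
}

/* Demo callback (hypothetical): squares the int that data points at. */
int square_task(struct task_queue *tq, void *data)
{
	int *v = data;
	(void)tq;
	*v = *v * *v;
	return 0;
}
```

A caller would follow the same create/add/finish sequence as the
resolve_deltas() hunk, e.g. queue square_task once per element of an
int array and then check that finish_task_queue() returned 0.  Build
with -pthread.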

> @@ -1075,28 +1067,24 @@ static void resolve_base(struct object_entry *obj)
>  }
>
>  #ifndef NO_PTHREADS
> -static void *threaded_second_pass(void *data)
> +static int threaded_second_pass(struct task_queue *tq, void *data)
>  {
> - set_thread_data(data);
> - for (;;) {
> - int i;
> - counter_lock();
> - display_progress(progress, nr_resolved_deltas);
> - counter_unlock();
> - work_lock();
> - while (nr_dispatched < nr_objects &&
> -       is_delta_type(objects[nr_dispatched].type))
> - nr_dispatched++;
> - if (nr_dispatched >= nr_objects) {
> - work_unlock();
> - break;
> - }
> - i = nr_dispatched++;
> - work_unlock();
> + if (!get_thread_data()) {
> + struct thread_local *t = xmalloc(sizeof(*t));
> + t->pack_fd = open(curr_pack, O_RDONLY);
> + if (t->pack_fd == -1)
> + die_errno(_("unable to open %s"), curr_pack);
>
> - resolve_base(&objects[i]);
> + set_thread_data(t);
> + /* TODO: I haven't figured out where to free this memory */

Sorry, but it is hard to grok what is going on in the code with
the indentation squished like this.

> Why did I not pick upload-pack?
> ========================
>
> I could not spot easily how to make it a typical queuing problem.
> We start in the threads, and once in a while we're like: "Uhg, this
> thread has more load than the other, let's shove a bit over there"
>
> So what we would need there is splitting the work in smallest chunks
> from the beginning and just load it into the queue via add_task

... or a way for the overlord and the tasks to communicate with each
other and rebalance.  If I am not mistaken, splitting the work into
chunks that are too small has a big negative consequence for
pack-objects, as each chunk boundary also becomes a boundary for
finding delta bases.
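
To put a rough number on that, here is a toy model, not code from
pack-objects.  It assumes each object may only be compared against up
to "window" objects that precede it inside its own chunk, and counts
how many (object, candidate base) pairs remain for a given chunk size.
The function name and the model itself are illustrative assumptions;
the real delta search is considerably more involved.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Count the (object, candidate base) pairs available when nr_objects
 * are split into consecutive chunks of chunk_size and an object may
 * only look back at most "window" objects within its own chunk.
 */
size_t count_candidate_pairs(size_t nr_objects, size_t chunk_size,
			     size_t window)
{
	size_t pairs = 0, i;

	assert(chunk_size > 0);
	for (i = 0; i < nr_objects; i++) {
		/* number of objects preceding i inside its chunk */
		size_t pos = i % chunk_size;
		pairs += pos < window ? pos : window;
	}
	return pairs;
}
```

Under this model, 100 objects with a window of 10 give 945 candidate
pairs when kept as one chunk, but only 200 when cut into chunks of 5
objects each, so most of the chances to find a good base are lost.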
