Re: Parallel checkout (Was Re: 0 bot for Git)

On Fri, Apr 15, 2016 at 2:51 AM, Duy Nguyen <pclouds@xxxxxxxxx> wrote:
> On Fri, Apr 15, 2016 at 12:04:49AM +0200, Christian Couder wrote:
>> On Tue, Apr 12, 2016 at 4:59 PM, Stefan Beller <sbeller@xxxxxxxxxx> wrote:
>> > On Tue, Apr 12, 2016 at 2:42 AM, Duy Nguyen <pclouds@xxxxxxxxx> wrote:
>> >>> On Mon, Apr 11, 2016 at 7:51 AM, Stefan Beller <sbeller@xxxxxxxxxx> wrote:
>> >>>> Hi Greg,
>> >>>>
>> >>>> Thanks for your talk at the Git Merge 2016!
>> >>
>> >> Huh? It already happened?? Any interesting summary to share with us?
>>
>> There is a draft of an article about the first part of the Contributor
>> Summit in the draft of the next Git Rev News edition:
>>
>> https://github.com/git/git.github.io/blob/master/rev_news/drafts/edition-14.md
>
> Thanks. I read the sentence "This made people mention potential
> problems with parallelizing git checkout" and wondered what these
> problems were. And because it's easier to think while you test
> something, to flesh out your thoughts, I wrote the below patch, which
> does parallel checkout with multiple worker processes. I wonder if the
> same set of problems applies to it.

I mentioned it because Jonathan had mentioned it a while ago. (So I was
just referring to hearsay.)

>
> The idea is simple: you offload some work to worker processes. In this
> patch, only entry.c:write_entry() is moved to workers. We still do
> directory creation, all sorts of checks, and stat refresh in the main
> process. More work could be moved out, for example the entire
> builtin/checkout.c:checkout_merged().
>
> The multi-process model is less efficient than a multi-threaded one, but
> I doubt we could make object db access thread-safe any time soon. The
> last discussion was 2 years ago [1] and nothing much has happened since.
>
> Numbers are encouraging though. On the linux-2.6 repo, running on Linux
> with an ext4 filesystem, checkout_paths() dominates "git checkout :/".
> Unmodified git takes about 31s.

Please also benchmark "make build" or another read-heavy operation
after these two different checkouts. IIRC that was the problem:
checkout itself improved, but due to file ordering on the fs, the
operation afterwards slowed down, such that it became a net negative.

>
>
> 16:26:00.114029 builtin/checkout.c:1299 performance: 31.184973659 s: checkout_paths
> 16:26:00.114225 trace.c:420             performance: 31.256412935 s: git command: 'git' 'checkout' '.'
>
> When doing write_entry() on 8 processes, it takes 22s (shortened by ~30%)
>
> 16:27:39.973730 trace.c:420             performance: 5.610255442 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:40.956812 trace.c:420             performance: 6.595082013 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:41.397621 trace.c:420             performance: 7.032024175 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:47.453999 trace.c:420             performance: 13.078537207 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:48.986433 trace.c:420             performance: 14.612951643 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:53.149378 trace.c:420             performance: 18.781762536 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:54.884044 trace.c:420             performance: 20.514473730 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:55.319990 trace.c:420             performance: 20.948326263 s: git command: 'git' 'checkout-index' '--worker'
> 16:27:55.863211 builtin/checkout.c:1299 performance: 22.723118420 s: checkout_paths
> 16:27:55.863398 trace.c:420             performance: 22.854547640 s: git command: 'git' 'checkout' '--parallel' '.'
>
> I suspect that on NFS or Windows the gain may be higher, due to IO
> blocking the main process more.
>
> Note that this for-fun patch is not optimized at all (and definitely
> not portable). I could have sent a group of paths to the worker in a
> single system call instead of one per call. The trace above also shows
> imbalance among the workers, where some exit early because of my naive
> work distribution. Numbers could get a bit better.
>
> [1] http://thread.gmane.org/gmane.comp.version-control.git/241965/focus=242020
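
(On the single-syscall point: if I read send_to_worker() in your patch
correctly, the batching could be as small as formatting a run of pkt-lines
into a strbuf and flushing it with one write. Untested sketch against the
structures in your patch; the LARGE_PACKET_MAX/2 cutoff is just an
arbitrary batch size:

	struct strbuf buf = STRBUF_INIT;
	struct checkout_item *item = worker->to_send;

	/* format a batch of pkt-lines, then push them out in one write() */
	while (item && buf.len < LARGE_PACKET_MAX / 2) {
		packet_buf_write(&buf, "%s %s",
				 sha1_to_hex(item->ce->sha1), item->ce->name);
		item = item->next;
	}
	if (write_in_full(worker->cp.in, buf.buf, buf.len) != buf.len)
		die_errno("unable to feed checkout worker");
	worker->to_send = item;
	strbuf_release(&buf);

The worker side could keep reading with packet_read_line() unchanged,
since the pkt-line framing stays the same.)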

Would it make sense to use the parallel processing infrastructure from
run-command.h instead of doing all the setup and teardown yourself?
(As you call it a for-fun patch, I'd assume the answer is: writing code
is more fun than using other people's code ;)
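
Roughly, I'd expect the driver side to collapse into the usual callback
shape. An untested sketch (the callback signatures are from memory, and
the --from-file option plus the checkout_tasks struct are made up for
illustration):

#include "cache.h"
#include "run-command.h"
#include "argv-array.h"

/* hypothetical driver state, not part of the patch below */
struct checkout_tasks {
	const char **path_files; /* one pre-written path list per worker */
	int nr, next;
};

static int next_checkout_task(struct child_process *cp, struct strbuf *err,
			      void *pp_cb, void **pp_task_cb)
{
	struct checkout_tasks *tasks = pp_cb;

	if (tasks->next >= tasks->nr)
		return 0; /* no more work */

	cp->git_cmd = 1;
	argv_array_push(&cp->args, "checkout-index");
	argv_array_push(&cp->args, "--worker");
	/* hypothetical option: hand over this worker's paths up front */
	argv_array_pushf(&cp->args, "--from-file=%s",
			 tasks->path_files[tasks->next++]);
	return 1; /* cp is filled in, please start it */
}

static int checkout_task_finished(int result, struct strbuf *err,
				  void *pp_cb, void *pp_task_cb)
{
	return 0; /* could record 'result' as the overall error state */
}

	/* and in run_parallel_checkout(), with 'tasks' filled in, roughly: */
	run_processes_parallel(8, next_checkout_task, NULL,
			       checkout_task_finished, &tasks);

The one real catch, if I remember the API right, is that
run_processes_parallel() gives the children no stdin and only buffers and
relays their output, so each worker would have to get its slice of paths
up front (hence the hypothetical --from-file), and the per-entry
completion notifications your receive_from_worker() uses for the stat
refresh would need a different channel.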


>
> -- 8< --
> diff --git a/builtin/checkout-index.c b/builtin/checkout-index.c
> index 92c6967..7163216 100644
> --- a/builtin/checkout-index.c
> +++ b/builtin/checkout-index.c
> @@ -9,6 +9,7 @@
>  #include "quote.h"
>  #include "cache-tree.h"
>  #include "parse-options.h"
> +#include "entry.h"
>
>  #define CHECKOUT_ALL 4
>  static int nul_term_line;
> @@ -179,6 +180,9 @@ int cmd_checkout_index(int argc, const char **argv, const char *prefix)
>                 OPT_END()
>         };
>
> +       if (argc == 2 && !strcmp(argv[1], "--worker"))
> +               return parallel_checkout_worker();
> +
>         if (argc == 2 && !strcmp(argv[1], "-h"))
>                 usage_with_options(builtin_checkout_index_usage,
>                                    builtin_checkout_index_options);
> diff --git a/builtin/checkout.c b/builtin/checkout.c
> index efcbd8f..51caad2 100644
> --- a/builtin/checkout.c
> +++ b/builtin/checkout.c
> @@ -20,6 +20,7 @@
>  #include "resolve-undo.h"
>  #include "submodule-config.h"
>  #include "submodule.h"
> +#include "entry.h"
>
>  static const char * const checkout_usage[] = {
>         N_("git checkout [<options>] <branch>"),
> @@ -236,7 +237,8 @@ static int checkout_merged(int pos, struct checkout *state)
>  }
>
>  static int checkout_paths(const struct checkout_opts *opts,
> -                         const char *revision)
> +                         const char *revision,
> +                         int parallel)
>  {
>         int pos;
>         struct checkout state;
> @@ -357,6 +359,8 @@ static int checkout_paths(const struct checkout_opts *opts,
>         state.force = 1;
>         state.refresh_cache = 1;
>         state.istate = &the_index;
> +       if (parallel)
> +               start_parallel_checkout(&state);
>         for (pos = 0; pos < active_nr; pos++) {
>                 struct cache_entry *ce = active_cache[pos];
>                 if (ce->ce_flags & CE_MATCHED) {
> @@ -367,11 +371,18 @@ static int checkout_paths(const struct checkout_opts *opts,
>                         if (opts->writeout_stage)
>                                 errs |= checkout_stage(opts->writeout_stage, ce, pos, &state);
>                         else if (opts->merge)
> +                               /*
> +                                * XXX: in parallel mode, we may want
> +                                * to let worker perform the merging
> +                                * instead and send SHA-1 result back
> +                                */
>                                 errs |= checkout_merged(pos, &state);
>                         pos = skip_same_name(ce, pos) - 1;
>                 }
>         }
>
> +       errs |= run_parallel_checkout();
> +
>         if (write_locked_index(&the_index, lock_file, COMMIT_LOCK))
>                 die(_("unable to write new index file"));
>
> @@ -1132,6 +1143,7 @@ int cmd_checkout(int argc, const char **argv, const char *prefix)
>         struct branch_info new;
>         char *conflict_style = NULL;
>         int dwim_new_local_branch = 1;
> +       int parallel = 0;
>         struct option options[] = {
>                 OPT__QUIET(&opts.quiet, N_("suppress progress reporting")),
>                 OPT_STRING('b', NULL, &opts.new_branch, N_("branch"),
> @@ -1159,6 +1171,8 @@ int cmd_checkout(int argc, const char **argv, const char *prefix)
>                                 N_("second guess 'git checkout <no-such-branch>'")),
>                 OPT_BOOL(0, "ignore-other-worktrees", &opts.ignore_other_worktrees,
>                          N_("do not check if another worktree is holding the given ref")),
> +               OPT_BOOL(0, "parallel", &parallel,
> +                        N_("parallel checkout")),
>                 OPT_BOOL(0, "progress", &opts.show_progress, N_("force progress reporting")),
>                 OPT_END(),
>         };
> @@ -1279,8 +1293,12 @@ int cmd_checkout(int argc, const char **argv, const char *prefix)
>                 strbuf_release(&buf);
>         }
>
> -       if (opts.patch_mode || opts.pathspec.nr)
> -               return checkout_paths(&opts, new.name);
> +       if (opts.patch_mode || opts.pathspec.nr) {
> +               uint64_t start = getnanotime();
> +               int ret = checkout_paths(&opts, new.name, parallel);
> +               trace_performance_since(start, "checkout_paths");
> +               return ret;
> +       }
>         else
>                 return checkout_branch(&opts, &new);
>  }
> diff --git a/entry.c b/entry.c
> index a410957..5e0eb1c 100644
> --- a/entry.c
> +++ b/entry.c
> @@ -3,6 +3,36 @@
>  #include "dir.h"
>  #include "streaming.h"
>
> +#include <sys/epoll.h>
> +#include "pkt-line.h"
> +#include "argv-array.h"
> +#include "run-command.h"
> +
> +struct checkout_item {
> +       struct cache_entry *ce;
> +       struct checkout_item *next;
> +};
> +
> +struct checkout_worker {
> +       struct child_process cp;
> +       struct checkout_item *to_complete;
> +       struct checkout_item *to_send;
> +};
> +
> +struct parallel_checkout {
> +       struct checkout state;
> +       struct checkout_worker *workers;
> +       struct checkout_item *items;
> +       int nr_items, alloc_items;
> +       int nr_workers;
> +};
> +
> +static struct parallel_checkout *parallel_checkout;
> +
> +static int queue_checkout(struct parallel_checkout *,
> +                         const struct checkout *,
> +                         struct cache_entry *);
> +
>  static void create_directories(const char *path, int path_len,
>                                const struct checkout *state)
>  {
> @@ -290,5 +320,299 @@ int checkout_entry(struct cache_entry *ce,
>                 return 0;
>
>         create_directories(path.buf, path.len, state);
> +
> +       if (!queue_checkout(parallel_checkout, state, ce))
> +               /*
> +                * write_entry() will be done by parallel_checkout_worker() in
> +                * a separate process
> +                */
> +               return 0;
> +
>         return write_entry(ce, path.buf, state, 0);
>  }
> +
> +int start_parallel_checkout(const struct checkout *state)
> +{
> +       if (parallel_checkout)
> +               die("BUG: parallel checkout already initiated");
> +       if (0 && state->force)
> +               die("BUG: not support --force yet");
> +       parallel_checkout = xmalloc(sizeof(*parallel_checkout));
> +       memset(parallel_checkout, 0, sizeof(*parallel_checkout));
> +       memcpy(&parallel_checkout->state, state, sizeof(*state));
> +
> +       return 0;
> +}
> +
> +static int queue_checkout(struct parallel_checkout *pc,
> +                         const struct checkout *state,
> +                         struct cache_entry *ce)
> +{
> +       struct checkout_item *ci;
> +
> +       if (!pc ||
> +           !S_ISREG(ce->ce_mode) ||
> +           memcmp(&pc->state, state, sizeof(*state)))
> +               return -1;
> +
> +       ALLOC_GROW(pc->items, pc->nr_items + 1, pc->alloc_items);
> +       ci = pc->items + pc->nr_items++;
> +       ci->ce = ce;
> +       return 0;
> +}
> +
> +static int item_cmp(const void *a_, const void *b_)
> +{
> +       const struct checkout_item *a = a_;
> +       const struct checkout_item *b = b_;
> +       return strcmp(a->ce->name, b->ce->name);
> +}
> +
> +static int setup_workers(struct parallel_checkout *pc, int epoll_fd)
> +{
> +       int from, nr_per_worker, i;
> +
> +       pc->workers = xmalloc(sizeof(*pc->workers) * pc->nr_workers);
> +       memset(pc->workers, 0, sizeof(*pc->workers) * pc->nr_workers);
> +
> +       nr_per_worker = pc->nr_items / pc->nr_workers;
> +       from = 0;
> +
> +       for (i = 0; i < pc->nr_workers; i++) {
> +               struct checkout_worker *worker = pc->workers + i;
> +               struct child_process *cp = &worker->cp;
> +               struct checkout_item *item;
> +               struct epoll_event ev;
> +               int to;
> +
> +               to = from + nr_per_worker;
> +               if (i == pc->nr_workers - 1)
> +                       to = pc->nr_items;
> +               item = NULL;
> +               while (from < to) {
> +                       pc->items[from].next = item;
> +                       item = pc->items + from;
> +                       from++;
> +               }
> +               worker->to_send = item;
> +               worker->to_complete = item;
> +
> +               cp->git_cmd = 1;
> +               cp->in = -1;
> +               cp->out = -1;
> +               argv_array_push(&cp->args, "checkout-index");
> +               argv_array_push(&cp->args, "--worker");
> +               if (start_command(cp))
> +                       die(_("failed to run checkout worker"));
> +
> +               ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
> +               ev.data.u32 = i * 2;
> +               if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, cp->in, &ev) == -1)
> +                       die_errno("epoll_ctl");
> +
> +               ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
> +               ev.data.u32 = i * 2 + 1;
> +               if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, cp->out, &ev) == -1)
> +                       die_errno("epoll_ctl");
> +       }
> +       return 0;
> +}
> +
> +static int send_to_worker(struct checkout_worker *worker, int epoll_fd)
> +{
> +       if (!worker->to_send) {
> +               struct epoll_event ev;
> +
> +               packet_flush(worker->cp.in);
> +
> +               epoll_ctl(epoll_fd, EPOLL_CTL_DEL, worker->cp.in, &ev);
> +               close(worker->cp.in);
> +               worker->cp.in = -1;
> +               return 0;
> +       }
> +
> +       /*
> +        * XXX: put the fd in non-blocking mode and send as many files
> +        * as possible in one go.
> +        */
> +       packet_write(worker->cp.in, "%s %s",
> +                    sha1_to_hex(worker->to_send->ce->sha1),
> +                    worker->to_send->ce->name);
> +       worker->to_send = worker->to_send->next;
> +       return 0;
> +}
> +
> +int parallel_checkout_worker(void)
> +{
> +       struct checkout state;
> +       struct cache_entry *ce = NULL;
> +
> +       memset(&state, 0, sizeof(state));
> +       /* FIXME: pass 'force' over */
> +       for (;;) {
> +               int len;
> +               unsigned char sha1[20];
> +               char *line = packet_read_line(0, &len);
> +
> +               if (!line)
> +                       return 0;
> +
> +               if (len < 40)
> +                       return 1;
> +               if (get_sha1_hex(line, sha1))
> +                       return 1;
> +               line += 40;
> +               len -= 40;
> +               if (*line != ' ')
> +                       return 1;
> +               line++;
> +               len--;
> +               if (!ce || ce_namelen(ce) < len) {
> +                       free(ce);
> +                       ce = xcalloc(1, cache_entry_size(len));
> +                       ce->ce_mode = S_IFREG | ce_permissions(0644);
> +               }
> +               ce->ce_namelen = len;
> +               hashcpy(ce->sha1, sha1);
> +               memcpy(ce->name, line, len + 1);
> +
> +               if (write_entry(ce, ce->name, &state, 0))
> +                       return 1;
> +               /*
> +                * XXX process in batch and send bigger number of
> +                * checked out entries back
> +                */
> +               packet_write(1, "1");
> +       }
> +}
> +
> +static int receive_from_worker(struct checkout_worker *worker,
> +                              int refresh_cache)
> +{
> +       int len, val;
> +       char *line;
> +
> +       line = packet_read_line(worker->cp.out, &len);
> +       val = atoi(line);
> +       if (val <= 0)
> +               die("BUG: invalid value");
> +       while (val && worker->to_complete &&
> +              worker->to_complete != worker->to_send) {
> +               if (refresh_cache) {
> +                       struct stat st;
> +                       struct cache_entry *ce = worker->to_complete->ce;
> +
> +                       lstat(ce->name, &st);
> +                       fill_stat_cache_info(ce, &st);
> +                       ce->ce_flags |= CE_UPDATE_IN_BASE;
> +               }
> +               worker->to_complete = worker->to_complete->next;
> +               val--;
> +       }
> +       if (val)
> +               die("BUG: invalid value");
> +       return 0;
> +}
> +
> +static int finish_worker(struct checkout_worker *worker, int epoll_fd)
> +{
> +       struct epoll_event ev;
> +       char buf[1];
> +       int ret;
> +
> +       assert(worker->to_send == NULL);
> +       assert(worker->to_complete == NULL);
> +
> +       ret = xread(worker->cp.out, buf, sizeof(buf));
> +       if (ret != 0)
> +               die("BUG: expect eof");
> +       epoll_ctl(epoll_fd, EPOLL_CTL_DEL, worker->cp.out, &ev);
> +       close(worker->cp.out);
> +       worker->cp.out = -1;
> +       if (finish_command(&worker->cp))
> +               die("worker had a problem");
> +       return 0;
> +}
> +
> +static int really_finished(struct parallel_checkout *pc)
> +{
> +       int i;
> +
> +       for (i = 0; i < pc->nr_workers; i++)
> +               if (pc->workers[i].to_complete)
> +                       return 0;
> +       return 1;
> +}
> +
> +/* XXX progress support for unpack-trees */
> +int run_parallel_checkout(void)
> +{
> +       struct parallel_checkout *pc = parallel_checkout;
> +       int ret, i;
> +       int epoll_fd;
> +       struct epoll_event *events;
> +
> +       if (!pc || !pc->nr_items) {
> +               free(pc);
> +               parallel_checkout = NULL;
> +               return 0;
> +       }
> +
> +       qsort(pc->items, pc->nr_items, sizeof(*pc->items), item_cmp);
> +       pc->nr_workers = 8;
> +       epoll_fd = epoll_create(pc->nr_workers * 2);
> +       if (epoll_fd == -1)
> +               die_errno("epoll_create");
> +       ret = setup_workers(pc, epoll_fd);
> +       events = xmalloc(sizeof(*events) * pc->nr_workers * 2);
> +
> +       ret = 0;
> +       while (!ret) {
> +               int maybe_all_done = 0, nr;
> +
> +               nr = epoll_wait(epoll_fd, events, pc->nr_workers * 2, -1);
> +               if (nr == -1 && errno == EINTR)
> +                       continue;
> +               if (nr == -1) {
> +                       ret = nr;
> +                       break;
> +               }
> +               for (i = 0; i < nr; i++) {
> +                       int is_in = events[i].data.u32 & 1;
> +                       int worker_id = events[i].data.u32 / 2;
> +                       struct checkout_worker *worker = pc->workers + worker_id;
> +
> +                       if (!is_in && (events[i].events & EPOLLOUT))
> +                               ret = send_to_worker(worker, epoll_fd);
> +                       else if (events[i].events & EPOLLIN) {
> +                               if (worker->to_complete) {
> +                                       int refresh = pc->state.refresh_cache;
> +                                       ret = receive_from_worker(worker, refresh);
> +                                       pc->state.istate->cache_changed |= CE_ENTRY_CHANGED;
> +                               } else {
> +                                       ret = finish_worker(worker, epoll_fd);
> +                                       maybe_all_done = 1;
> +                               }
> +                       } else if (events[i].events & (EPOLLERR | EPOLLHUP)) {
> +                               if (is_in && !worker->to_complete) {
> +                                       ret = finish_worker(worker, epoll_fd);
> +                                       maybe_all_done = 1;
> +                               } else
> +                                       ret = -1;
> +                       } else
> +                               die("BUG: what??");
> +                       if (ret)
> +                               break;
> +               }
> +
> +               if (maybe_all_done && really_finished(pc))
> +                       break;
> +       }
> +
> +       close(epoll_fd);
> +       free(pc->workers);
> +       free(events);
> +       free(pc);
> +       parallel_checkout = NULL;
> +       return ret;
> +}
> diff --git a/unpack-trees.c b/unpack-trees.c
> index 9f55cc2..433c54e 100644
> --- a/unpack-trees.c
> +++ b/unpack-trees.c
> @@ -220,6 +220,7 @@ static int check_updates(struct unpack_trees_options *o)
>         remove_marked_cache_entries(&o->result);
>         remove_scheduled_dirs();
>
> +       /* start_parallel_checkout() */
>         for (i = 0; i < index->cache_nr; i++) {
>                 struct cache_entry *ce = index->cache[i];
>
> @@ -234,6 +235,7 @@ static int check_updates(struct unpack_trees_options *o)
>                         }
>                 }
>         }
> +       /* run_parallel_checkout() */
>         stop_progress(&progress);
>         if (o->update)
>                 git_attr_set_direction(GIT_ATTR_CHECKIN, NULL);
> -- 8< --