Re: [RFC PATCH 2/3] run-commands: add an async queue processor

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, Aug 21, 2015 at 12:44 PM, Jeff King <peff@xxxxxxxx> wrote:
> On Fri, Aug 21, 2015 at 12:05:13PM -0700, Junio C Hamano wrote:
>
>> The primary reason I suspect is because you sent to a wrong set of
>> people.  Submodule folks have largely been working in the scripted
>> ones, and may not necessarily be the ones who are most familiar with
>> the run-command infrastructure.
>>
>> "shortlog --no-merges" tells me that the obvious suspects are j6t
>> and peff.
>
> No good deed goes unpunished. ;)
>
> Before even looking at the implementation, my first question would be
> whether this pattern is applicable in several places in git (i.e., is it
> worth the extra complexity of abstracting out in the first place). I
> think there are a few task-queue patterns already in git; for example
> the delta search in pack-objects. Is the interface given here sufficient
> to convert pack-objects? Is the result nicer to read? Is it as
> efficient?

I have converted index-pack threading now, and it looks quite easy:

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..159ee36 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"

 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>]
[--verify] [--strict] (<pack-file> | --stdin [--fix-thin]
[<pack-file>])";
@@ -1075,7 +1076,7 @@ static void resolve_base(struct object_entry *obj)
 }

 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
        set_thread_data(data);
        for (;;) {
@@ -1096,7 +1097,7 @@ static void *threaded_second_pass(void *data)

                resolve_base(&objects[i]);
        }
-       return NULL;
+       return 0;
 }
 #endif

@@ -1195,18 +1196,18 @@ static void resolve_deltas(void)
                                          nr_ref_deltas + nr_ofs_deltas);

 #ifndef NO_PTHREADS
-       nr_dispatched = 0;
        if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+               nr_dispatched = 0;
                init_thread();
-               for (i = 0; i < nr_threads; i++) {
-                       int ret = pthread_create(&thread_data[i].thread, NULL,
-                                                threaded_second_pass,
thread_data + i);
-                       if (ret)
-                               die(_("unable to create thread: %s"),
-                                   strerror(ret));
-               }
+
+               tq = create_task_queue(nr_threads);
+
                for (i = 0; i < nr_threads; i++)
-                       pthread_join(thread_data[i].thread, NULL);
+                       add_task(tq, threaded_second_pass, thread_data + i);
+
+               if (finish_task_queue(tq))
+                       die("Not all threads have finished");
+
                cleanup_thread();
                return;
        }
---
(tests pass)
This was cheating as I picked to convert index-pack as opposed to upload-pack
(index-pack is very similar to a queued workload. This was just moving
the thread
creation into the new proposed queue processor.)

I realize now this can be adapted a bit more, to show off the queue features
but would require a larger rewrite. So instead of just creating the threads and
then locking, we get rid of the worker lock like this:

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..797efea 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"

 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>]
[--verify] [--strict] (<pack-file> | --stdin [--fix-thin]
[<pack-file>])";
@@ -106,10 +107,6 @@ static pthread_mutex_t counter_mutex;
 #define counter_lock() lock_mutex(&counter_mutex)
 #define counter_unlock() unlock_mutex(&counter_mutex)

-static pthread_mutex_t work_mutex;
-#define work_lock() lock_mutex(&work_mutex)
-#define work_unlock() unlock_mutex(&work_mutex)
-
 static pthread_mutex_t deepest_delta_mutex;
 #define deepest_delta_lock() lock_mutex(&deepest_delta_mutex)
 #define deepest_delta_unlock() unlock_mutex(&deepest_delta_mutex)
@@ -140,7 +137,6 @@ static void init_thread(void)
  int i;
  init_recursive_mutex(&read_mutex);
  pthread_mutex_init(&counter_mutex, NULL);
- pthread_mutex_init(&work_mutex, NULL);
  pthread_mutex_init(&type_cas_mutex, NULL);
  if (show_stat)
  pthread_mutex_init(&deepest_delta_mutex, NULL);
@@ -163,7 +159,6 @@ static void cleanup_thread(void)
  threads_active = 0;
  pthread_mutex_destroy(&read_mutex);
  pthread_mutex_destroy(&counter_mutex);
- pthread_mutex_destroy(&work_mutex);
  pthread_mutex_destroy(&type_cas_mutex);
  if (show_stat)
  pthread_mutex_destroy(&deepest_delta_mutex);
@@ -181,9 +176,6 @@ static void cleanup_thread(void)
 #define counter_lock()
 #define counter_unlock()

-#define work_lock()
-#define work_unlock()
-
 #define deepest_delta_lock()
 #define deepest_delta_unlock()

@@ -1075,28 +1067,24 @@ static void resolve_base(struct object_entry *obj)
 }

 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
- set_thread_data(data);
- for (;;) {
- int i;
- counter_lock();
- display_progress(progress, nr_resolved_deltas);
- counter_unlock();
- work_lock();
- while (nr_dispatched < nr_objects &&
-       is_delta_type(objects[nr_dispatched].type))
- nr_dispatched++;
- if (nr_dispatched >= nr_objects) {
- work_unlock();
- break;
- }
- i = nr_dispatched++;
- work_unlock();
+ if (!get_thread_data()) {
+ struct thread_local *t = xmalloc(sizeof(*t));
+ t->pack_fd = open(curr_pack, O_RDONLY);
+ if (t->pack_fd == -1)
+ die_errno(_("unable to open %s"), curr_pack);

- resolve_base(&objects[i]);
+ set_thread_data(t);
+ /* TODO: I haven't figured out where to free this memory */
  }
- return NULL;
+
+ resolve_base(data);
+
+ counter_lock();
+ display_progress(progress, nr_resolved_deltas);
+ counter_unlock();
+ return 0;
 }
 #endif

@@ -1195,18 +1183,21 @@ static void resolve_deltas(void)
   nr_ref_deltas + nr_ofs_deltas);

 #ifndef NO_PTHREADS
- nr_dispatched = 0;
  if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+ struct task_queue *tq;
+
  init_thread();
- for (i = 0; i < nr_threads; i++) {
- int ret = pthread_create(&thread_data[i].thread, NULL,
- threaded_second_pass, thread_data + i);
- if (ret)
- die(_("unable to create thread: %s"),
-    strerror(ret));
- }
- for (i = 0; i < nr_threads; i++)
- pthread_join(thread_data[i].thread, NULL);
+ tq = create_task_queue(nr_threads);
+
+ for (nr_dispatched = 0; nr_dispatched < nr_objects; nr_dispatched++)
+ if (!is_delta_type(objects[nr_dispatched].type))
+ add_task(tq, threaded_second_pass, &objects[nr_dispatched]);
+
+ if (finish_task_queue(tq))
+ die("Not all threads have finished");
+
+ /* Here might be a good place to free the thread local storage caches */
+
  cleanup_thread();
  return;
  }
----

This looks very pleasant to me as we have no lock contention all the time,
with the worker lock, but the main thread can load the work queue easily
while we directly start processing in the worker threads.
This also reduces the lines of code which usually is a good sign.

However there is a memory leak in this second solution.

How do you think these two approaches look like?

(I cannot really say objectively if it's easier to read, as I wrote the code,
so of course it is easy for me)

Why did I not pick upload-pack?
========================

I could not spot easily how to make it a typical queuing problem.
We start in the threads, and once in a while we're like: "Uhg, this
thread has more load than the other, let's shove a bit over there"

So what we would need there is splitting the work in smallest chunks
from the beginning and just load it into the queue via add_task



>
> We do not need to convert all possible call-sites to the new abstracted
> code at once. But I find that converting at least _one_ is a good litmus
> test to confirm that a new interface is generally useful.
>
> -Peff
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]