[PATCH 4/5] index-pack: Use the new worker pool

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



By treating each object as its own task the workflow is easier to follow
as the function used in the worker threads doesn't need any control logic
any more.

Signed-off-by: Stefan Beller <sbeller@xxxxxxxxxx>
---
 builtin/index-pack.c | 71 +++++++++++++++++++++++-----------------------------
 1 file changed, 32 insertions(+), 39 deletions(-)

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..826bd22 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"
 
 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -95,7 +96,6 @@ static const char *curr_pack;
 #ifndef NO_PTHREADS
 
 static struct thread_local *thread_data;
-static int nr_dispatched;
 static int threads_active;
 
 static pthread_mutex_t read_mutex;
@@ -106,10 +106,6 @@ static pthread_mutex_t counter_mutex;
 #define counter_lock()		lock_mutex(&counter_mutex)
 #define counter_unlock()	unlock_mutex(&counter_mutex)
 
-static pthread_mutex_t work_mutex;
-#define work_lock()		lock_mutex(&work_mutex)
-#define work_unlock()		unlock_mutex(&work_mutex)
-
 static pthread_mutex_t deepest_delta_mutex;
 #define deepest_delta_lock()	lock_mutex(&deepest_delta_mutex)
 #define deepest_delta_unlock()	unlock_mutex(&deepest_delta_mutex)
@@ -140,7 +136,6 @@ static void init_thread(void)
 	int i;
 	init_recursive_mutex(&read_mutex);
 	pthread_mutex_init(&counter_mutex, NULL);
-	pthread_mutex_init(&work_mutex, NULL);
 	pthread_mutex_init(&type_cas_mutex, NULL);
 	if (show_stat)
 		pthread_mutex_init(&deepest_delta_mutex, NULL);
@@ -163,7 +158,6 @@ static void cleanup_thread(void)
 	threads_active = 0;
 	pthread_mutex_destroy(&read_mutex);
 	pthread_mutex_destroy(&counter_mutex);
-	pthread_mutex_destroy(&work_mutex);
 	pthread_mutex_destroy(&type_cas_mutex);
 	if (show_stat)
 		pthread_mutex_destroy(&deepest_delta_mutex);
@@ -181,9 +175,6 @@ static void cleanup_thread(void)
 #define counter_lock()
 #define counter_unlock()
 
-#define work_lock()
-#define work_unlock()
-
 #define deepest_delta_lock()
 #define deepest_delta_unlock()
 
@@ -1075,28 +1066,29 @@ static void resolve_base(struct object_entry *obj)
 }
 
 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
-	set_thread_data(data);
-	for (;;) {
-		int i;
-		counter_lock();
-		display_progress(progress, nr_resolved_deltas);
-		counter_unlock();
-		work_lock();
-		while (nr_dispatched < nr_objects &&
-		       is_delta_type(objects[nr_dispatched].type))
-			nr_dispatched++;
-		if (nr_dispatched >= nr_objects) {
-			work_unlock();
-			break;
-		}
-		i = nr_dispatched++;
-		work_unlock();
+	if (!get_thread_data()) {
+		struct thread_local *t = xmalloc(sizeof(*t));
+		t->pack_fd = open(curr_pack, O_RDONLY);
+		if (t->pack_fd == -1)
+			die_errno(_("unable to open %s"), curr_pack);
 
-		resolve_base(&objects[i]);
+		set_thread_data(t);
 	}
-	return NULL;
+
+	resolve_base(data);
+
+	counter_lock();
+	display_progress(progress, nr_resolved_deltas);
+	counter_unlock();
+	return 0;
+}
+
+void cleanup_threaded_second_pass(struct task_queue *aq)
+{
+	struct thread_local *t = get_thread_data();
+	free(t);
 }
 #endif
 
@@ -1195,18 +1187,19 @@ static void resolve_deltas(void)
 					  nr_ref_deltas + nr_ofs_deltas);
 
 #ifndef NO_PTHREADS
-	nr_dispatched = 0;
 	if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+		struct task_queue *tq;
+
 		init_thread();
-		for (i = 0; i < nr_threads; i++) {
-			int ret = pthread_create(&thread_data[i].thread, NULL,
-						 threaded_second_pass, thread_data + i);
-			if (ret)
-				die(_("unable to create thread: %s"),
-				    strerror(ret));
-		}
-		for (i = 0; i < nr_threads; i++)
-			pthread_join(thread_data[i].thread, NULL);
+		tq = create_task_queue(nr_threads);
+
+		for (i = 0; i < nr_objects; i++)
+			if (!is_delta_type(objects[i].type))
+				add_task(tq, threaded_second_pass, &objects[i]);
+
+		if (finish_task_queue(tq, cleanup_threaded_second_pass))
+			die("Not all threads have finished");
+
 		cleanup_thread();
 		return;
 	}
-- 
2.5.0.400.gff86faf

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]