[PATCH 03/13] libfrog: split workqueue destroy functions

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Darrick J. Wong <darrick.wong@xxxxxxxxxx>

Split the workqueue destroy function into two parts -- one to signal all
the threads to exit and wait for them, and a second one that actually
destroys all the memory associated with the workqueue.  This mean we can
report latent workqueue errors independent of the freeing function.

Signed-off-by: Darrick J. Wong <darrick.wong@xxxxxxxxxx>
---
 include/workqueue.h |    2 ++
 libfrog/workqueue.c |   45 +++++++++++++++++++++++++++++++++++++--------
 repair/threads.c    |    6 ++++++
 scrub/fscounters.c  |   11 ++++++++++-
 scrub/inodes.c      |    5 +++++
 scrub/phase2.c      |    5 +++++
 scrub/phase4.c      |    6 ++++++
 scrub/read_verify.c |    1 +
 scrub/spacemap.c    |    5 +++++
 scrub/vfs.c         |    5 +++++
 10 files changed, 82 insertions(+), 9 deletions(-)


diff --git a/include/workqueue.h b/include/workqueue.h
index c45dc4fb..024ce144 100644
--- a/include/workqueue.h
+++ b/include/workqueue.h
@@ -30,12 +30,14 @@ struct workqueue {
 	unsigned int		item_count;
 	unsigned int		thread_count;
 	bool			terminate;
+	bool			terminated;
 };
 
 int workqueue_create(struct workqueue *wq, void *wq_ctx,
 		unsigned int nr_workers);
 int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
 		uint32_t index, void *arg);
+int workqueue_terminate(struct workqueue *wq);
 void workqueue_destroy(struct workqueue *wq);
 
 #endif	/* _WORKQUEUE_H_ */
diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
index 24b22bf6..503fe227 100644
--- a/libfrog/workqueue.c
+++ b/libfrog/workqueue.c
@@ -82,6 +82,7 @@ workqueue_create(
 		goto out_mutex;
 	}
 	wq->terminate = false;
+	wq->terminated = false;
 
 	for (i = 0; i < nr_workers; i++) {
 		err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
@@ -119,6 +120,8 @@ workqueue_add(
 	struct workqueue_item	*wi;
 	int			ret;
 
+	assert(!wq->terminated);
+
 	if (wq->thread_count == 0) {
 		func(wq, index, arg);
 		return 0;
@@ -160,22 +163,48 @@ workqueue_add(
 
 /*
  * Wait for all pending work items to be processed and tear down the
- * workqueue.
+ * workqueue thread pool.
  */
-void
-workqueue_destroy(
+int
+workqueue_terminate(
 	struct workqueue	*wq)
 {
 	unsigned int		i;
+	int			ret;
+
+	ret = pthread_mutex_lock(&wq->lock);
+	if (ret)
+		return ret;
+
+	wq->terminate = true;
+	pthread_mutex_unlock(&wq->lock);
+
+	ret = pthread_cond_broadcast(&wq->wakeup);
+	if (ret)
+		return ret;
 
-	pthread_mutex_lock(&wq->lock);
-	wq->terminate = 1;
+	for (i = 0; i < wq->thread_count; i++) {
+		ret = pthread_join(wq->threads[i], NULL);
+		if (ret)
+			return ret;
+	}
+
+	ret = pthread_mutex_lock(&wq->lock);
+	if (ret)
+		return ret;
+
+	wq->terminated = true;
 	pthread_mutex_unlock(&wq->lock);
 
-	pthread_cond_broadcast(&wq->wakeup);
+	return 0;
+}
 
-	for (i = 0; i < wq->thread_count; i++)
-		pthread_join(wq->threads[i], NULL);
+/* Tear down the workqueue. */
+void
+workqueue_destroy(
+	struct workqueue	*wq)
+{
+	assert(wq->terminated);
 
 	free(wq->threads);
 	pthread_mutex_destroy(&wq->lock);
diff --git a/repair/threads.c b/repair/threads.c
index d2190920..9b7241e3 100644
--- a/repair/threads.c
+++ b/repair/threads.c
@@ -56,5 +56,11 @@ void
 destroy_work_queue(
 	struct workqueue	*wq)
 {
+	int			err;
+
+	err = workqueue_terminate(wq);
+	if (err)
+		do_error(_("cannot terminate worker item, error = [%d] %s\n"),
+				err, strerror(err));
 	workqueue_destroy(wq);
 }
diff --git a/scrub/fscounters.c b/scrub/fscounters.c
index ee07da9e..712a2b37 100644
--- a/scrub/fscounters.c
+++ b/scrub/fscounters.c
@@ -102,7 +102,7 @@ xfs_count_all_inodes(
 	struct xfs_count_inodes	*ci;
 	xfs_agnumber_t		agno;
 	struct workqueue	wq;
-	bool			moveon;
+	bool			moveon = true;
 	int			ret;
 
 	ci = calloc(1, sizeof(struct xfs_count_inodes) +
@@ -126,8 +126,17 @@ xfs_count_all_inodes(
 			break;
 		}
 	}
+
+	ret = workqueue_terminate(&wq);
+	if (ret) {
+		moveon = false;
+		str_liberror(ctx, ret, _("finishing icount work"));
+	}
 	workqueue_destroy(&wq);
 
+	if (!moveon)
+		goto out_free;
+
 	for (agno = 0; agno < ctx->mnt.fsgeom.agcount; agno++)
 		*count += ci->counters[agno];
 	moveon = ci->moveon;
diff --git a/scrub/inodes.c b/scrub/inodes.c
index dcce2df0..ef12a692 100644
--- a/scrub/inodes.c
+++ b/scrub/inodes.c
@@ -253,6 +253,11 @@ xfs_scan_all_inodes(
 		}
 	}
 
+	ret = workqueue_terminate(&wq);
+	if (ret) {
+		si.moveon = false;
+		str_liberror(ctx, ret, _("finishing bulkstat work"));
+	}
 	workqueue_destroy(&wq);
 
 	return si.moveon;
diff --git a/scrub/phase2.c b/scrub/phase2.c
index 8c8aad97..6b4755a1 100644
--- a/scrub/phase2.c
+++ b/scrub/phase2.c
@@ -161,6 +161,11 @@ xfs_scan_metadata(
 	}
 
 out:
+	ret = workqueue_terminate(&wq);
+	if (ret) {
+		moveon = false;
+		str_liberror(ctx, ret, _("finishing scrub work"));
+	}
 	workqueue_destroy(&wq);
 	return moveon;
 }
diff --git a/scrub/phase4.c b/scrub/phase4.c
index 10199ca1..79c8a6b8 100644
--- a/scrub/phase4.c
+++ b/scrub/phase4.c
@@ -90,6 +90,12 @@ xfs_process_action_items(
 		if (!moveon)
 			break;
 	}
+
+	ret = workqueue_terminate(&wq);
+	if (ret) {
+		moveon = false;
+		str_liberror(ctx, ret, _("finishing repair work"));
+	}
 	workqueue_destroy(&wq);
 
 	pthread_mutex_lock(&ctx->lock);
diff --git a/scrub/read_verify.c b/scrub/read_verify.c
index c7e34cf5..7d95ab00 100644
--- a/scrub/read_verify.c
+++ b/scrub/read_verify.c
@@ -120,6 +120,7 @@ void
 read_verify_pool_flush(
 	struct read_verify_pool		*rvp)
 {
+	workqueue_terminate(&rvp->wq);
 	workqueue_destroy(&rvp->wq);
 }
 
diff --git a/scrub/spacemap.c b/scrub/spacemap.c
index 03d05eed..7fe03163 100644
--- a/scrub/spacemap.c
+++ b/scrub/spacemap.c
@@ -230,6 +230,11 @@ xfs_scan_all_spacemaps(
 		}
 	}
 out:
+	ret = workqueue_terminate(&wq);
+	if (ret) {
+		sbx.moveon = false;
+		str_liberror(ctx, ret, _("finishing fsmap work"));
+	}
 	workqueue_destroy(&wq);
 
 	return sbx.moveon;
diff --git a/scrub/vfs.c b/scrub/vfs.c
index e5ed5d83..8aa58d6e 100644
--- a/scrub/vfs.c
+++ b/scrub/vfs.c
@@ -239,6 +239,11 @@ scan_fs_tree(
 	assert(sft.nr_dirs == 0);
 	pthread_mutex_unlock(&sft.lock);
 
+	ret = workqueue_terminate(&wq);
+	if (ret) {
+		sft.moveon = false;
+		str_liberror(ctx, ret, _("finishing directory scan work"));
+	}
 out_wq:
 	workqueue_destroy(&wq);
 	return sft.moveon;




[Index of Archives]     [XFS Filesystem Development (older mail)]     [Linux Filesystem Development]     [Linux Audio Users]     [Yosemite Trails]     [Linux Kernel]     [Linux RAID]     [Linux SCSI]


  Powered by Linux