/*
 * libfrog workqueue: a generic thread pool.
 *
 * From: Darrick J. Wong <darrick.wong@xxxxxxxxxx>
 *
 * Create a thread pool that queues and runs discrete work items.  This
 * will be a namespaced version of the pool in repair/threads.c; a
 * subsequent patch will switch repair over.  xfs_scrub will use the
 * generic thread pool.
 *
 * Signed-off-by: Darrick J. Wong <darrick.wong@xxxxxxxxxx>
 *
 * Layout: the declarations below belong in include/workqueue.h and the
 * definitions after them in libfrog/workqueue.c (which must be listed in
 * CFILES of libfrog/Makefile, after util.c).
 */

/* ------------------------------------------------------------------ */
/* include/workqueue.h                                                */
/* ------------------------------------------------------------------ */

/*
 * Copyright (C) 2017 Oracle.  All Rights Reserved.
 *
 * Author: Darrick J. Wong <darrick.wong@xxxxxxxxxx>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it would be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write the Free Software Foundation,
 * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 * This code was adapted from repair/threads.h.
 */
#ifndef _WORKQUEUE_H_
#define _WORKQUEUE_H_

/*
 * The structures below embed pthread_t, uint32_t and bool, so pull in the
 * headers that define them instead of relying on every includer's order.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

struct workqueue;

/*
 * Work function.  Called with the queue the item was added to and the
 * index/arg pair that was passed to workqueue_add().
 */
typedef void workqueue_func_t(struct workqueue *wq, uint32_t index, void *arg);

/* One queued unit of work; allocated by workqueue_add(), freed by a worker. */
struct workqueue_item {
	struct workqueue	*queue;		/* queue this item belongs to */
	struct workqueue_item	*next;		/* next item in FIFO order */
	workqueue_func_t	*function;	/* function to run */
	void			*arg;		/* caller's opaque argument */
	uint32_t		index;		/* caller's index argument */
};

/* Thread pool state.  The list and counters are protected by @lock. */
struct workqueue {
	void			*wq_ctx;	/* caller context from create */
	pthread_t		*threads;	/* thread_count worker ids */
	struct workqueue_item	*next_item;	/* head of the pending list */
	struct workqueue_item	*last_item;	/* tail of the pending list */
	pthread_mutex_t		lock;
	pthread_cond_t		wakeup;		/* signalled on new work/exit */
	unsigned int		item_count;	/* number of pending items */
	unsigned int		thread_count;	/* number of started workers */
	bool			terminate;	/* workers exit once drained */
};

int workqueue_create(struct workqueue *wq, void *wq_ctx,
		unsigned int nr_workers);
int workqueue_add(struct workqueue *wq, workqueue_func_t *func,
		uint32_t index, void *arg);
void workqueue_destroy(struct workqueue *wq);

#endif	/* _WORKQUEUE_H_ */

/* ------------------------------------------------------------------ */
/* libfrog/workqueue.c                                                */
/* ------------------------------------------------------------------ */

/*
 * Copyright (C) 2017 Oracle.  All Rights Reserved.
 *
 * Author: Darrick J. Wong <darrick.wong@xxxxxxxxxx>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it would be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write the Free Software Foundation,
 * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 * This code was adapted from repair/threads.c.
 */
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include <errno.h>
#include <assert.h>
/* When split into two files, include "workqueue.h" here. */

/*
 * Main processing thread.
 *
 * Pulls items off the head of the queue and runs them with the lock
 * dropped.  The thread only exits once the queue is empty AND terminate is
 * set, so everything queued before workqueue_destroy() still gets run.
 */
static void *
workqueue_thread(void *arg)
{
	struct workqueue	*wq = arg;
	struct workqueue_item	*wi;

	while (1) {
		pthread_mutex_lock(&wq->lock);

		/* Wait for work or for the order to shut down. */
		while (wq->next_item == NULL && !wq->terminate) {
			assert(wq->item_count == 0);
			pthread_cond_wait(&wq->wakeup, &wq->lock);
		}

		/* Nothing left to do, so terminate must have been set. */
		if (wq->next_item == NULL) {
			pthread_mutex_unlock(&wq->lock);
			break;
		}

		/* Dequeue work from the head of the list. */
		assert(wq->item_count > 0);
		wi = wq->next_item;
		wq->next_item = wi->next;
		if (wq->next_item == NULL)
			wq->last_item = NULL;	/* don't point at freed memory */
		wq->item_count--;

		pthread_mutex_unlock(&wq->lock);

		wi->function(wi->queue, wi->index, wi->arg);
		free(wi);
	}

	return NULL;
}

/*
 * Initialise a work queue and start its worker threads.
 *
 * @wq:		caller-provided storage; any previous contents are wiped.
 * @wq_ctx:	opaque pointer stored in wq->wq_ctx for the work functions.
 * @nr_workers:	number of threads to start.  Zero is allowed; in that case
 *		workqueue_add() runs each function synchronously.
 *
 * Returns 0 on success or a positive errno-style code (ENOMEM, or whatever
 * the failing pthread call returned).  On failure every thread that did
 * start has been joined and *wq is zeroed.
 */
int
workqueue_create(
	struct workqueue	*wq,
	void			*wq_ctx,
	unsigned int		nr_workers)
{
	unsigned int		i;
	int			err;

	memset(wq, 0, sizeof(*wq));

	err = pthread_cond_init(&wq->wakeup, NULL);
	if (err)
		goto out_zero;
	err = pthread_mutex_init(&wq->lock, NULL);
	if (err)
		goto out_cond;

	wq->wq_ctx = wq_ctx;

	/*
	 * Skip the allocation for an empty pool: a zero-sized allocation may
	 * legitimately return NULL and must not be mistaken for ENOMEM.
	 * calloc() also guards the count * size multiplication.
	 */
	if (nr_workers > 0) {
		wq->threads = calloc(nr_workers, sizeof(*wq->threads));
		if (wq->threads == NULL) {
			err = ENOMEM;
			goto out_mutex;
		}
	}

	for (i = 0; i < nr_workers; i++) {
		err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
				wq);
		if (err)
			break;
	}

	/*
	 * Record how many workers really exist.  If pthread_create() failed
	 * part-way, workqueue_destroy() must join only those threads; the
	 * remaining slots were never filled in with a valid thread id.
	 */
	wq->thread_count = i;
	if (err) {
		workqueue_destroy(wq);
		return err;
	}
	return 0;

out_mutex:
	pthread_mutex_destroy(&wq->lock);
out_cond:
	pthread_cond_destroy(&wq->wakeup);
out_zero:
	memset(wq, 0, sizeof(*wq));
	return err;
}

/*
 * Create a work item consisting of a function and some arguments and
 * schedule the work item to be run via the thread pool.
 *
 * If the queue has no worker threads the function is called directly,
 * before this function returns.
 *
 * Returns 0 on success or ENOMEM if the item could not be allocated (in
 * which case the function has not been and will not be run).
 */
int
workqueue_add(
	struct workqueue	*wq,
	workqueue_func_t	*func,
	uint32_t		index,
	void			*arg)
{
	struct workqueue_item	*wi;

	if (wq->thread_count == 0) {
		func(wq, index, arg);
		return 0;
	}

	wi = malloc(sizeof(*wi));
	if (wi == NULL)
		return ENOMEM;

	wi->function = func;
	wi->index = index;
	wi->arg = arg;
	wi->queue = wq;
	wi->next = NULL;

	/* Now queue the new work structure to the work queue. */
	pthread_mutex_lock(&wq->lock);
	if (wq->next_item == NULL) {
		assert(wq->item_count == 0);
		wq->next_item = wi;
	} else {
		wq->last_item->next = wi;
	}
	wq->last_item = wi;
	wq->item_count++;

	/*
	 * Wake one worker for every item, not just when the list goes from
	 * empty to non-empty; otherwise items added back-to-back leave the
	 * other idle workers asleep while a single thread drains the list.
	 */
	pthread_cond_signal(&wq->wakeup);
	pthread_mutex_unlock(&wq->lock);

	return 0;
}

/*
 * Wait for all pending work items to be processed and tear down the
 * workqueue.  The workqueue structure is zeroed on return; the storage
 * itself belongs to the caller.
 */
void
workqueue_destroy(
	struct workqueue	*wq)
{
	unsigned int		i;

	pthread_mutex_lock(&wq->lock);
	wq->terminate = true;
	pthread_mutex_unlock(&wq->lock);

	/* Every worker has to notice terminate, so wake them all. */
	pthread_cond_broadcast(&wq->wakeup);

	for (i = 0; i < wq->thread_count; i++)
		pthread_join(wq->threads[i], NULL);

	free(wq->threads);
	pthread_mutex_destroy(&wq->lock);
	pthread_cond_destroy(&wq->wakeup);
	memset(wq, 0, sizeof(*wq));
}