Signed-off-by: Ben Peart <benpeart@xxxxxxxxxxxxx>
---
 Makefile    |  1 +
 mpmcqueue.c | 56 +++++++++++++++++++++++++++++++++++++
 mpmcqueue.h | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 137 insertions(+)
 create mode 100644 mpmcqueue.c
 create mode 100644 mpmcqueue.h

diff --git a/Makefile b/Makefile
index 0cb6590f24..fdaabf0252 100644
--- a/Makefile
+++ b/Makefile
@@ -890,6 +890,7 @@ LIB_OBJS += merge.o
 LIB_OBJS += merge-blobs.o
 LIB_OBJS += merge-recursive.o
 LIB_OBJS += mergesort.o
+LIB_OBJS += mpmcqueue.o
 LIB_OBJS += name-hash.o
 LIB_OBJS += notes.o
 LIB_OBJS += notes-cache.o
diff --git a/mpmcqueue.c b/mpmcqueue.c
new file mode 100644
index 0000000000..22411af1b0
--- /dev/null
+++ b/mpmcqueue.c
@@ -0,0 +1,56 @@
+#include "mpmcqueue.h"
+
+/* Sets up an empty, non-canceled queue with default pthread attributes. */
+void mpmcq_init(struct mpmcq *queue)
+{
+	queue->head = NULL;
+	queue->cancel = 0;
+	pthread_mutex_init(&queue->mutex, NULL);
+	pthread_cond_init(&queue->condition, NULL);
+}
+
+/* Destroys the mutex and condition variable; entries are not touched. */
+void mpmcq_destroy(struct mpmcq *queue)
+{
+	pthread_mutex_destroy(&queue->mutex);
+	pthread_cond_destroy(&queue->condition);
+}
+
+/* Links `entry` in at the head, then signals the condition variable. */
+void mpmcq_push(struct mpmcq *queue, struct mpmcq_entry *entry)
+{
+	pthread_mutex_lock(&queue->mutex);
+	entry->next = queue->head;
+	queue->head = entry;
+	pthread_cond_signal(&queue->condition);
+	pthread_mutex_unlock(&queue->mutex);
+}
+
+/*
+ * Blocks while the queue is empty and not canceled. Returns the head
+ * entry, or NULL once canceled (even if entries are still linked).
+ */
+struct mpmcq_entry *mpmcq_pop(struct mpmcq *queue)
+{
+	struct mpmcq_entry *entry = NULL;
+
+	pthread_mutex_lock(&queue->mutex);
+	/* Loop so the predicate is re-checked after every wakeup. */
+	while (!queue->head && !queue->cancel)
+		pthread_cond_wait(&queue->condition, &queue->mutex);
+	if (!queue->cancel) {
+		entry = queue->head;
+		queue->head = entry->next;
+	}
+	pthread_mutex_unlock(&queue->mutex);
+	return entry;
+}
+
+/* Sets the cancel flag and broadcasts so every mpmcq_pop() waiter wakes. */
+void mpmcq_cancel(struct mpmcq *queue)
+{
+	pthread_mutex_lock(&queue->mutex);
+	queue->cancel = 1;
+	pthread_cond_broadcast(&queue->condition);
+	pthread_mutex_unlock(&queue->mutex);
+}
diff --git a/mpmcqueue.h b/mpmcqueue.h
new file mode 100644
index 0000000000..7421e06aad
--- 
/dev/null
+++ b/mpmcqueue.h
@@ -0,0 +1,80 @@
+#ifndef MPMCQUEUE_H
+#define MPMCQUEUE_H
+
+#include "git-compat-util.h"
+#include <pthread.h>
+
+/*
+ * Generic unbounded Multi-Producer-Multi-Consumer container guarded by a
+ * mutex. mpmcq_push() and mpmcq_pop() both work on `head` (LIFO order).
+ */
+
+/*
+ * struct mpmcq_entry is the link node for one queued item.
+ * NOTE(review): presumably embedded in the caller's own structure.
+ */
+struct mpmcq_entry {
+	struct mpmcq_entry *next;
+};
+
+/*
+ * struct mpmcq is the concurrent queue structure. Members should not be
+ * modified directly; push, pop and cancel access them under `mutex`.
+ */
+struct mpmcq {
+	struct mpmcq_entry *head;
+	pthread_mutex_t mutex;
+	pthread_cond_t condition;
+	int cancel;
+};
+
+/*
+ * Initializes a mpmcq_entry structure by clearing its `next` link.
+ *
+ * `entry` points to the entry to initialize.
+ *
+ * The mpmcq_entry structure does not hold references to external resources,
+ * so it is safe to just discard it once you are done with it (e.g. if
+ * your structure was allocated with xmalloc(), you can just free() it,
+ * and if it is on the stack, you can just let it go out of scope).
+ */
+static inline void mpmcq_entry_init(struct mpmcq_entry *entry)
+{
+	entry->next = NULL;
+}
+
+/*
+ * Initializes a mpmcq structure: empty and not canceled.
+ */
+extern void mpmcq_init(struct mpmcq *queue);
+
+/*
+ * Destroys the mutex and condition variable of a mpmcq structure.
+ */
+extern void mpmcq_destroy(struct mpmcq *queue);
+
+/*
+ * Pushes an entry on to the head of the queue.
+ *
+ * `queue` is the mpmcq structure.
+ * `entry` is the entry to push.
+ */
+extern void mpmcq_push(struct mpmcq *queue, struct mpmcq_entry *entry);
+
+/*
+ * Pops the head entry; blocks while the queue is empty and not canceled.
+ *
+ * `queue` is the mpmcq structure.
+ *
+ * Returns the entry on success, NULL once canceled (even if entries remain).
+ */
+extern struct mpmcq_entry *mpmcq_pop(struct mpmcq *queue);
+
+/*
+ * Sets the cancel flag and wakes every thread blocked in mpmcq_pop().
+ *
+ * `queue` is the mpmcq structure.
+ */
+extern void mpmcq_cancel(struct mpmcq *queue);
+
+#endif
-- 
2.17.0.gvfs.1.123.g449c066