[PATCH v1 1/3] add unbounded Multi-Producer-Multi-Consumer queue

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Signed-off-by: Ben Peart <benpeart@xxxxxxxxxxxxx>
---
 Makefile    |  1 +
 mpmcqueue.c | 49 ++++++++++++++++++++++++++++++++
 mpmcqueue.h | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 130 insertions(+)
 create mode 100644 mpmcqueue.c
 create mode 100644 mpmcqueue.h

diff --git a/Makefile b/Makefile
index 0cb6590f24..fdaabf0252 100644
--- a/Makefile
+++ b/Makefile
@@ -890,6 +890,7 @@ LIB_OBJS += merge.o
 LIB_OBJS += merge-blobs.o
 LIB_OBJS += merge-recursive.o
 LIB_OBJS += mergesort.o
+LIB_OBJS += mpmcqueue.o
 LIB_OBJS += name-hash.o
 LIB_OBJS += notes.o
 LIB_OBJS += notes-cache.o
diff --git a/mpmcqueue.c b/mpmcqueue.c
new file mode 100644
index 0000000000..22411af1b0
--- /dev/null
+++ b/mpmcqueue.c
@@ -0,0 +1,49 @@
+#include "mpmcqueue.h"
+
+void mpmcq_init(struct mpmcq *queue)
+{
+	queue->head = NULL;
+	queue->cancel = 0;
+	pthread_mutex_init(&queue->mutex, NULL);
+	pthread_cond_init(&queue->condition, NULL);
+}
+
+void mpmcq_destroy(struct mpmcq *queue)
+{
+	pthread_mutex_destroy(&queue->mutex);
+	pthread_cond_destroy(&queue->condition);
+}
+
+void mpmcq_push(struct mpmcq *queue, struct mpmcq_entry *entry)
+{
+	pthread_mutex_lock(&queue->mutex);
+	entry->next = queue->head;
+	queue->head = entry;
+	pthread_cond_signal(&queue->condition);
+	pthread_mutex_unlock(&queue->mutex);
+}
+
+struct mpmcq_entry *mpmcq_pop(struct mpmcq *queue)
+{
+	struct mpmcq_entry *entry = NULL;
+
+	pthread_mutex_lock(&queue->mutex);
+	while (!queue->head && !queue->cancel)
+		pthread_cond_wait(&queue->condition, &queue->mutex);
+	if (!queue->cancel) {
+		entry = queue->head;
+		queue->head = entry->next;
+	}
+	pthread_mutex_unlock(&queue->mutex);
+	return entry;
+}
+
+void mpmcq_cancel(struct mpmcq *queue)
+{
+	struct mpmcq_entry *entry;
+
+	pthread_mutex_lock(&queue->mutex);
+	queue->cancel = 1;
+	pthread_cond_broadcast(&queue->condition);
+	pthread_mutex_unlock(&queue->mutex);
+}
diff --git a/mpmcqueue.h b/mpmcqueue.h
new file mode 100644
index 0000000000..7421e06aad
--- /dev/null
+++ b/mpmcqueue.h
@@ -0,0 +1,80 @@
+#ifndef MPMCQUEUE_H
+#define MPMCQUEUE_H
+
+#include "git-compat-util.h"
+#include <pthread.h>
+
+/*
+ * Generic implementation of an unbounded Multi-Producer-Multi-Consumer
+ * queue.
+ */
+
+/*
+ * struct mpmcq_entry is an opaque structure representing an entry in the
+ * queue.
+ */
+struct mpmcq_entry {
+	struct mpmcq_entry *next;
+};
+
+/*
+ * struct mpmcq is the concurrent queue structure. Members should not be
+ * modified directly.
+ */
+struct mpmcq {
+	struct mpmcq_entry *head;
+	pthread_mutex_t mutex;
+	pthread_cond_t condition;
+	int cancel;
+};
+
+/*
+ * Initializes a mpmcq_entry structure.
+ *
+ * `entry` points to the entry to initialize.
+ *
+ * The mpmcq_entry structure does not hold references to external resources,
+ * and it is safe to just discard it once you are done with it (i.e. if
+ * your structure was allocated with xmalloc(), you can just free() it,
+ * and if it is on stack, you can just let it go out of scope).
+ */
+static inline void mpmcq_entry_init(struct mpmcq_entry *entry)
+{
+	entry->next = NULL;
+}
+
+/*
+ * Initializes a mpmcq structure.
+ */
+extern void mpmcq_init(struct mpmcq *queue);
+
+/*
+ * Destroys a mpmcq structure.
+ */
+extern void mpmcq_destroy(struct mpmcq *queue);
+
+/*
+ * Pushes an entry on to the queue.
+ *
+ * `queue` is the mpmcq structure.
+ * `entry` is the entry to push.
+ */
+extern void mpmcq_push(struct mpmcq *queue, struct mpmcq_entry *entry);
+
+/*
+ * Pops an entry off the queue.
+ *
+ * `queue` is the mpmcq structure.
+ *
+ * Returns mpmcq_entry on success, NULL on cancel;
+ */
+extern struct mpmcq_entry *mpmcq_pop(struct mpmcq *queue);
+
+/*
+ * Cancels any pending pop requests.
+ *
+ * `queue` is the mpmcq structure.
+ */
+extern void mpmcq_cancel(struct mpmcq *queue);
+
+#endif
-- 
2.17.0.gvfs.1.123.g449c066





[Index of Archives]     [Linux Kernel Development]     [Gcc Help]     [IETF Annouce]     [DCCP]     [Netdev]     [Networking]     [Security]     [V4L]     [Bugtraq]     [Yosemite]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Linux SCSI]     [Fedora Users]

  Powered by Linux