[PATCH v5 07/40] dm vdo: add specialized request queueing functionality

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This patch adds funnel_queue, a mostly lock-free multi-producer,
single-consumer queue. It also adds the request queue used by the dm-vdo
deduplication index, and the work_queue used by the dm-vdo data store. Both
of these are built on top of funnel queue and are intended to support the
dispatching of many short-running tasks. The work_queue also supports
priorities. Finally, this patch adds vdo_completion, the structure which is
enqueued on work_queues.

Co-developed-by: J. corwin Coburn <corwin@xxxxxxxxxxxxxx>
Signed-off-by: J. corwin Coburn <corwin@xxxxxxxxxxxxxx>
Co-developed-by: Michael Sclafani <vdo-devel@xxxxxxxxxx>
Signed-off-by: Michael Sclafani <vdo-devel@xxxxxxxxxx>
Co-developed-by: Sweet Tea Dorminy <sweettea-kernel@xxxxxxxxxx>
Signed-off-by: Sweet Tea Dorminy <sweettea-kernel@xxxxxxxxxx>
Co-developed-by: Ken Raeburn <raeburn@xxxxxxxxxx>
Signed-off-by: Ken Raeburn <raeburn@xxxxxxxxxx>
Signed-off-by: Matthew Sakai <msakai@xxxxxxxxxx>
---
 drivers/md/dm-vdo/completion.c          | 140 ++++++
 drivers/md/dm-vdo/completion.h          | 152 ++++++
 drivers/md/dm-vdo/cpu.h                 |  59 +++
 drivers/md/dm-vdo/funnel-queue.c        | 171 +++++++
 drivers/md/dm-vdo/funnel-queue.h        | 110 ++++
 drivers/md/dm-vdo/funnel-requestqueue.c | 283 +++++++++++
 drivers/md/dm-vdo/funnel-requestqueue.h |  31 ++
 drivers/md/dm-vdo/funnel-workqueue.c    | 638 ++++++++++++++++++++++++
 drivers/md/dm-vdo/funnel-workqueue.h    |  51 ++
 9 files changed, 1635 insertions(+)
 create mode 100644 drivers/md/dm-vdo/completion.c
 create mode 100644 drivers/md/dm-vdo/completion.h
 create mode 100644 drivers/md/dm-vdo/cpu.h
 create mode 100644 drivers/md/dm-vdo/funnel-queue.c
 create mode 100644 drivers/md/dm-vdo/funnel-queue.h
 create mode 100644 drivers/md/dm-vdo/funnel-requestqueue.c
 create mode 100644 drivers/md/dm-vdo/funnel-requestqueue.h
 create mode 100644 drivers/md/dm-vdo/funnel-workqueue.c
 create mode 100644 drivers/md/dm-vdo/funnel-workqueue.h

diff --git a/drivers/md/dm-vdo/completion.c b/drivers/md/dm-vdo/completion.c
new file mode 100644
index 000000000000..9e2381dc3683
--- /dev/null
+++ b/drivers/md/dm-vdo/completion.c
@@ -0,0 +1,140 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "completion.h"
+
+#include <linux/kernel.h>
+
+#include "logger.h"
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+#include "vio.h"
+#include "vdo.h"
+
+/**
+ * DOC: vdo completions.
+ *
+ * Most of vdo's data structures are lock free, each either belonging to a single "zone," or
+ * divided into a number of zones whose accesses to the structure do not overlap. During normal
+ * operation, at most one thread will be operating in any given zone. Each zone has a
+ * vdo_work_queue which holds vdo_completions that are to be run in that zone. A completion may
+ * only be enqueued on one queue or operating in a single zone at a time.
+ *
+ * At each step of a multi-threaded operation, the completion performing the operation is given a
+ * callback, error handler, and thread id for the next step. A completion is "run" when it is
+ * operating on the correct thread (as specified by its callback_thread_id). If the value of its
+ * "result" field is an error (i.e. not VDO_SUCCESS), the function in its "error_handler" will be
+ * invoked. If the error_handler is NULL, or there is no error, the function set as its "callback"
+ * will be invoked. Generally, a completion will not be run directly, but rather will be
+ * "launched." In this case, it will check whether it is operating on the correct thread. If it is,
+ * it will run immediately. Otherwise, it will be enqueued on the vdo_work_queue associated with the
+ * completion's "callback_thread_id". When it is dequeued, it will be on the correct thread, and
+ * will get run. In some cases, the completion should get queued instead of running immediately,
+ * even if it is being launched from the correct thread. This is usually in cases where there is a
+ * long chain of callbacks, all on the same thread, which could overflow the stack. In such cases,
+ * the completion's "requeue" field should be set to true. Doing so will skip the current thread
+ * check and simply enqueue the completion.
+ *
+ * A completion may be "finished," in which case its "complete" field will be set to true before it
+ * is next run. It is a bug to attempt to set the result or re-finish a finished completion.
+ * Because a completion's fields are not safe to examine from any thread other than the one on
+ * which the completion is currently operating, this field is used only to aid in detecting
+ * programming errors. It can not be used for cross-thread checking on the status of an operation.
+ * A completion must be "reset" before it can be reused after it has been finished. Resetting will
+ * also clear any error from the result field.
+ **/
+
+/**
+ * vdo_initialize_completion() - Zero a completion and record its vdo and type.
+ * @completion: The completion to initialize.
+ * @vdo: The vdo to store in the completion.
+ * @type: The type to store in the completion.
+ *
+ * Every other field is left zeroed; the result and complete fields are then set by
+ * vdo_reset_completion().
+ */
+void vdo_initialize_completion(struct vdo_completion *completion,
+			       struct vdo *vdo,
+			       enum vdo_completion_type type)
+{
+	/* Clear the whole structure first so unset fields start out as zero. */
+	memset(completion, 0, sizeof(struct vdo_completion));
+	completion->type = type;
+	completion->vdo = vdo;
+	vdo_reset_completion(completion);
+}
+
+/*
+ * Assert (via ASSERT_LOG_ONLY) that the completion has not yet been marked complete. Nothing is
+ * returned, so callers proceed regardless of the outcome of the check.
+ */
+static inline void assert_incomplete(struct vdo_completion *completion)
+{
+	ASSERT_LOG_ONLY(!completion->complete, "completion is not complete");
+}
+
+/**
+ * vdo_set_completion_result() - Record a result in a completion.
+ * @completion: The completion to update; it must not already be complete.
+ * @result: The result to record.
+ *
+ * The first non-success result sticks: if the completion already holds something other than
+ * VDO_SUCCESS, the new result is discarded.
+ */
+void vdo_set_completion_result(struct vdo_completion *completion, int result)
+{
+	assert_incomplete(completion);
+
+	/* An earlier error is never overwritten. */
+	if (completion->result != VDO_SUCCESS)
+		return;
+
+	completion->result = result;
+}
+
+/**
+ * vdo_launch_completion_with_priority() - Run a completion now, or enqueue it.
+ * @completion: The completion to launch.
+ * @priority: The priority to use if the completion has to be enqueued.
+ *
+ * The completion runs immediately only when its requeue flag is clear and the current callback
+ * thread is the one named by its callback_thread_id. In every other case it is enqueued.
+ */
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+					 enum vdo_completion_priority priority)
+{
+	/* The requeue flag is tested first, so the thread lookup is skipped when it is set. */
+	if (!completion->requeue &&
+	    (completion->callback_thread_id == vdo_get_callback_thread_id())) {
+		vdo_run_completion(completion);
+		return;
+	}
+
+	vdo_enqueue_completion(completion, priority);
+}
+
+/**
+ * vdo_finish_completion() - Mark a completion as complete and then launch it.
+ * @completion: The completion to finish; it must not already be complete.
+ *
+ * A completion with no callback is only marked complete; nothing is launched.
+ */
+void vdo_finish_completion(struct vdo_completion *completion)
+{
+	assert_incomplete(completion);
+	completion->complete = true;
+
+	/* Without a callback there is nothing to launch. */
+	if (completion->callback == NULL)
+		return;
+
+	vdo_launch_completion(completion);
+}
+
+/**
+ * vdo_enqueue_completion() - Put a completion on the work queue of its callback thread.
+ * @completion: The completion to enqueue.
+ * @priority: The priority to record in the completion before enqueueing.
+ *
+ * The target queue is selected by the completion's callback_thread_id. An id that is not below
+ * the vdo's thread count triggers BUG(). The requeue flag is cleared and my_queue is reset before
+ * the completion is handed to the work queue.
+ */
+void vdo_enqueue_completion(struct vdo_completion *completion,
+			    enum vdo_completion_priority priority)
+{
+	thread_id_t target = completion->callback_thread_id;
+	struct vdo *vdo = completion->vdo;
+	int result;
+
+	result = ASSERT(target < vdo->thread_config.thread_count,
+			"thread_id %u (completion type %d) is less than thread count %u",
+			target, completion->type,
+			vdo->thread_config.thread_count);
+	if (result != UDS_SUCCESS)
+		BUG();
+
+	completion->my_queue = NULL;
+	completion->priority = priority;
+	completion->requeue = false;
+	vdo_enqueue_work_queue(vdo->threads[target].queue, completion);
+}
+
+/**
+ * vdo_requeue_completion_if_needed() - Move a completion to another thread unless already there.
+ * @completion: The completion to requeue.
+ * @callback_thread_id: The thread on which the completion should be running.
+ *
+ * When the current callback thread differs from @callback_thread_id, the completion is retargeted
+ * to that thread and enqueued at the default priority.
+ *
+ * Return: True if the completion was requeued; callers may not access the completion in this case.
+ */
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+				      thread_id_t callback_thread_id)
+{
+	bool wrong_thread = (vdo_get_callback_thread_id() != callback_thread_id);
+
+	if (wrong_thread) {
+		completion->callback_thread_id = callback_thread_id;
+		vdo_enqueue_completion(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+	}
+
+	return wrong_thread;
+}
diff --git a/drivers/md/dm-vdo/completion.h b/drivers/md/dm-vdo/completion.h
new file mode 100644
index 000000000000..aa145d73a686
--- /dev/null
+++ b/drivers/md/dm-vdo/completion.h
@@ -0,0 +1,152 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef VDO_COMPLETION_H
+#define VDO_COMPLETION_H
+
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+
+/**
+ * vdo_run_completion() - Invoke a completion's error handler or callback on the current thread.
+ * @completion: The completion to run.
+ *
+ * The error handler is chosen only when the result is not VDO_SUCCESS and an error handler has
+ * been set; otherwise the callback is invoked.
+ *
+ * Context: This function must be called from the correct callback thread.
+ */
+static inline void vdo_run_completion(struct vdo_completion *completion)
+{
+	bool failed = (completion->result != VDO_SUCCESS);
+
+	if (failed && (completion->error_handler != NULL))
+		completion->error_handler(completion);
+	else
+		completion->callback(completion);
+}
+
+void vdo_set_completion_result(struct vdo_completion *completion, int result);
+
+void vdo_initialize_completion(struct vdo_completion *completion, struct vdo *vdo,
+			       enum vdo_completion_type type);
+
+/**
+ * vdo_reset_completion() - Return a completion to a clean, reusable state.
+ * @completion: The completion to reset.
+ *
+ * Only the complete flag and the result are touched; the type, vdo and parent information are
+ * kept.
+ */
+static inline void vdo_reset_completion(struct vdo_completion *completion)
+{
+	completion->complete = false;
+	completion->result = VDO_SUCCESS;
+}
+
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+					 enum vdo_completion_priority priority);
+
+/**
+ * vdo_launch_completion() - Launch a completion with default priority.
+ * @completion: The completion to launch.
+ *
+ * Equivalent to calling vdo_launch_completion_with_priority() with VDO_WORK_Q_DEFAULT_PRIORITY.
+ */
+static inline void vdo_launch_completion(struct vdo_completion *completion)
+{
+	vdo_launch_completion_with_priority(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+}
+
+/**
+ * vdo_continue_completion() - Continue processing a completion.
+ * @completion: The completion to continue.
+ * @result: The current result (will not mask older errors).
+ *
+ * Continue processing a completion by setting the current result and calling
+ * vdo_launch_completion(). Unlike vdo_finish_completion(), this does not set the completion's
+ * complete flag.
+ */
+static inline void vdo_continue_completion(struct vdo_completion *completion, int result)
+{
+	vdo_set_completion_result(completion, result);
+	vdo_launch_completion(completion);
+}
+
+void vdo_finish_completion(struct vdo_completion *completion);
+
+/**
+ * vdo_fail_completion() - Set the result of a completion if it does not already have an error,
+ *                         then finish it.
+ * @completion: The completion to fail.
+ * @result: The result to record; it is ignored if the completion already holds an error.
+ */
+static inline void vdo_fail_completion(struct vdo_completion *completion, int result)
+{
+	vdo_set_completion_result(completion, result);
+	vdo_finish_completion(completion);
+}
+
+/**
+ * vdo_assert_completion_type() - Check that a completion has the expected type.
+ * @completion: The completion to check.
+ * @expected: The type the completion should have.
+ *
+ * Return: VDO_SUCCESS or an error
+ */
+static inline int vdo_assert_completion_type(struct vdo_completion *completion,
+					     enum vdo_completion_type expected)
+{
+	int result = ASSERT(expected == completion->type,
+			    "completion type should be %u, not %u", expected,
+			    completion->type);
+
+	return result;
+}
+
+/**
+ * vdo_set_completion_callback() - Set the callback of a completion and the thread it runs on.
+ * @completion: The completion to update.
+ * @callback: The function to store as the completion's callback.
+ * @callback_thread_id: The id of the thread on which the callback should run.
+ */
+static inline void vdo_set_completion_callback(struct vdo_completion *completion,
+					       vdo_action_fn callback,
+					       thread_id_t callback_thread_id)
+{
+	completion->callback_thread_id = callback_thread_id;
+	completion->callback = callback;
+}
+
+/**
+ * vdo_launch_completion_callback() - Install a callback on a completion and launch it at once.
+ * @completion: The completion to launch.
+ * @callback: The function to store as the completion's callback.
+ * @callback_thread_id: The id of the thread on which the callback should run.
+ */
+static inline void vdo_launch_completion_callback(struct vdo_completion *completion,
+						  vdo_action_fn callback,
+						  thread_id_t callback_thread_id)
+{
+	completion->callback = callback;
+	completion->callback_thread_id = callback_thread_id;
+	vdo_launch_completion(completion);
+}
+
+/**
+ * vdo_prepare_completion() - Get a completion ready to be launched.
+ * @completion: The completion to prepare.
+ * @callback: The function to store as the completion's callback.
+ * @error_handler: The function to store as the completion's error handler.
+ * @callback_thread_id: The id of the thread on which the callback should run.
+ * @parent: The value to store as the completion's parent.
+ *
+ * The completion is reset, and its callback, error handler, callback thread, and parent are all
+ * replaced. The requeue flag is not modified.
+ */
+static inline void vdo_prepare_completion(struct vdo_completion *completion,
+					  vdo_action_fn callback,
+					  vdo_action_fn error_handler,
+					  thread_id_t callback_thread_id, void *parent)
+{
+	vdo_reset_completion(completion);
+	completion->parent = parent;
+	completion->error_handler = error_handler;
+	vdo_set_completion_callback(completion, callback, callback_thread_id);
+}
+
+/**
+ * vdo_prepare_completion_for_requeue() - Prepare a completion so that launching it always
+ *                                        enqueues it.
+ * @completion: The completion to prepare.
+ * @callback: The function to store as the completion's callback.
+ * @error_handler: The function to store as the completion's error handler.
+ * @callback_thread_id: The id of the thread on which the callback should run.
+ * @parent: The value to store as the completion's parent.
+ *
+ * Performs the same steps as vdo_prepare_completion() and additionally sets the requeue flag.
+ */
+static inline void vdo_prepare_completion_for_requeue(struct vdo_completion *completion,
+						      vdo_action_fn callback,
+						      vdo_action_fn error_handler,
+						      thread_id_t callback_thread_id,
+						      void *parent)
+{
+	vdo_reset_completion(completion);
+	vdo_set_completion_callback(completion, callback, callback_thread_id);
+	completion->error_handler = error_handler;
+	completion->parent = parent;
+	completion->requeue = true;
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+			    enum vdo_completion_priority priority);
+
+
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+				      thread_id_t callback_thread_id);
+
+#endif /* VDO_COMPLETION_H */
diff --git a/drivers/md/dm-vdo/cpu.h b/drivers/md/dm-vdo/cpu.h
new file mode 100644
index 000000000000..d6a2615ba657
--- /dev/null
+++ b/drivers/md/dm-vdo/cpu.h
@@ -0,0 +1,59 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef UDS_CPU_H
+#define UDS_CPU_H
+
+#include <linux/cache.h>
+
+/**
+ * uds_prefetch_address() - Hint that an address will be accessed soon so the data can be moved
+ *                          into a CPU cache ahead of time.
+ *
+ * @address: the address to fetch (may be invalid)
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_address(const void *address, bool for_write)
+{
+	/*
+	 * When compiled without optimization, for_write is not seen as a constant; prefetching
+	 * does not matter in that case, so nothing is done.
+	 */
+	if (!__builtin_constant_p(for_write))
+		return;
+
+	/*
+	 * The second argument of the builtin must be a literal constant. clang (at least llvm 12)
+	 * cannot deduce that from a constant for_write, hence the explicit branches.
+	 */
+	if (for_write)
+		__builtin_prefetch(address, true);
+	else
+		__builtin_prefetch(address, false);
+}
+
+/**
+ * uds_prefetch_range() - Prefetch every cache line touched by a range of addresses.
+ *
+ * @start: the starting address to fetch (may be invalid)
+ * @size: the number of bytes in the address range
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_range(const void *start, unsigned int size,
+				      bool for_write)
+{
+	const char *cursor = start;
+	/* How far into its cache line the range begins. */
+	unsigned int misalignment = ((uintptr_t) start % L1_CACHE_BYTES);
+	/*
+	 * The number of cache lines to fetch, allowing for the range to straddle one extra cache
+	 * line boundary because of its alignment.
+	 */
+	unsigned int remaining = (1 + ((size + misalignment) / L1_CACHE_BYTES));
+
+	for (; remaining > 0; remaining--, cursor += L1_CACHE_BYTES)
+		uds_prefetch_address(cursor, for_write);
+}
+
+#endif /* UDS_CPU_H */
diff --git a/drivers/md/dm-vdo/funnel-queue.c b/drivers/md/dm-vdo/funnel-queue.c
new file mode 100644
index 000000000000..6940b282086d
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-queue.c
@@ -0,0 +1,171 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "funnel-queue.h"
+
+#include "cpu.h"
+#include "memory-alloc.h"
+#include "permassert.h"
+#include "uds.h"
+
+/*
+ * Allocate a funnel queue and set it up as empty. On success the new queue is stored in
+ * *queue_ptr and UDS_SUCCESS is returned; on allocation failure the error from uds_allocate() is
+ * returned and *queue_ptr is not written.
+ */
+int uds_make_funnel_queue(struct funnel_queue **queue_ptr)
+{
+	struct funnel_queue *new_queue;
+	int result = uds_allocate(1, struct funnel_queue, "funnel queue", &new_queue);
+
+	if (result != UDS_SUCCESS)
+		return result;
+
+	/*
+	 * The stub starts out as the only entry, at both ends of the queue. This establishes the
+	 * invariant that neither newest nor oldest is ever null.
+	 */
+	new_queue->oldest = &new_queue->stub;
+	new_queue->newest = &new_queue->stub;
+	new_queue->stub.next = NULL;
+
+	*queue_ptr = new_queue;
+	return UDS_SUCCESS;
+}
+
+/*
+ * Free the queue structure itself. Entries still linked into the queue are not touched here; per
+ * the funnel-queue.h notes, callers are responsible for allocating and freeing entries.
+ */
+void uds_free_funnel_queue(struct funnel_queue *queue)
+{
+	uds_free(queue);
+}
+
+/*
+ * Find the oldest retrievable entry without unlinking it. Returns NULL when the queue is empty or
+ * when the next entry has not yet been linked in by its producer. This may advance queue->oldest
+ * past the stub and may put the stub back on the queue, so it is not a read-only operation.
+ */
+static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
+{
+	/*
+	 * Barrier requirements: We need a read barrier between reading a "next" field pointer
+	 * value and reading anything it points to. There's an accompanying barrier in
+	 * uds_funnel_queue_put() between its caller setting up the entry and making it visible.
+	 */
+	struct funnel_queue_entry *oldest = queue->oldest;
+	struct funnel_queue_entry *next = READ_ONCE(oldest->next);
+
+	if (oldest == &queue->stub) {
+		/*
+		 * When the oldest entry is the stub and it has no successor, the queue is
+		 * logically empty.
+		 */
+		if (next == NULL)
+			return NULL;
+		/*
+		 * The stub entry has a successor, so the stub can be dequeued and ignored without
+		 * breaking the queue invariants.
+		 */
+		oldest = next;
+		queue->oldest = oldest;
+		next = READ_ONCE(oldest->next);
+	}
+
+	/*
+	 * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
+	 * stub entry back on the queue first.
+	 */
+	if (next == NULL) {
+		struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
+
+		if (oldest != newest) {
+			/*
+			 * Another thread has already swung queue->newest atomically, but not yet
+			 * assigned previous->next. The queue is really still empty.
+			 */
+			return NULL;
+		}
+
+		/*
+		 * Put the stub entry back on the queue, ensuring a successor will eventually be
+		 * seen.
+		 */
+		uds_funnel_queue_put(queue, &queue->stub);
+
+		/* Check again for a successor. */
+		next = READ_ONCE(oldest->next);
+		if (next == NULL) {
+			/*
+			 * We lost a race with a producer who swapped queue->newest before we did,
+			 * but who hasn't yet updated previous->next. Try again later.
+			 */
+			return NULL;
+		}
+	}
+
+	/* The candidate now has a successor, so it can be unlinked by the caller. */
+	return oldest;
+}
+
+/*
+ * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
+ * called from a single consumer thread.
+ *
+ * Returns the removed entry with its next field reset to NULL, or NULL if get_oldest() found
+ * nothing retrievable.
+ */
+struct funnel_queue_entry *uds_funnel_queue_poll(struct funnel_queue *queue)
+{
+	struct funnel_queue_entry *oldest = get_oldest(queue);
+
+	if (oldest == NULL)
+		return oldest;
+
+	/*
+	 * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
+	 * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
+	 * consumer and oldest->next is never used by a producer thread after it is swung from NULL
+	 * to non-NULL.
+	 */
+	queue->oldest = READ_ONCE(oldest->next);
+	/*
+	 * Make sure the caller sees the proper stored data for this entry. Since we've already
+	 * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
+	 * to the next call we'll properly see the dependent data.
+	 */
+	smp_rmb();
+	/*
+	 * If "oldest" is a very light-weight work item, we'll be looking for the next one very
+	 * soon, so prefetch it now.
+	 */
+	uds_prefetch_address(queue->oldest, true);
+	/* Detach the returned entry from the list before handing it to the caller. */
+	WRITE_ONCE(oldest->next, NULL);
+	return oldest;
+}
+
+/*
+ * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
+ * or more entries being added such that the list view is incomplete, this function will report the
+ * queue as empty.
+ *
+ * This is implemented with get_oldest(), which can update queue->oldest and re-enqueue the stub,
+ * so the check is not side-effect free.
+ */
+bool uds_is_funnel_queue_empty(struct funnel_queue *queue)
+{
+	return get_oldest(queue) == NULL;
+}
+
+/*
+ * Report whether the funnel queue is idle. A queue with retrievable entries is not idle. A queue
+ * that is mid-insertion (an entry has been added but the list view is still incomplete) is also
+ * not idle, even though uds_funnel_queue_poll() may not be able to return anything from it yet.
+ */
+bool uds_is_funnel_queue_idle(struct funnel_queue *queue)
+{
+	/*
+	 * If oldest is not the stub, there is another entry, although it cannot be retrieved yet
+	 * if its next pointer is still NULL.
+	 */
+	if (queue->oldest != &queue->stub)
+		return false;
+
+	/*
+	 * Oldest is the stub. If newest has been moved off the stub by _put(), then either a
+	 * retrievable entry exists or the list is officially empty but in the middle of having an
+	 * entry added. Retrievability depends on whether the stub.next update is visible to us,
+	 * but idleness does not; and because of the memory ordering in _put(), the update to
+	 * newest becomes visible to us at the same time or sooner.
+	 */
+	return READ_ONCE(queue->newest) == &queue->stub;
+}
diff --git a/drivers/md/dm-vdo/funnel-queue.h b/drivers/md/dm-vdo/funnel-queue.h
new file mode 100644
index 000000000000..88a30c593fdc
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-queue.h
@@ -0,0 +1,110 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef UDS_FUNNEL_QUEUE_H
+#define UDS_FUNNEL_QUEUE_H
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+
+/*
+ * A funnel queue is a simple (almost) lock-free queue that accepts entries from multiple threads
+ * (multi-producer) and delivers them to a single thread (single-consumer). "Funnel" is an attempt
+ * to evoke the image of requests from more than one producer being "funneled down" to a single
+ * consumer.
+ *
+ * This is an unsynchronized but thread-safe data structure when used as intended. There is no
+ * mechanism to ensure that only one thread is consuming from the queue. If more than one thread
+ * attempts to consume from the queue, the resulting behavior is undefined. Clients must not
+ * directly access or manipulate the internals of the queue, which are only exposed for the purpose
+ * of allowing the very simple enqueue operation to be inlined.
+ *
+ * The implementation requires that a funnel_queue_entry structure (a link pointer) is embedded in
+ * the queue entries, and pointers to those structures are used exclusively by the queue. No macros
+ * are defined to template the queue, so the offset of the funnel_queue_entry in the records placed
+ * in the queue must all be the same so the client can derive their structure pointer from the
+ * entry pointer returned by uds_funnel_queue_poll().
+ *
+ * Callers are wholly responsible for allocating and freeing the entries. Entries may be freed as
+ * soon as they are returned since this queue is not susceptible to the "ABA problem" present in
+ * many lock-free data structures. The queue is dynamically allocated to ensure cache-line
+ * alignment, but no other dynamic allocation is used.
+ *
+ * The algorithm is not actually 100% lock-free. There is a single point in uds_funnel_queue_put()
+ * at which a preempted producer will prevent the consumer from seeing items added to the queue by
+ * later producers, and only if the queue is short enough or the consumer fast enough for it to
+ * reach what was the end of the queue at the time of the preemption.
+ *
+ * The consumer function, uds_funnel_queue_poll(), will return NULL when the queue is empty. To
+ * wait for data to consume, spin (if safe) or combine the queue with a struct event_count to
+ * signal the presence of new entries.
+ */
+
+/*
+ * This queue link structure must be embedded in client entries. The queue links entries together
+ * through this structure only; clients recover their own record from the entry pointer.
+ */
+struct funnel_queue_entry {
+	/* The next (newer) entry in the queue. */
+	struct funnel_queue_entry *next;
+};
+
+/*
+ * The dynamically allocated queue structure, which is allocated on a cache line boundary so the
+ * producer and consumer fields in the structure will land on separate cache lines. This should be
+ * considered opaque but it is exposed here so uds_funnel_queue_put() can be inlined.
+ */
+struct __aligned(L1_CACHE_BYTES) funnel_queue {
+	/*
+	 * The producers' end of the queue, an atomically exchanged pointer that will never be
+	 * NULL.
+	 */
+	struct funnel_queue_entry *newest;
+
+	/* The consumer's end of the queue, which is owned by the consumer and never NULL. */
+	struct funnel_queue_entry *oldest __aligned(L1_CACHE_BYTES);
+
+	/* A dummy entry used to provide the non-NULL invariants above. */
+	struct funnel_queue_entry stub;
+};
+
+int __must_check uds_make_funnel_queue(struct funnel_queue **queue_ptr);
+
+void uds_free_funnel_queue(struct funnel_queue *queue);
+
+/*
+ * Put an entry on the end of the queue.
+ *
+ * The entry pointer must be to the struct funnel_queue_entry embedded in the caller's data
+ * structure. The caller must be able to derive the address of the start of their data structure
+ * from the pointer that is passed in here, so every entry in the queue must have the struct
+ * funnel_queue_entry at the same offset within the client's structure.
+ */
+static inline void uds_funnel_queue_put(struct funnel_queue *queue,
+					struct funnel_queue_entry *entry)
+{
+	struct funnel_queue_entry *previous;
+
+	/*
+	 * Barrier requirements: All stores relating to the entry ("next" pointer, containing data
+	 * structure fields) must happen before the previous->next store making it visible to the
+	 * consumer. Also, the entry's "next" field initialization to NULL must happen before any
+	 * other producer threads can see the entry (the xchg) and try to update the "next" field.
+	 *
+	 * xchg implements a full barrier.
+	 */
+	WRITE_ONCE(entry->next, NULL);
+	previous = xchg(&queue->newest, entry);
+	/*
+	 * Preemptions between these two statements hide the rest of the queue from the consumer,
+	 * preventing consumption until the following assignment runs.
+	 */
+	WRITE_ONCE(previous->next, entry);
+}
+
+struct funnel_queue_entry *__must_check uds_funnel_queue_poll(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_empty(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_idle(struct funnel_queue *queue);
+
+#endif /* UDS_FUNNEL_QUEUE_H */
diff --git a/drivers/md/dm-vdo/funnel-requestqueue.c b/drivers/md/dm-vdo/funnel-requestqueue.c
new file mode 100644
index 000000000000..c8ba04c1089c
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-requestqueue.c
@@ -0,0 +1,283 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "funnel-requestqueue.h"
+
+#include <linux/atomic.h>
+#include <linux/compiler.h>
+#include <linux/wait.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "uds-threads.h"
+
+/*
+ * This queue will attempt to handle requests in reasonably sized batches instead of reacting
+ * immediately to each new request. The wait time between batches is dynamically adjusted up or
+ * down to try to balance responsiveness against wasted thread run time.
+ *
+ * If the wait time becomes long enough, the queue will become dormant and must be explicitly
+ * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
+ * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
+ * wakeup of the worker thread.
+ *
+ * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
+ * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
+ * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
+ * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
+ * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
+ * kick the worker thread out of dormant mode and back into timer-based mode.
+ *
+ * Unbatched requests are used to communicate between different zone threads and will also cause
+ * the queue to awaken immediately.
+ */
+
+/*
+ * Tuning constants for the worker thread. The wait times are in nanoseconds (the timeout is
+ * converted with ns_to_ktime() before sleeping); the batch limits are counts of requests.
+ */
+enum {
+	NANOSECOND = 1,
+	MICROSECOND = 1000 * NANOSECOND,
+	MILLISECOND = 1000 * MICROSECOND,
+	/* The wait time used at startup and after waking from dormancy */
+	DEFAULT_WAIT_TIME = 20 * MICROSECOND,
+	/* The wait time is never reduced below this value */
+	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
+	/* Once the wait time grows to this value, the queue goes dormant */
+	MAXIMUM_WAIT_TIME = MILLISECOND,
+	/* A batch smaller than this causes the wait time to increase */
+	MINIMUM_BATCH = 32,
+	/* A batch larger than this causes the wait time to decrease */
+	MAXIMUM_BATCH = 64,
+};
+
+/* The state of one request queue and the single worker thread that services it. */
+struct uds_request_queue {
+	/* Wait queue for synchronizing producers and consumer */
+	struct wait_queue_head wait_head;
+	/* Function to process a request */
+	uds_request_queue_processor_fn processor;
+	/* Queue of new incoming requests */
+	struct funnel_queue *main_queue;
+	/* Queue of old requests to retry */
+	struct funnel_queue *retry_queue;
+	/* The worker thread, as returned by uds_create_thread() */
+	struct thread *thread;
+	/* True if the worker was started */
+	bool started;
+	/* When true, requests can be enqueued */
+	bool running;
+	/* A flag set when the worker is waiting without a timeout */
+	atomic_t dormant;
+};
+
+/*
+ * Remove and return the next available request, or NULL if neither queue can supply one. The
+ * retry queue is always polled first; the main queue is polled only when the retry queue yields
+ * nothing.
+ */
+static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
+{
+	struct funnel_queue_entry *entry = uds_funnel_queue_poll(queue->retry_queue);
+
+	if (entry == NULL)
+		entry = uds_funnel_queue_poll(queue->main_queue);
+
+	if (entry == NULL)
+		return NULL;
+
+	return container_of(entry, struct uds_request, queue_link);
+}
+
+/* Report whether both the retry queue and the main queue are idle. */
+static inline bool are_queues_idle(struct uds_request_queue *queue)
+{
+	/* The main queue is only examined when the retry queue is idle. */
+	if (!uds_is_funnel_queue_idle(queue->retry_queue))
+		return false;
+
+	return uds_is_funnel_queue_idle(queue->main_queue);
+}
+
+/*
+ * Try to obtain the next request. This is used as the condition of the wait_event() macros.
+ *
+ * *request_ptr is always written: with the request found, or with NULL. The return value is true
+ * when the worker should stop waiting (a request was found, or the queue is no longer running)
+ * and false when it may sleep. *waited_ptr is set to true only in the may-sleep case; it is never
+ * cleared here.
+ */
+static inline bool dequeue_request(struct uds_request_queue *queue,
+				   struct uds_request **request_ptr, bool *waited_ptr)
+{
+	struct uds_request *found = poll_queues(queue);
+
+	*request_ptr = found;
+	if (found != NULL)
+		return true;
+
+	/* Wake the worker thread so it can exit. */
+	if (!READ_ONCE(queue->running))
+		return true;
+
+	*waited_ptr = true;
+	return false;
+}
+
+/*
+ * Block until dequeue_request() reports that waiting should stop. In dormant mode the wait has no
+ * timeout and also ends as soon as either funnel queue is no longer idle; otherwise the wait is
+ * bounded by the given timeout in nanoseconds. The outcome is reported through *request and
+ * *waited, which are written by dequeue_request(). The return values of the wait macros are not
+ * examined.
+ */
+static void wait_for_request(struct uds_request_queue *queue, bool dormant,
+			     unsigned long timeout, struct uds_request **request,
+			     bool *waited)
+{
+	if (dormant) {
+		wait_event_interruptible(queue->wait_head,
+					 (dequeue_request(queue, request, waited) ||
+					  !are_queues_idle(queue)));
+		return;
+	}
+
+	wait_event_interruptible_hrtimeout(queue->wait_head,
+					   dequeue_request(queue, request, waited),
+					   ns_to_ktime(timeout));
+}
+
+/*
+ * The main loop of the worker thread. It repeatedly waits for a request and hands it to the
+ * queue's processor function, adjusting the wait time between batches so that batches stay between
+ * MINIMUM_BATCH and MAXIMUM_BATCH requests. When the wait time reaches MAXIMUM_WAIT_TIME the queue
+ * goes dormant until it is explicitly woken. The loop ends once no request is available and the
+ * queue is no longer running; any requests still queued at that point are processed before the
+ * thread function returns.
+ */
+static void request_queue_worker(void *arg)
+{
+	struct uds_request_queue *queue = arg;
+	struct uds_request *request = NULL;
+	unsigned long time_batch = DEFAULT_WAIT_TIME;
+	bool dormant = atomic_read(&queue->dormant);
+	bool waited;
+	long current_batch = 0;
+
+	for (;;) {
+		/*
+		 * dequeue_request() only ever sets this flag, so it must be cleared before every
+		 * wait. If it were left set after the first sleep, each request would be treated
+		 * as the end of a batch, the batch count would never grow past one, and the wait
+		 * time would only ever increase.
+		 */
+		waited = false;
+		wait_for_request(queue, dormant, time_batch, &request, &waited);
+		if (likely(request != NULL)) {
+			current_batch++;
+			queue->processor(request);
+		} else if (!READ_ONCE(queue->running)) {
+			break;
+		}
+
+		if (dormant) {
+			/*
+			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
+			 * stop broadcasting. No fence is needed for this transition.
+			 */
+			atomic_set(&queue->dormant, false);
+			dormant = false;
+			time_batch = DEFAULT_WAIT_TIME;
+		} else if (waited) {
+			/*
+			 * We waited for this request to show up. Adjust the wait time to smooth
+			 * out the batch size.
+			 */
+			if (current_batch < MINIMUM_BATCH) {
+				/*
+				 * If the last batch of requests was too small, increase the wait
+				 * time.
+				 */
+				time_batch += time_batch / 4;
+				if (time_batch >= MAXIMUM_WAIT_TIME) {
+					atomic_set(&queue->dormant, true);
+					dormant = true;
+				}
+			} else if (current_batch > MAXIMUM_BATCH) {
+				/*
+				 * If the last batch of requests was too large, decrease the wait
+				 * time.
+				 */
+				time_batch -= time_batch / 4;
+				if (time_batch < MINIMUM_WAIT_TIME)
+					time_batch = MINIMUM_WAIT_TIME;
+			}
+			current_batch = 0;
+		}
+	}
+
+	/*
+	 * Ensure that we process any remaining requests that were enqueued before trying to shut
+	 * down. The corresponding write barrier is in uds_request_queue_finish().
+	 */
+	smp_rmb();
+	while ((request = poll_queues(queue)) != NULL)
+		queue->processor(request);
+}
+
+/*
+ * Create a request queue and start its worker thread.
+ *
+ * queue_name is passed to uds_create_thread() as the thread name, and processor is the function
+ * the worker calls for each request. On success the new queue is stored in *queue_ptr and
+ * UDS_SUCCESS is returned. If any step after the initial allocation fails, the partially built
+ * queue is torn down with uds_request_queue_finish() and the error is returned.
+ */
+int uds_make_request_queue(const char *queue_name,
+			   uds_request_queue_processor_fn processor,
+			   struct uds_request_queue **queue_ptr)
+{
+	struct uds_request_queue *queue;
+	int result = uds_allocate(1, struct uds_request_queue, __func__, &queue);
+
+	if (result != UDS_SUCCESS)
+		return result;
+
+	queue->processor = processor;
+	queue->running = true;
+	atomic_set(&queue->dormant, false);
+	init_waitqueue_head(&queue->wait_head);
+
+	result = uds_make_funnel_queue(&queue->main_queue);
+	if (result != UDS_SUCCESS)
+		goto teardown;
+
+	result = uds_make_funnel_queue(&queue->retry_queue);
+	if (result != UDS_SUCCESS)
+		goto teardown;
+
+	result = uds_create_thread(request_queue_worker, queue, queue_name,
+				   &queue->thread);
+	if (result != UDS_SUCCESS)
+		goto teardown;
+
+	queue->started = true;
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+
+teardown:
+	/* The started flag is still false here, so no thread join is attempted. */
+	uds_request_queue_finish(queue);
+	return result;
+}
+
+/* Wake the worker thread, but only if something is actually sleeping on the wait queue. */
+static inline void wake_up_worker(struct uds_request_queue *queue)
+{
+	if (wq_has_sleeper(&queue->wait_head))
+		wake_up(&queue->wait_head);
+}
+
+/*
+ * Add a request to the queue. Requests marked as requeued go on the retry queue and all others go
+ * on the main queue. The worker is woken if the queue is dormant or the request is unbatched.
+ */
+void uds_request_queue_enqueue(struct uds_request_queue *queue,
+			       struct uds_request *request)
+{
+	/* The request's flags are read before it is put on the funnel queue. */
+	bool unbatched = request->unbatched;
+	struct funnel_queue *target = queue->main_queue;
+
+	if (request->requeued)
+		target = queue->retry_queue;
+
+	uds_funnel_queue_put(target, &request->queue_link);
+
+	/*
+	 * A dormant worker has to be woken explicitly. The funnel queue put above already acts as
+	 * a read fence, so none is needed before checking the flag.
+	 */
+	if (atomic_read(&queue->dormant) || unbatched)
+		wake_up_worker(queue);
+}
+
+/*
+ * Shut down a request queue and free it. A NULL queue is ignored. If the worker thread was
+ * started, it is woken and joined before the funnel queues and the queue structure are freed; a
+ * failed join is logged as a warning and teardown continues. This is also used to clean up a
+ * partially constructed queue from uds_make_request_queue().
+ */
+void uds_request_queue_finish(struct uds_request_queue *queue)
+{
+	int result;
+
+	if (queue == NULL)
+		return;
+
+	/*
+	 * This memory barrier ensures that any requests we queued will be seen. The point is that
+	 * when dequeue_request() sees the following update to the running flag, it will also be
+	 * able to see any change we made to a next field in the funnel queue entry. The
+	 * corresponding read barrier is in request_queue_worker().
+	 */
+	smp_wmb();
+	WRITE_ONCE(queue->running, false);
+
+	/* Only a queue whose worker thread was created has anything to wake and join. */
+	if (queue->started) {
+		wake_up_worker(queue);
+		result = uds_join_threads(queue->thread);
+		if (result != UDS_SUCCESS)
+			uds_log_warning_strerror(result, "Failed to join worker thread");
+	}
+
+	uds_free_funnel_queue(queue->main_queue);
+	uds_free_funnel_queue(queue->retry_queue);
+	uds_free(queue);
+}
diff --git a/drivers/md/dm-vdo/funnel-requestqueue.h b/drivers/md/dm-vdo/funnel-requestqueue.h
new file mode 100644
index 000000000000..e74c231fe269
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-requestqueue.h
@@ -0,0 +1,31 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef UDS_REQUEST_QUEUE_H
+#define UDS_REQUEST_QUEUE_H
+
+#include "uds.h"
+
+/*
+ * A simple request queue which will handle new requests in the order in which they are received,
+ * and will attempt to handle requeued requests before new ones. However, the nature of the
+ * implementation means that it cannot guarantee this ordering; the prioritization is merely a
+ * hint.
+ */
+
+/* Only forward-declared here; users of this header handle request queues by pointer. */
+struct uds_request_queue;
+
+/*
+ * Callback taking a single request. NOTE(review): presumably invoked by the queue's worker thread
+ * for each dequeued request -- confirm against request_queue_worker().
+ */
+typedef void (*uds_request_queue_processor_fn)(struct uds_request *);
+
+/*
+ * Create a request queue and its worker thread (named queue_name). On success the new queue is
+ * stored in *queue_ptr and UDS_SUCCESS is returned; otherwise an error code is returned.
+ */
+int __must_check uds_make_request_queue(const char *queue_name,
+					uds_request_queue_processor_fn processor,
+					struct uds_request_queue **queue_ptr);
+
+/*
+ * Add a request to the queue. Requests marked as requeued are placed on a separate retry queue.
+ * The worker is woken if it is dormant or if the request is marked unbatched.
+ */
+void uds_request_queue_enqueue(struct uds_request_queue *queue,
+			       struct uds_request *request);
+
+/*
+ * Stop the worker thread (if it was started), wait for it to exit, and free the queue. Passing
+ * NULL is a no-op.
+ */
+void uds_request_queue_finish(struct uds_request_queue *queue);
+
+#endif /* UDS_REQUEST_QUEUE_H */
diff --git a/drivers/md/dm-vdo/funnel-workqueue.c b/drivers/md/dm-vdo/funnel-workqueue.c
new file mode 100644
index 000000000000..8f0ada13e549
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-workqueue.c
@@ -0,0 +1,638 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "funnel-workqueue.h"
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+#include <linux/completion.h>
+#include <linux/err.h>
+#include <linux/kthread.h>
+#include <linux/percpu.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "numeric.h"
+#include "permassert.h"
+#include "string-utils.h"
+
+#include "completion.h"
+#include "status-codes.h"
+
+static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
+
+/**
+ * DOC: Work queue definition.
+ *
+ * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
+ * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
+ * Externally, both are represented via the same common sub-structure, though there's actually not
+ * a great deal of overlap between the two types internally.
+ */
+struct vdo_work_queue {
+	/* Name of just the work queue (e.g., "cpuQ12") */
+	char *name;
+	/* True when this is embedded in a round_robin_work_queue, false for a simple_work_queue */
+	bool round_robin_mode;
+	/* The vdo thread this queue was created for; returned by vdo_get_work_queue_owner() */
+	struct vdo_thread *owner;
+	/* Life cycle functions, etc */
+	const struct vdo_work_queue_type *type;
+};
+
+struct simple_work_queue {
+	/* The generic part; container_of() on this member recovers the simple_work_queue */
+	struct vdo_work_queue common;
+	/*
+	 * One funnel queue per priority level, indexed by completion priority. Only entries 0
+	 * through type->max_priority are created.
+	 */
+	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
+	/* Context passed to the type's start and finish hooks */
+	void *private;
+
+	/*
+	 * The fields above are unchanged after setup but often read, and are good candidates for
+	 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
+	 * The fields below are often modified as we sleep and wake, so we want a separate cache
+	 * line for performance.
+	 */
+
+	/* Any (0 or 1) worker threads waiting for new work to do */
+	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
+	/* Hack to reduce wakeup calls if the worker thread is running */
+	atomic_t idle;
+
+	/* These are infrequently used so in terms of performance we don't care where they land. */
+	/* The worker kthread; reset to NULL once the queue has been finished */
+	struct task_struct *thread;
+	/*
+	 * Notify creator once worker has initialized. This points at a completion on the stack of
+	 * make_simple_work_queue(), so it must not be used after that function returns.
+	 */
+	struct completion *started;
+};
+
+struct round_robin_work_queue {
+	/* The generic part; container_of() on this member recovers the round_robin_work_queue */
+	struct vdo_work_queue common;
+	/* Array of the simple queues (one worker thread each) that actually do the work */
+	struct simple_work_queue **service_queues;
+	/* Number of valid entries in service_queues */
+	unsigned int num_service_queues;
+};
+
+/* Convert a generic work queue pointer to its enclosing simple work queue; NULL maps to NULL. */
+static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return NULL;
+
+	return container_of(queue, struct simple_work_queue, common);
+}
+
+/* Convert a generic work queue pointer to its enclosing round-robin queue; NULL maps to NULL. */
+static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return NULL;
+
+	return container_of(queue, struct round_robin_work_queue, common);
+}
+
+/* Processing normal completions. */
+
+/*
+ * Remove and return the next pending completion, or NULL if every priority list is empty.
+ *
+ * The priority lists are examined exactly once, from the highest priority down to priority 0. This
+ * leaves a race: a high-priority completion may be enqueued just after its list was examined,
+ * followed by a lower-priority one which this call then returns. The high-priority completion is
+ * picked up by the next call. If strict enforcement of priorities becomes necessary, this function
+ * will need fixing.
+ */
+static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
+{
+	int priority = queue->common.type->max_priority;
+
+	while (priority >= 0) {
+		struct funnel_queue_entry *entry;
+
+		entry = uds_funnel_queue_poll(queue->priority_lists[priority]);
+		if (entry != NULL)
+			return container_of(entry, struct vdo_completion, work_queue_entry_link);
+
+		priority--;
+	}
+
+	return NULL;
+}
+
+/*
+ * Put a completion on the funnel queue matching its priority and, if the worker appears to be
+ * idle, wake it.
+ *
+ * A completion whose priority is VDO_WORK_Q_DEFAULT_PRIORITY is given the queue type's default
+ * priority. A priority above the queue type's maximum fails an assertion and is replaced by 0.
+ */
+static void enqueue_work_queue_completion(struct simple_work_queue *queue,
+					  struct vdo_completion *completion)
+{
+	/* Log-only check: a completion that still records a queue is enqueued anyway. */
+	ASSERT_LOG_ONLY(completion->my_queue == NULL,
+			"completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
+			completion, completion->callback, queue, completion->my_queue);
+	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
+		completion->priority = queue->common.type->default_priority;
+
+	/* The priority indexes priority_lists below, so force it into range if the check fails. */
+	if (ASSERT(completion->priority <= queue->common.type->max_priority,
+		   "priority is in range for queue") != VDO_SUCCESS)
+		completion->priority = 0;
+
+	/* Record the queue; process_completion() checks and clears this before running it. */
+	completion->my_queue = &queue->common;
+
+	/* Funnel queue handles the synchronization for the put. */
+	uds_funnel_queue_put(queue->priority_lists[completion->priority],
+			     &completion->work_queue_entry_link);
+
+	/*
+	 * Due to how funnel queue synchronization is handled (just atomic operations), the
+	 * simplest safe implementation here would be to wake-up any waiting threads after
+	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
+	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
+	 * have the same view of the queue as a producer thread.
+	 *
+	 * However, the above is wasteful so instead we attempt to minimize the number of thread
+	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
+	 * able to determine when the worker thread might be asleep or going to sleep. We use
+	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
+	 * waking the worker thread, so multiple wakeups aren't tried at once.
+	 *
+	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
+	 * first is any better or worse for other platforms, even other x86 configurations.
+	 */
+	/* Pairs with the smp_mb() between setting idle and polling in wait_for_next_completion(). */
+	smp_mb();
+	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
+		return;
+
+	/* There's a maximum of one thread in this list. */
+	wake_up(&queue->waiting_worker_threads);
+}
+
+/* Invoke the queue type's optional start hook, passing the queue's private context. */
+static void run_start_hook(struct simple_work_queue *queue)
+{
+	const struct vdo_work_queue_type *type = queue->common.type;
+
+	if (type->start == NULL)
+		return;
+
+	type->start(queue->private);
+}
+
+/* Invoke the queue type's optional finish hook, passing the queue's private context. */
+static void run_finish_hook(struct simple_work_queue *queue)
+{
+	const struct vdo_work_queue_type *type = queue->common.type;
+
+	if (type->finish == NULL)
+		return;
+
+	type->finish(queue->private);
+}
+
+/*
+ * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
+ * for us to shut down.
+ *
+ * If kthread_should_stop says it's time to stop but we have pending completions return a
+ * completion.
+ *
+ * Returns the dequeued completion, or NULL if the thread was told to stop and no completion was
+ * found.
+ *
+ * NOTE(review): this comment used to say that scheduler statistics are updated here; no statistics
+ * are touched by the code below.
+ */
+static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
+{
+	struct vdo_completion *completion;
+	DEFINE_WAIT(wait);
+
+	while (true) {
+		prepare_to_wait(&queue->waiting_worker_threads, &wait,
+				TASK_INTERRUPTIBLE);
+		/*
+		 * Don't set the idle flag until a wakeup will not be lost.
+		 *
+		 * Force synchronization between setting the idle flag and checking the funnel
+		 * queue; the producer side will do them in the reverse order.
+		 *
+		 * NOTE(review): this comment used to mention an allowed race that a timeout
+		 * "below" would unwedge. The wait below is a plain schedule() with no timeout, so
+		 * not missing a wakeup relies on this barrier pairing with the one in
+		 * enqueue_work_queue_completion() -- confirm no timeout is expected here.
+		 */
+		atomic_set(&queue->idle, 1);
+		smp_mb(); /* store-load barrier between "idle" and funnel queue */
+
+		completion = poll_for_completion(queue);
+		if (completion != NULL)
+			break;
+
+		/*
+		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
+		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
+		 * wakeup from kthread_stop() call in vdo_finish_work_queue().
+		 */
+		if (kthread_should_stop())
+			break;
+
+		schedule();
+
+		/*
+		 * Most of the time when we wake, it should be because there's work to do. If it
+		 * was a spurious wakeup, continue looping.
+		 */
+		completion = poll_for_completion(queue);
+		if (completion != NULL)
+			break;
+	}
+
+	/* Leave the wait queue and mark the worker as running again, whichever way we exited. */
+	finish_wait(&queue->waiting_worker_threads, &wait);
+	atomic_set(&queue->idle, 0);
+
+	return completion;
+}
+
+/*
+ * Run a dequeued completion. Its recorded queue is cleared first, provided it matches the queue it
+ * was taken from; on a mismatch the assertion fails, the field is left alone, and the completion
+ * is run regardless.
+ */
+static void process_completion(struct simple_work_queue *queue,
+			       struct vdo_completion *completion)
+{
+	int result = ASSERT(completion->my_queue == &queue->common,
+			    "completion %px from queue %px marked as being in this queue (%px)",
+			    completion, queue, completion->my_queue);
+
+	if (result == UDS_SUCCESS)
+		completion->my_queue = NULL;
+
+	vdo_run_completion(completion);
+}
+
+/*
+ * The main loop of a worker thread: run the start hook, process completions until
+ * wait_for_next_completion() reports there is nothing left and the thread should stop, then run
+ * the finish hook.
+ */
+static void service_work_queue(struct simple_work_queue *queue)
+{
+	struct vdo_completion *completion;
+
+	run_start_hook(queue);
+
+	for (;;) {
+		/* Try the cheap non-blocking poll first; only block if nothing is pending. */
+		completion = poll_for_completion(queue);
+		if (completion == NULL) {
+			completion = wait_for_next_completion(queue);
+			if (completion == NULL) {
+				/* No completions but kthread_should_stop() was triggered. */
+				break;
+			}
+		}
+
+		process_completion(queue, completion);
+
+		/*
+		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
+		 * This speeds up some performance tests; that "other work" might include other VDO
+		 * threads.
+		 */
+		if (need_resched())
+			cond_resched();
+	}
+
+	run_finish_hook(queue);
+}
+
+/*
+ * Thread function for a simple work queue's worker; ptr is the simple_work_queue. The address of
+ * this function is also what get_current_thread_work_queue() compares against to recognize a work
+ * queue thread.
+ */
+static int work_queue_runner(void *ptr)
+{
+	struct simple_work_queue *queue = ptr;
+
+	/* Let make_simple_work_queue() know the thread has reached this code before servicing. */
+	complete(queue->started);
+	service_work_queue(queue);
+	return 0;
+}
+
+/* Creation & teardown */
+
+/* Free every priority list slot of a simple work queue, then its name and the queue itself. */
+static void free_simple_work_queue(struct simple_work_queue *queue)
+{
+	unsigned int priority = 0;
+
+	while (priority <= VDO_WORK_Q_MAX_PRIORITY) {
+		uds_free_funnel_queue(queue->priority_lists[priority]);
+		priority++;
+	}
+
+	uds_free(queue->common.name);
+	uds_free(queue);
+}
+
+/*
+ * Free a round-robin work queue: each of its service queues, the table holding them, the name,
+ * and finally the queue itself.
+ */
+static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+	struct simple_work_queue **services = queue->service_queues;
+	unsigned int total = queue->num_service_queues;
+	unsigned int index = 0;
+
+	/* Detach the table from the queue before tearing down its entries. */
+	queue->service_queues = NULL;
+
+	while (index < total) {
+		free_simple_work_queue(services[index]);
+		index++;
+	}
+
+	uds_free(services);
+	uds_free(queue->common.name);
+	uds_free(queue);
+}
+
+/* Finish a work queue (stopping its worker threads) and then free it. NULL is ignored. */
+void vdo_free_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return;
+
+	vdo_finish_work_queue(queue);
+
+	if (!queue->round_robin_mode) {
+		free_simple_work_queue(as_simple_work_queue(queue));
+		return;
+	}
+
+	free_round_robin_work_queue(as_round_robin_work_queue(queue));
+}
+
+/*
+ * Create a simple work queue and start its single worker thread.
+ *
+ * The thread is named "<thread_name_prefix>:<name>". One funnel queue is created for each priority
+ * from 0 through type->max_priority. The call does not return until the worker thread has started
+ * running work_queue_runner().
+ *
+ * On success the new queue is stored in *queue_ptr and UDS_SUCCESS is returned. On failure
+ * everything allocated here is released, *queue_ptr is untouched, and an error code is returned.
+ */
+static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
+				  struct vdo_thread *owner, void *private,
+				  const struct vdo_work_queue_type *type,
+				  struct simple_work_queue **queue_ptr)
+{
+	DECLARE_COMPLETION_ONSTACK(started);
+	struct simple_work_queue *queue;
+	int i;
+	struct task_struct *thread = NULL;
+	int result;
+
+	/*
+	 * priority_lists has only VDO_WORK_Q_MAX_PRIORITY + 1 slots, so a larger max_priority
+	 * would overrun it in the loop below. Refuse to build such a queue instead of only
+	 * logging the problem.
+	 */
+	result = ASSERT((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
+			"queue priority count %u within limit %u", type->max_priority,
+			VDO_WORK_Q_MAX_PRIORITY);
+	if (result != VDO_SUCCESS)
+		return result;
+
+	result = uds_allocate(1, struct simple_work_queue, "simple work queue", &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	queue->private = private;
+	queue->started = &started;
+	queue->common.type = type;
+	queue->common.owner = owner;
+	init_waitqueue_head(&queue->waiting_worker_threads);
+
+	result = uds_duplicate_string(name, "queue name", &queue->common.name);
+	if (result != VDO_SUCCESS) {
+		uds_free(queue);
+		return -ENOMEM;
+	}
+
+	for (i = 0; i <= type->max_priority; i++) {
+		result = uds_make_funnel_queue(&queue->priority_lists[i]);
+		if (result != UDS_SUCCESS) {
+			free_simple_work_queue(queue);
+			return result;
+		}
+	}
+
+	thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
+			     queue->common.name);
+	if (IS_ERR(thread)) {
+		free_simple_work_queue(queue);
+		return (int) PTR_ERR(thread);
+	}
+
+	queue->thread = thread;
+
+	/*
+	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
+	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
+	 * cleanup code.
+	 *
+	 * Eventually we should just make that path safe too, and then we won't need this
+	 * synchronization.
+	 */
+	wait_for_completion(&started);
+
+	/*
+	 * The completion lives on this stack frame and the worker has finished signalling it, so
+	 * don't leave a dangling pointer behind in the long-lived queue.
+	 */
+	queue->started = NULL;
+
+	*queue_ptr = queue;
+	return UDS_SUCCESS;
+}
+
+/**
+ * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
+ *                         be distributed to them in round-robin fashion.
+ * @thread_name_prefix: Prefix for the name of each worker thread.
+ * @name: Name of the queue; for a multi-thread queue each service queue is named by appending its
+ *        index.
+ * @owner: The vdo thread the queue (and every service queue) is associated with.
+ * @type: The queue type (hooks and priority limits) shared by all of the threads.
+ * @thread_count: Number of worker threads to create; must be at least one.
+ * @thread_privates: Optional array with one context pointer per thread, or NULL.
+ * @queue_ptr: Where to store the new queue.
+ *
+ * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
+ * of the actual number of queues and threads allocated here, code outside of the queue
+ * implementation will treat this as a single zone.
+ *
+ * Return: VDO_SUCCESS or an error code. If creating a service queue fails part way through, the
+ *         service queues already created are destroyed and *queue_ptr is reset to NULL.
+ */
+int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
+			struct vdo_thread *owner, const struct vdo_work_queue_type *type,
+			unsigned int thread_count, void *thread_privates[],
+			struct vdo_work_queue **queue_ptr)
+{
+	struct round_robin_work_queue *queue;
+	int result;
+	char thread_name[TASK_COMM_LEN];
+	unsigned int i;
+
+	/*
+	 * A round-robin queue with no service queues would divide by zero when selecting a
+	 * service queue in vdo_enqueue_work_queue(), so reject it up front.
+	 */
+	if (thread_count == 0)
+		return -EINVAL;
+
+	if (thread_count == 1) {
+		struct simple_work_queue *simple_queue;
+		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
+
+		result = make_simple_work_queue(thread_name_prefix, name, owner, context,
+						type, &simple_queue);
+		if (result == VDO_SUCCESS)
+			*queue_ptr = &simple_queue->common;
+		return result;
+	}
+
+	result = uds_allocate(1, struct round_robin_work_queue, "round-robin work queue",
+			      &queue);
+	if (result != UDS_SUCCESS)
+		return result;
+
+	result = uds_allocate(thread_count, struct simple_work_queue *,
+			      "subordinate work queues", &queue->service_queues);
+	if (result != UDS_SUCCESS) {
+		uds_free(queue);
+		return result;
+	}
+
+	queue->num_service_queues = thread_count;
+	queue->common.round_robin_mode = true;
+	queue->common.owner = owner;
+	/* Record the type here too so vdo_work_queue_type_is() also works on this queue. */
+	queue->common.type = type;
+
+	result = uds_duplicate_string(name, "queue name", &queue->common.name);
+	if (result != VDO_SUCCESS) {
+		uds_free(queue->service_queues);
+		uds_free(queue);
+		return -ENOMEM;
+	}
+
+	*queue_ptr = &queue->common;
+
+	for (i = 0; i < thread_count; i++) {
+		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
+
+		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
+		result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
+						context, type, &queue->service_queues[i]);
+		if (result != VDO_SUCCESS) {
+			/* Only the first i entries are valid; limit teardown to those. */
+			queue->num_service_queues = i;
+			/* Destroy previously created subordinates. */
+			vdo_free_work_queue(uds_forget(*queue_ptr));
+			return result;
+		}
+	}
+
+	return VDO_SUCCESS;
+}
+
+/* Stop the worker thread of a simple work queue, if it still has one. */
+static void finish_simple_work_queue(struct simple_work_queue *queue)
+{
+	struct task_struct *worker = queue->thread;
+
+	if (worker != NULL) {
+		/* Tells the worker thread to shut down and waits for it to exit. */
+		kthread_stop(worker);
+		queue->thread = NULL;
+	}
+}
+
+/* Stop the worker thread of every service queue belonging to a round-robin work queue. */
+static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+	struct simple_work_queue **services = queue->service_queues;
+	unsigned int total = queue->num_service_queues;
+	unsigned int index = 0;
+
+	while (index < total) {
+		finish_simple_work_queue(services[index]);
+		index++;
+	}
+}
+
+/*
+ * Stop the worker thread(s) of a work queue. No enqueueing of completions should be done once this
+ * function is called. A NULL queue is ignored.
+ */
+void vdo_finish_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue == NULL)
+		return;
+
+	if (!queue->round_robin_mode) {
+		finish_simple_work_queue(as_simple_work_queue(queue));
+		return;
+	}
+
+	finish_round_robin_work_queue(as_round_robin_work_queue(queue));
+}
+
+/* Debugging dumps */
+
+/*
+ * Log one line describing a simple work queue: its address and name, whether its worker is idle
+ * or running (or absent), and the worker's task state character.
+ */
+static void dump_simple_work_queue(struct simple_work_queue *queue)
+{
+	const char *thread_status;
+	char task_state_report;
+
+	if (queue->thread == NULL) {
+		thread_status = "no threads";
+		task_state_report = '-';
+	} else {
+		task_state_report = task_state_to_char(queue->thread);
+		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
+	}
+
+	uds_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
+		     thread_status, task_state_report);
+
+	/* ->waiting_worker_threads wait queue status? anyone waiting? */
+}
+
+/*
+ * Log the state of a work queue. A simple queue produces one log line; a round-robin queue
+ * produces one line for each of its service queues. Unlike the finish and free functions, this
+ * does not accept a NULL queue.
+ */
+void vdo_dump_work_queue(struct vdo_work_queue *queue)
+{
+	if (queue->round_robin_mode) {
+		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+		unsigned int i;
+
+		for (i = 0; i < round_robin->num_service_queues; i++)
+			dump_simple_work_queue(round_robin->service_queues[i]);
+	} else {
+		dump_simple_work_queue(as_simple_work_queue(queue));
+	}
+}
+
+/*
+ * Write the symbol name for a function pointer into the buffer, truncated to fit and cut off at
+ * the first space. A NULL pointer is written as "-". Nothing is written to a zero-length buffer.
+ */
+static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
+{
+	char *space;
+
+	/* With no room at all, nothing below would write (or terminate) the buffer. */
+	if (buffer_length == 0)
+		return;
+
+	if (pointer == NULL) {
+		/*
+		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
+		 * sometimes use this when logging lots of data; don't be so verbose.
+		 */
+		strscpy(buffer, "-", buffer_length);
+		return;
+	}
+
+	/*
+	 * Use a pragma to defeat gcc's format checking, which doesn't understand that "%ps"
+	 * actually does support a precision spec in Linux kernel code.
+	 *
+	 * With the check disabled the compiler will not flag a mistyped argument, so the
+	 * precision is explicitly converted to the int that "%.*" consumes.
+	 */
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wformat"
+	snprintf(buffer, buffer_length, "%.*ps", (int) (buffer_length - 1), pointer);
+#pragma GCC diagnostic pop
+
+	/* Keep only the text up to the first space. */
+	space = strchr(buffer, ' ');
+	if (space != NULL)
+		*space = '\0';
+}
+
+/*
+ * Write to the buffer some info about the completion, for logging. The output has the form
+ * "<queue name>/<callback function name>", with "-" standing in for a completion that is not on a
+ * queue or has no callback. Since the common use case is dumping info about a lot of completions
+ * all at once, the format favors brevity over readability.
+ *
+ * The output is truncated to fit; nothing is written when length is zero.
+ */
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
+				   size_t length)
+{
+	size_t current_length;
+
+	/*
+	 * Without this, "length - 1" below would wrap around for a zero-length buffer and the
+	 * function name would be formatted into a buffer with no space at all.
+	 */
+	if (length == 0)
+		return;
+
+	current_length =
+		scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
+			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));
+
+	/* Only append the function name if at least one character of it can fit. */
+	if (current_length < length - 1) {
+		get_function_name((void *) completion->callback, buffer + current_length,
+				  length - current_length);
+	}
+}
+
+/* Completion submission */
+/*
+ * Enqueue a completion on a work queue. For a simple queue the completion goes directly onto that
+ * queue; for a round-robin queue one of its service queues is selected using a per-CPU rotor.
+ *
+ * NOTE(review): an earlier comment here said a completion timeout handler may be invoked by this
+ * function; no timeout handling is visible in this function or in
+ * enqueue_work_queue_completion() -- confirm whether that still applies.
+ */
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
+			    struct vdo_completion *completion)
+{
+	struct round_robin_work_queue *round_robin;
+	struct simple_work_queue *target;
+	unsigned int rotor;
+
+	if (!queue->round_robin_mode) {
+		/* The generic queue is itself the simple queue to enqueue on. */
+		enqueue_work_queue_completion(as_simple_work_queue(queue), completion);
+		return;
+	}
+
+	round_robin = as_round_robin_work_queue(queue);
+
+	/*
+	 * It shouldn't be a big deal if the same rotor gets used for multiple work queues. Any
+	 * patterns that might develop are likely to be disrupted by random ordering of multiple
+	 * completions and migration between cores, unless the load is so light as to be regular in
+	 * ordering of tasks and the threads are confined to individual cores; with a load that
+	 * light we won't care.
+	 */
+	rotor = this_cpu_inc_return(service_queue_rotor);
+	target = round_robin->service_queues[rotor % round_robin->num_service_queues];
+
+	enqueue_work_queue_completion(target, completion);
+}
+
+/* Misc */
+
+/*
+ * Return the simple work queue serviced by the current thread, or NULL if the caller is in
+ * interrupt context or the current thread is not running work_queue_runner(). The queue is the
+ * data pointer the worker kthread was created with.
+ */
+static struct simple_work_queue *get_current_thread_work_queue(void)
+{
+	/*
+	 * In interrupt context, if a vdo thread is what got interrupted, the kthread calls below
+	 * would find the queue for the thread which was interrupted. However, the interrupted
+	 * thread may have been processing a completion, in which case starting to process another
+	 * would violate our concurrency assumptions. So interrupt context never reports a queue,
+	 * and neither does any thread that is not a VDO work queue thread.
+	 */
+	if (in_interrupt() || (kthread_func(current) != work_queue_runner))
+		return NULL;
+
+	return kthread_data(current);
+}
+
+/*
+ * Return the generic work queue for the current thread's simple work queue, or NULL if the current
+ * thread is not a work queue thread (or the caller is in interrupt context).
+ */
+struct vdo_work_queue *vdo_get_current_work_queue(void)
+{
+	struct simple_work_queue *current_queue = get_current_thread_work_queue();
+
+	if (current_queue == NULL)
+		return NULL;
+
+	return &current_queue->common;
+}
+
+/* Return the vdo thread recorded as the queue's owner at creation. The queue must not be NULL. */
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
+{
+	return queue->owner;
+}
+
+/**
+ * vdo_get_work_queue_private_data() - Get the private data of the current thread's work queue.
+ *
+ * Return: The private context the queue was created with, or NULL if there is none or if the
+ *         current thread is not a work queue thread.
+ */
+void *vdo_get_work_queue_private_data(void)
+{
+	struct simple_work_queue *current_queue = get_current_thread_work_queue();
+
+	if (current_queue == NULL)
+		return NULL;
+
+	return current_queue->private;
+}
+
+/*
+ * Check whether a queue was created with the given type. This compares the type pointers, not the
+ * contents of the type structures. The queue must not be NULL.
+ */
+bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
+			    const struct vdo_work_queue_type *type)
+{
+	return (queue->type == type);
+}
diff --git a/drivers/md/dm-vdo/funnel-workqueue.h b/drivers/md/dm-vdo/funnel-workqueue.h
new file mode 100644
index 000000000000..b5be6e9e83bc
--- /dev/null
+++ b/drivers/md/dm-vdo/funnel-workqueue.h
@@ -0,0 +1,51 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef VDO_WORK_QUEUE_H
+#define VDO_WORK_QUEUE_H
+
+#include <linux/sched.h> /* for TASK_COMM_LEN */
+
+#include "types.h"
+
+/* Upper bound on work queue name length, tied to the kernel's task name limit. */
+enum {
+	MAX_VDO_WORK_QUEUE_NAME_LEN = TASK_COMM_LEN,
+};
+
+/* Per-type configuration shared by every work queue created with that type. */
+struct vdo_work_queue_type {
+	/* Optional hook run on the worker thread before it starts servicing completions */
+	void (*start)(void *context);
+	/* Optional hook run on the worker thread after it stops servicing completions */
+	void (*finish)(void *context);
+	/* Highest priority accepted by the queue; must not exceed VDO_WORK_Q_MAX_PRIORITY */
+	enum vdo_completion_priority max_priority;
+	/* Priority given to completions enqueued with VDO_WORK_Q_DEFAULT_PRIORITY */
+	enum vdo_completion_priority default_priority;
+};
+
+struct vdo_completion;
+struct vdo_thread;
+struct vdo_work_queue;
+
+/*
+ * Create a work queue with thread_count worker threads. With more than one thread, completions
+ * are distributed among the threads in round-robin fashion. thread_privates, if not NULL, supplies
+ * one context pointer per thread.
+ */
+int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
+			struct vdo_thread *owner, const struct vdo_work_queue_type *type,
+			unsigned int thread_count, void *thread_privates[],
+			struct vdo_work_queue **queue_ptr);
+
+/* Enqueue a completion to be run by one of the queue's worker threads. */
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion);
+
+/*
+ * Stop the queue's worker threads and wait for them to exit. Nothing may be enqueued once this
+ * has been called. NULL is ignored.
+ */
+void vdo_finish_work_queue(struct vdo_work_queue *queue);
+
+/* Finish the queue and then free it. NULL is ignored. */
+void vdo_free_work_queue(struct vdo_work_queue *queue);
+
+/* Log the state of the queue (one line per worker thread's queue). */
+void vdo_dump_work_queue(struct vdo_work_queue *queue);
+
+/* Write a brief "<queue name>/<callback name>" description of the completion to the buffer. */
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
+				   size_t length);
+
+/*
+ * The following two functions describe the calling thread; each returns NULL if the current thread
+ * is not a work queue thread.
+ */
+void *vdo_get_work_queue_private_data(void);
+struct vdo_work_queue *vdo_get_current_work_queue(void);
+/* Return the vdo thread the queue was created for. */
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue);
+
+/* Check whether the queue was created with the given type (compared by pointer). */
+bool __must_check vdo_work_queue_type_is(struct vdo_work_queue *queue,
+					 const struct vdo_work_queue_type *type);
+
+#endif /* VDO_WORK_QUEUE_H */
-- 
2.40.0





[Index of Archives]     [DM Crypt]     [Fedora Desktop]     [ATA RAID]     [Fedora Marketing]     [Fedora Packaging]     [Fedora SELinux]     [Yosemite Discussion]     [KDE Users]     [Fedora Docs]

  Powered by Linux