new file mode 100644
@@ -0,0 +1,141 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "completion.h"
+
+#include <linux/kernel.h>
+
+#include "logger.h"
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+#include "vio.h"
+#include "vdo.h"
+
+/**
+ * DOC: vdo completions.
+ *
+ * Most of vdo's data structures are lock free, each either belonging to a single "zone," or
+ * divided into a number of zones whose accesses to the structure do not overlap. During normal
+ * operation, at most one thread will be operating in any given zone. Each zone has a
+ * vdo_work_queue which holds vdo_completions that are to be run in that zone. A completion may
+ * only be enqueued on one queue or operating in a single zone at a time.
+ *
+ * At each step of a multi-threaded operation, the completion performing the operation is given a
+ * callback, error handler, and thread id for the next step. A completion is "run" when it is
+ * operating on the correct thread (as specified by its callback_thread_id). If the value of its
+ * "result" field is an error (i.e. not VDO_SUCCESS), the function in its "error_handler" will be
+ * invoked. If the error_handler is NULL, or there is no error, the function set as its "callback"
+ * will be invoked. Generally, a completion will not be run directly, but rather will be
+ * "launched." In this case, it will check whether it is operating on the correct thread. If it is,
+ * it will run immediately. Otherwise, it will be enqueue on the vdo_work_queue associated with the
+ * completion's "callback_thread_id". When it is dequeued, it will be on the correct thread, and
+ * will get run. In some cases, the completion should get queued instead of running immediately,
+ * even if it is being launched from the correct thread. This is usually in cases where there is a
+ * long chain of callbacks, all on the same thread, which could overflow the stack. In such cases,
+ * the completion's "requeue" field should be set to true. Doing so will skip the current thread
+ * check and simply enqueue the completion.
+ *
+ * A completion may be "finished," in which case its "complete" field will be set to true before it
+ * is next run. It is a bug to attempt to set the result or re-finish a finished completion.
+ * Because a completion's fields are not safe to examine from any thread other than the one on
+ * which the completion is currently operating, this field is used only to aid in detecting
+ * programming errors. It can not be used for cross-thread checking on the status of an operation.
+ * A completion must be "reset" before it can be reused after it has been finished. Resetting will
+ * also clear any error from the result field.
+ **/
+
+void vdo_initialize_completion(struct vdo_completion *completion,
+ struct vdo *vdo,
+ enum vdo_completion_type type)
+{
+ memset(completion, 0, sizeof(*completion));
+ completion->vdo = vdo;
+ completion->type = type;
+ vdo_reset_completion(completion);
+}
+
+static inline void assert_incomplete(struct vdo_completion *completion)
+{
+ ASSERT_LOG_ONLY(!completion->complete, "completion is not complete");
+}
+
+/**
+ * vdo_set_completion_result() - Set the result of a completion.
+ *
+ * Older errors will not be masked.
+ */
+void vdo_set_completion_result(struct vdo_completion *completion, int result)
+{
+ assert_incomplete(completion);
+ if (completion->result == VDO_SUCCESS)
+ completion->result = result;
+}
+
+/**
+ * vdo_launch_completion_with_priority() - Run or enqueue a completion.
+ * @priority: The priority at which to enqueue the completion.
+ *
+ * If called on the correct thread (i.e. the one specified in the completion's callback_thread_id
+ * field) and not marked for requeue, the completion will be run immediately. Otherwise, the
+ * completion will be enqueued on the specified thread.
+ */
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+ enum vdo_completion_priority priority)
+{
+ thread_id_t callback_thread = completion->callback_thread_id;
+
+ if (completion->requeue || (callback_thread != vdo_get_callback_thread_id())) {
+ vdo_enqueue_completion(completion, priority);
+ return;
+ }
+
+ vdo_run_completion(completion);
+}
+
+/** vdo_finish_completion() - Mark a completion as complete and then launch it. */
+void vdo_finish_completion(struct vdo_completion *completion)
+{
+ assert_incomplete(completion);
+ completion->complete = true;
+ if (completion->callback != NULL)
+ vdo_launch_completion(completion);
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+ enum vdo_completion_priority priority)
+{
+ struct vdo *vdo = completion->vdo;
+ thread_id_t thread_id = completion->callback_thread_id;
+
+ if (ASSERT(thread_id < vdo->thread_config.thread_count,
+ "thread_id %u (completion type %d) is less than thread count %u",
+ thread_id,
+ completion->type,
+ vdo->thread_config.thread_count) != UDS_SUCCESS)
+ BUG();
+
+ completion->requeue = false;
+ completion->priority = priority;
+ completion->my_queue = NULL;
+ vdo_enqueue_work_queue(vdo->threads[thread_id].queue, completion);
+}
+
+/**
+ * vdo_requeue_completion_if_needed() - Requeue a completion if not called on the specified thread.
+ *
+ * Return: True if the completion was requeued; callers may not access the completion in this case.
+ */
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+ thread_id_t callback_thread_id)
+{
+ if (vdo_get_callback_thread_id() == callback_thread_id)
+ return false;
+
+ completion->callback_thread_id = callback_thread_id;
+ vdo_enqueue_completion(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+ return true;
+}
new file mode 100644
@@ -0,0 +1,155 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef VDO_COMPLETION_H
+#define VDO_COMPLETION_H
+
+#include "permassert.h"
+
+#include "status-codes.h"
+#include "types.h"
+
+/**
+ * vdo_run_completion() - Run a completion's callback or error handler on the current thread.
+ *
+ * Context: This function must be called from the correct callback thread.
+ */
+static inline void vdo_run_completion(struct vdo_completion *completion)
+{
+ if ((completion->result != VDO_SUCCESS) && (completion->error_handler != NULL)) {
+ completion->error_handler(completion);
+ return;
+ }
+
+ completion->callback(completion);
+}
+
+void vdo_set_completion_result(struct vdo_completion *completion, int result);
+
+void vdo_initialize_completion(struct vdo_completion *completion,
+ struct vdo *vdo,
+ enum vdo_completion_type type);
+
+/**
+ * vdo_reset_completion() - Reset a completion to a clean state, while keeping the type, vdo and
+ * parent information.
+ */
+static inline void vdo_reset_completion(struct vdo_completion *completion)
+{
+ completion->result = VDO_SUCCESS;
+ completion->complete = false;
+}
+
+void vdo_launch_completion_with_priority(struct vdo_completion *completion,
+ enum vdo_completion_priority priority);
+
+/**
+ * vdo_launch_completion() - Launch a completion with default priority.
+ */
+static inline void vdo_launch_completion(struct vdo_completion *completion)
+{
+ vdo_launch_completion_with_priority(completion, VDO_WORK_Q_DEFAULT_PRIORITY);
+}
+
+/**
+ * vdo_continue_completion() - Continue processing a completion.
+ * @result: The current result (will not mask older errors).
+ *
+ * Continue processing a completion by setting the current result and calling
+ * vdo_launch_completion().
+ */
+static inline void vdo_continue_completion(struct vdo_completion *completion, int result)
+{
+ vdo_set_completion_result(completion, result);
+ vdo_launch_completion(completion);
+}
+
+void vdo_finish_completion(struct vdo_completion *completion);
+
+/**
+ * vdo_fail_completion() - Set the result of a completion if it does not already have an error,
+ * then finish it.
+ */
+static inline void vdo_fail_completion(struct vdo_completion *completion, int result)
+{
+ vdo_set_completion_result(completion, result);
+ vdo_finish_completion(completion);
+}
+
+/**
+ * vdo_assert_completion_type() - Assert that a completion is of the correct type.
+ *
+ * Return: VDO_SUCCESS or an error
+ */
+static inline int
+vdo_assert_completion_type(struct vdo_completion *completion, enum vdo_completion_type expected)
+{
+ return ASSERT(expected == completion->type,
+ "completion type should be %u, not %u",
+ expected,
+ completion->type);
+}
+
+static inline void vdo_set_completion_callback(struct vdo_completion *completion,
+ vdo_action *callback,
+ thread_id_t callback_thread_id)
+{
+ completion->callback = callback;
+ completion->callback_thread_id = callback_thread_id;
+}
+
+/**
+ * vdo_launch_completion_callback() - Set the callback for a completion and launch it immediately.
+ */
+static inline void vdo_launch_completion_callback(struct vdo_completion *completion,
+ vdo_action *callback,
+ thread_id_t callback_thread_id)
+{
+ vdo_set_completion_callback(completion, callback, callback_thread_id);
+ vdo_launch_completion(completion);
+}
+
+/**
+ * vdo_prepare_completion() - Prepare a completion for launch.
+ *
+ * Resets the completion, and then sets its callback, error handler, callback thread, and parent.
+ */
+static inline void vdo_prepare_completion(struct vdo_completion *completion,
+ vdo_action *callback,
+ vdo_action *error_handler,
+ thread_id_t callback_thread_id,
+ void *parent)
+{
+ vdo_reset_completion(completion);
+ vdo_set_completion_callback(completion, callback, callback_thread_id);
+ completion->error_handler = error_handler;
+ completion->parent = parent;
+}
+
+/**
+ * vdo_prepare_completion_for_requeue() - Prepare a completion for launch ensuring that it will
+ * always be requeued.
+ *
+ * Resets the completion, and then sets its callback, error handler, callback thread, and parent.
+ */
+static inline void
+vdo_prepare_completion_for_requeue(struct vdo_completion *completion,
+ vdo_action *callback,
+ vdo_action *error_handler,
+ thread_id_t callback_thread_id,
+ void *parent)
+{
+ vdo_prepare_completion(completion, callback, error_handler, callback_thread_id, parent);
+ completion->requeue = true;
+}
+
+void vdo_enqueue_completion(struct vdo_completion *completion,
+ enum vdo_completion_priority priority);
+
+
+bool vdo_requeue_completion_if_needed(struct vdo_completion *completion,
+ thread_id_t callback_thread_id);
+
+#endif /* VDO_COMPLETION_H */
new file mode 100644
@@ -0,0 +1,58 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef UDS_CPU_H
+#define UDS_CPU_H
+
+#include <linux/cache.h>
+
+/**
+ * uds_prefetch_address() - Minimize cache-miss latency by attempting to move data into a CPU cache
+ * before it is accessed.
+ *
+ * @address: the address to fetch (may be invalid)
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_address(const void *address, bool for_write)
+{
+ /*
+ * for_write won't be a constant if we are compiled with optimization turned off, in which
+ * case prefetching really doesn't matter. clang can't figure out that if for_write is a
+ * constant, it can be passed as the second, mandatorily constant argument to prefetch(),
+ * at least currently on llvm 12.
+ */
+ if (__builtin_constant_p(for_write)) {
+ if (for_write)
+ __builtin_prefetch(address, true);
+ else
+ __builtin_prefetch(address, false);
+ }
+}
+
+/**
+ * uds_prefetch_range() - Minimize cache-miss latency by attempting to move a range of addresses
+ * into a CPU cache before they are accessed.
+ *
+ * @start: the starting address to fetch (may be invalid)
+ * @size: the number of bytes in the address range
+ * @for_write: must be constant at compile time--false if for reading, true if for writing
+ */
+static inline void uds_prefetch_range(const void *start, unsigned int size, bool for_write)
+{
+ /*
+ * Count the number of cache lines to fetch, allowing for the address range to span an
+ * extra cache line boundary due to address alignment.
+ */
+ const char *address = (const char *) start;
+ unsigned int offset = ((uintptr_t) address % L1_CACHE_BYTES);
+ unsigned int cache_lines = (1 + ((size + offset) / L1_CACHE_BYTES));
+
+ while (cache_lines-- > 0) {
+ uds_prefetch_address(address, for_write);
+ address += L1_CACHE_BYTES;
+ }
+}
+
+#endif /* UDS_CPU_H */
new file mode 100644
@@ -0,0 +1,169 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "funnel-queue.h"
+
+#include "cpu.h"
+#include "memory-alloc.h"
+#include "permassert.h"
+#include "uds.h"
+
+int uds_make_funnel_queue(struct funnel_queue **queue_ptr)
+{
+ int result;
+ struct funnel_queue *queue;
+
+ result = UDS_ALLOCATE(1, struct funnel_queue, "funnel queue", &queue);
+ if (result != UDS_SUCCESS)
+ return result;
+
+ /*
+ * Initialize the stub entry and put it in the queue, establishing the invariant that
+ * queue->newest and queue->oldest are never null.
+ */
+ queue->stub.next = NULL;
+ queue->newest = &queue->stub;
+ queue->oldest = &queue->stub;
+
+ *queue_ptr = queue;
+ return UDS_SUCCESS;
+}
+
+void uds_free_funnel_queue(struct funnel_queue *queue)
+{
+ UDS_FREE(queue);
+}
+
+static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
+{
+ /*
+ * Barrier requirements: We need a read barrier between reading a "next" field pointer
+ * value and reading anything it points to. There's an accompanying barrier in
+ * uds_funnel_queue_put() between its caller setting up the entry and making it visible.
+ */
+ struct funnel_queue_entry *oldest = queue->oldest;
+ struct funnel_queue_entry *next = READ_ONCE(oldest->next);
+
+ if (oldest == &queue->stub) {
+ /*
+ * When the oldest entry is the stub and it has no successor, the queue is
+ * logically empty.
+ */
+ if (next == NULL)
+ return NULL;
+ /*
+ * The stub entry has a successor, so the stub can be dequeued and ignored without
+ * breaking the queue invariants.
+ */
+ oldest = next;
+ queue->oldest = oldest;
+ next = READ_ONCE(oldest->next);
+ }
+
+ /*
+ * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
+ * stub entry back on the queue first.
+ */
+ if (next == NULL) {
+ struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
+
+ if (oldest != newest)
+ /*
+ * Another thread has already swung queue->newest atomically, but not yet
+ * assigned previous->next. The queue is really still empty.
+ */
+ return NULL;
+
+ /*
+ * Put the stub entry back on the queue, ensuring a successor will eventually be
+ * seen.
+ */
+ uds_funnel_queue_put(queue, &queue->stub);
+
+ /* Check again for a successor. */
+ next = READ_ONCE(oldest->next);
+ if (next == NULL)
+ /*
+ * We lost a race with a producer who swapped queue->newest before we did,
+ * but who hasn't yet updated previous->next. Try again later.
+ */
+ return NULL;
+ }
+
+ return oldest;
+}
+
+/*
+ * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
+ * called from a single consumer thread.
+ */
+struct funnel_queue_entry *uds_funnel_queue_poll(struct funnel_queue *queue)
+{
+ struct funnel_queue_entry *oldest = get_oldest(queue);
+
+ if (oldest == NULL)
+ return oldest;
+
+ /*
+ * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
+ * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
+ * consumer and oldest->next is never used by a producer thread after it is swung from NULL
+ * to non-NULL.
+ */
+ queue->oldest = READ_ONCE(oldest->next);
+ /*
+ * Make sure the caller sees the proper stored data for this entry. Since we've already
+ * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
+ * to the next call we'll properly see the dependent data.
+ */
+ smp_rmb();
+ /*
+ * If "oldest" is a very light-weight work item, we'll be looking for the next one very
+ * soon, so prefetch it now.
+ */
+ uds_prefetch_address(queue->oldest, true);
+ WRITE_ONCE(oldest->next, NULL);
+ return oldest;
+}
+
+/*
+ * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
+ * or more entries being added such that the list view is incomplete, this function will report the
+ * queue as empty.
+ */
+bool uds_is_funnel_queue_empty(struct funnel_queue *queue)
+{
+ return get_oldest(queue) == NULL;
+}
+
+/*
+ * Check whether the funnel queue is idle or not. If the queue has entries available to be
+ * retrieved, it is not idle. If the queue is in a transition state with one or more entries being
+ * added such that the list view is incomplete, it may not be possible to retrieve an entry with
+ * the uds_funnel_queue_poll() function, but the queue will not be considered idle.
+ */
+bool uds_is_funnel_queue_idle(struct funnel_queue *queue)
+{
+ /*
+ * Oldest is not the stub, so there's another entry, though if next is NULL we can't
+ * retrieve it yet.
+ */
+ if (queue->oldest != &queue->stub)
+ return false;
+
+ /*
+ * Oldest is the stub, but newest has been updated by _put(); either there's another,
+ * retrievable entry in the list, or the list is officially empty but in the intermediate
+ * state of having an entry added.
+ *
+ * Whether anything is retrievable depends on whether stub.next has been updated and become
+ * visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
+ * update to newest would be visible to us at the same time or sooner.
+ */
+ if (READ_ONCE(queue->newest) != &queue->stub)
+ return false;
+
+ return true;
+}
new file mode 100644
@@ -0,0 +1,110 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef UDS_FUNNEL_QUEUE_H
+#define UDS_FUNNEL_QUEUE_H
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+
+/*
+ * A funnel queue is a simple (almost) lock-free queue that accepts entries from multiple threads
+ * (multi-producer) and delivers them to a single thread (single-consumer). "Funnel" is an attempt
+ * to evoke the image of requests from more than one producer being "funneled down" to a single
+ * consumer.
+ *
+ * This is an unsynchronized but thread-safe data structure when used as intended. There is no
+ * mechanism to ensure that only one thread is consuming from the queue. If more than one thread
+ * attempts to consume from the queue, the resulting behavior is undefined. Clients must not
+ * directly access or manipulate the internals of the queue, which are only exposed for the purpose
+ * of allowing the very simple enqueue operation to be inlined.
+ *
+ * The implementation requires that a funnel_queue_entry structure (a link pointer) is embedded in
+ * the queue entries, and pointers to those structures are used exclusively by the queue. No macros
+ * are defined to template the queue, so the offset of the funnel_queue_entry in the records placed
+ * in the queue must all be the same so the client can derive their structure pointer from the
+ * entry pointer returned by uds_funnel_queue_poll().
+ *
+ * Callers are wholly responsible for allocating and freeing the entries. Entries may be freed as
+ * soon as they are returned since this queue is not susceptible to the "ABA problem" present in
+ * many lock-free data structures. The queue is dynamically allocated to ensure cache-line
+ * alignment, but no other dynamic allocation is used.
+ *
+ * The algorithm is not actually 100% lock-free. There is a single point in uds_funnel_queue_put()
+ * at which a preempted producer will prevent the consumers from seeing items added to the queue by
+ * later producers, and only if the queue is short enough or the consumer fast enough for it to
+ * reach what was the end of the queue at the time of the preemption.
+ *
+ * The consumer function, uds_funnel_queue_poll(), will return NULL when the queue is empty. To
+ * wait for data to consume, spin (if safe) or combine the queue with a struct event_count to
+ * signal the presence of new entries.
+ */
+
+/* This queue link structure must be embedded in client entries. */
+struct funnel_queue_entry {
+ /* The next (newer) entry in the queue. */
+ struct funnel_queue_entry *next;
+};
+
+/*
+ * The dynamically allocated queue structure, which is allocated on a cache line boundary so the
+ * producer and consumer fields in the structure will land on separate cache lines. This should be
+ * consider opaque but it is exposed here so uds_funnel_queue_put() can be inlined.
+ */
+struct __aligned(L1_CACHE_BYTES) funnel_queue {
+ /*
+ * The producers' end of the queue, an atomically exchanged pointer that will never be
+ * NULL.
+ */
+ struct funnel_queue_entry *newest;
+
+ /* The consumer's end of the queue, which is owned by the consumer and never NULL. */
+ struct funnel_queue_entry *oldest __aligned(L1_CACHE_BYTES);
+
+ /* A dummy entry used to provide the non-NULL invariants above. */
+ struct funnel_queue_entry stub;
+};
+
+int __must_check uds_make_funnel_queue(struct funnel_queue **queue_ptr);
+
+void uds_free_funnel_queue(struct funnel_queue *queue);
+
+/*
+ * Put an entry on the end of the queue.
+ *
+ * The entry pointer must be to the struct funnel_queue_entry embedded in the caller's data
+ * structure. The caller must be able to derive the address of the start of their data structure
+ * from the pointer that passed in here, so every entry in the queue must have the struct
+ * funnel_queue_entry at the same offset within the client's structure.
+ */
+static inline void
+uds_funnel_queue_put(struct funnel_queue *queue, struct funnel_queue_entry *entry)
+{
+ struct funnel_queue_entry *previous;
+
+ /*
+ * Barrier requirements: All stores relating to the entry ("next" pointer, containing data
+ * structure fields) must happen before the previous->next store making it visible to the
+ * consumer. Also, the entry's "next" field initialization to NULL must happen before any
+ * other producer threads can see the entry (the xchg) and try to update the "next" field.
+ *
+ * xchg implements a full barrier.
+ */
+ WRITE_ONCE(entry->next, NULL);
+ previous = xchg(&queue->newest, entry);
+ /*
+ * Preemptions between these two statements hide the rest of the queue from the consumer,
+ * preventing consumption until the following assignment runs.
+ */
+ WRITE_ONCE(previous->next, entry);
+}
+
+struct funnel_queue_entry *__must_check uds_funnel_queue_poll(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_empty(struct funnel_queue *queue);
+
+bool __must_check uds_is_funnel_queue_idle(struct funnel_queue *queue);
+
+#endif /* UDS_FUNNEL_QUEUE_H */
new file mode 100644
@@ -0,0 +1,284 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "request-queue.h"
+
+#include <linux/atomic.h>
+#include <linux/compiler.h>
+#include <linux/wait.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "uds-threads.h"
+
+/*
+ * This queue will attempt to handle requests in reasonably sized batches instead of reacting
+ * immediately to each new request. The wait time between batches is dynamically adjusted up or
+ * down to try to balance responsiveness against wasted thread run time.
+ *
+ * If the wait time becomes long enough, the queue will become dormant and must be explicitly
+ * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
+ * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
+ * wakeup of the worker thread.
+ *
+ * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
+ * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
+ * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
+ * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
+ * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
+ * kick the worker thread out of dormant mode and back into timer-based mode.
+ *
+ * Unbatched requests are used to communicate between different zone threads and will also cause
+ * the queue to awaken immediately.
+ */
+
+enum {
+ NANOSECOND = 1,
+ MICROSECOND = 1000 * NANOSECOND,
+ MILLISECOND = 1000 * MICROSECOND,
+ DEFAULT_WAIT_TIME = 20 * MICROSECOND,
+ MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
+ MAXIMUM_WAIT_TIME = MILLISECOND,
+ MINIMUM_BATCH = 32,
+ MAXIMUM_BATCH = 64,
+};
+
+struct uds_request_queue {
+ /* Wait queue for synchronizing producers and consumer */
+ struct wait_queue_head wait_head;
+ /* Function to process a request */
+ uds_request_queue_processor_t *processor;
+ /* Queue of new incoming requests */
+ struct funnel_queue *main_queue;
+ /* Queue of old requests to retry */
+ struct funnel_queue *retry_queue;
+ /* The thread id of the worker thread */
+ struct thread *thread;
+ /* True if the worker was started */
+ bool started;
+ /* When true, requests can be enqueued */
+ bool running;
+ /* A flag set when the worker is waiting without a timeout */
+ atomic_t dormant;
+};
+
+static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
+{
+ struct funnel_queue_entry *entry;
+
+ entry = uds_funnel_queue_poll(queue->retry_queue);
+ if (entry != NULL)
+ return container_of(entry, struct uds_request, queue_link);
+
+ entry = uds_funnel_queue_poll(queue->main_queue);
+ if (entry != NULL)
+ return container_of(entry, struct uds_request, queue_link);
+
+ return NULL;
+}
+
+static inline bool are_queues_idle(struct uds_request_queue *queue)
+{
+ return uds_is_funnel_queue_idle(queue->retry_queue) &&
+ uds_is_funnel_queue_idle(queue->main_queue);
+}
+
+/*
+ * Determine if there is a next request to process, and return it if there is. Also return flags
+ * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
+ * the thread did sleep before returning a new request.
+ */
+static inline bool dequeue_request(struct uds_request_queue *queue,
+ struct uds_request **request_ptr,
+ bool *waited_ptr)
+{
+ struct uds_request *request = poll_queues(queue);
+
+ if (request != NULL) {
+ *request_ptr = request;
+ return true;
+ }
+
+ if (!READ_ONCE(queue->running)) {
+ /* Wake the worker thread so it can exit. */
+ *request_ptr = NULL;
+ return true;
+ }
+
+ *request_ptr = NULL;
+ *waited_ptr = true;
+ return false;
+}
+
+static void wait_for_request(struct uds_request_queue *queue,
+ bool dormant,
+ unsigned long timeout,
+ struct uds_request **request,
+ bool *waited)
+{
+ if (dormant) {
+ wait_event_interruptible(queue->wait_head,
+ (dequeue_request(queue, request, waited) ||
+ !are_queues_idle(queue)));
+ return;
+ }
+
+ wait_event_interruptible_hrtimeout(queue->wait_head,
+ dequeue_request(queue, request, waited),
+ ns_to_ktime(timeout));
+}
+
+static void request_queue_worker(void *arg)
+{
+ struct uds_request_queue *queue = (struct uds_request_queue *) arg;
+ struct uds_request *request = NULL;
+ unsigned long time_batch = DEFAULT_WAIT_TIME;
+ bool dormant = atomic_read(&queue->dormant);
+ bool waited = false;
+ long current_batch = 0;
+
+ for (;;) {
+ wait_for_request(queue, dormant, time_batch, &request, &waited);
+ if (likely(request != NULL)) {
+ current_batch++;
+ queue->processor(request);
+ } else if (!READ_ONCE(queue->running)) {
+ break;
+ }
+
+ if (dormant) {
+ /*
+ * The queue has been roused from dormancy. Clear the flag so enqueuers can
+ * stop broadcasting. No fence is needed for this transition.
+ */
+ atomic_set(&queue->dormant, false);
+ dormant = false;
+ time_batch = DEFAULT_WAIT_TIME;
+ } else if (waited) {
+ /*
+ * We waited for this request to show up. Adjust the wait time to smooth
+ * out the batch size.
+ */
+ if (current_batch < MINIMUM_BATCH) {
+ /*
+ * If the last batch of requests was too small, increase the wait
+ * time.
+ */
+ time_batch += time_batch / 4;
+ if (time_batch >= MAXIMUM_WAIT_TIME) {
+ atomic_set(&queue->dormant, true);
+ dormant = true;
+ }
+ } else if (current_batch > MAXIMUM_BATCH) {
+ /*
+ * If the last batch of requests was too large, decrease the wait
+ * time.
+ */
+ time_batch -= time_batch / 4;
+ if (time_batch < MINIMUM_WAIT_TIME)
+ time_batch = MINIMUM_WAIT_TIME;
+ }
+ current_batch = 0;
+ }
+ }
+
+ /*
+ * Ensure that we process any remaining requests that were enqueued before trying to shut
+ * down. The corresponding write barrier is in uds_request_queue_finish().
+ */
+ smp_rmb();
+ while ((request = poll_queues(queue)) != NULL)
+ queue->processor(request);
+}
+
+int uds_make_request_queue(const char *queue_name,
+ uds_request_queue_processor_t *processor,
+ struct uds_request_queue **queue_ptr)
+{
+ int result;
+ struct uds_request_queue *queue;
+
+ result = UDS_ALLOCATE(1, struct uds_request_queue, __func__, &queue);
+ if (result != UDS_SUCCESS)
+ return result;
+
+ queue->processor = processor;
+ queue->running = true;
+ atomic_set(&queue->dormant, false);
+ init_waitqueue_head(&queue->wait_head);
+
+ result = uds_make_funnel_queue(&queue->main_queue);
+ if (result != UDS_SUCCESS) {
+ uds_request_queue_finish(queue);
+ return result;
+ }
+
+ result = uds_make_funnel_queue(&queue->retry_queue);
+ if (result != UDS_SUCCESS) {
+ uds_request_queue_finish(queue);
+ return result;
+ }
+
+ result = uds_create_thread(request_queue_worker, queue, queue_name, &queue->thread);
+ if (result != UDS_SUCCESS) {
+ uds_request_queue_finish(queue);
+ return result;
+ }
+
+ queue->started = true;
+ *queue_ptr = queue;
+ return UDS_SUCCESS;
+}
+
+static inline void wake_up_worker(struct uds_request_queue *queue)
+{
+ if (wq_has_sleeper(&queue->wait_head))
+ wake_up(&queue->wait_head);
+}
+
+void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request)
+{
+ struct funnel_queue *sub_queue;
+ bool unbatched = request->unbatched;
+
+ sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
+ uds_funnel_queue_put(sub_queue, &request->queue_link);
+
+ /*
+ * We must wake the worker thread when it is dormant. A read fence isn't needed here since
+ * we know the queue operation acts as one.
+ */
+ if (atomic_read(&queue->dormant) || unbatched)
+ wake_up_worker(queue);
+}
+
+void uds_request_queue_finish(struct uds_request_queue *queue)
+{
+ int result;
+
+ if (queue == NULL)
+ return;
+
+ /*
+ * This memory barrier ensures that any requests we queued will be seen. The point is that
+ * when dequeue_request() sees the following update to the running flag, it will also be
+ * able to see any change we made to a next field in the funnel queue entry. The
+ * corresponding read barrier is in request_queue_worker().
+ */
+ smp_wmb();
+ WRITE_ONCE(queue->running, false);
+
+ if (queue->started) {
+ wake_up_worker(queue);
+ result = uds_join_threads(queue->thread);
+ if (result != UDS_SUCCESS)
+ uds_log_warning_strerror(result, "Failed to join worker thread");
+ }
+
+ uds_free_funnel_queue(queue->main_queue);
+ uds_free_funnel_queue(queue->retry_queue);
+ UDS_FREE(queue);
+}
new file mode 100644
@@ -0,0 +1,30 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef UDS_REQUEST_QUEUE_H
+#define UDS_REQUEST_QUEUE_H
+
+#include "uds.h"
+
+/*
+ * A simple request queue which will handle new requests in the order in which they are received,
+ * and will attempt to handle requeued requests before new ones. However, the nature of the
+ * implementation means that it cannot guarantee this ordering; the prioritization is merely a
+ * hint.
+ */
+
+struct uds_request_queue;
+
+typedef void uds_request_queue_processor_t(struct uds_request *);
+
+int __must_check uds_make_request_queue(const char *queue_name,
+ uds_request_queue_processor_t *processor,
+ struct uds_request_queue **queue_ptr);
+
+void uds_request_queue_enqueue(struct uds_request_queue *queue, struct uds_request *request);
+
+void uds_request_queue_finish(struct uds_request_queue *queue);
+
+#endif /* UDS_REQUEST_QUEUE_H */
new file mode 100644
@@ -0,0 +1,659 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#include "work-queue.h"
+
+#include <linux/atomic.h>
+#include <linux/cache.h>
+#include <linux/completion.h>
+#include <linux/err.h>
+#include <linux/kthread.h>
+#include <linux/percpu.h>
+
+#include "funnel-queue.h"
+#include "logger.h"
+#include "memory-alloc.h"
+#include "numeric.h"
+#include "permassert.h"
+#include "string-utils.h"
+
+#include "completion.h"
+#include "status-codes.h"
+
+static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
+
+/**
+ * DOC: Work queue definition.
+ *
+ * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
+ * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
+ * Externally, both are represented via the same common sub-structure, though there's actually not
+ * a great deal of overlap between the two types internally.
+ */
+struct vdo_work_queue {
+ /* Name of just the work queue (e.g., "cpuQ12") */
+ char *name;
+ bool round_robin_mode;
+ struct vdo_thread *owner;
+ /* Life cycle functions, etc */
+ const struct vdo_work_queue_type *type;
+};
+
+struct simple_work_queue {
+ struct vdo_work_queue common;
+ struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
+ void *private;
+
+ /*
+ * The fields above are unchanged after setup but often read, and are good candidates for
+ * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
+ * The fields below are often modified as we sleep and wake, so we want a separate cache
+ * line for performance.
+ */
+
+ /* Any (0 or 1) worker threads waiting for new work to do */
+ wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
+ /* Hack to reduce wakeup calls if the worker thread is running */
+ atomic_t idle;
+
+ /* These are infrequently used so in terms of performance we don't care where they land. */
+ struct task_struct *thread;
+ /* Notify creator once worker has initialized */
+ struct completion *started;
+};
+
+struct round_robin_work_queue {
+ struct vdo_work_queue common;
+ struct simple_work_queue **service_queues;
+ unsigned int num_service_queues;
+};
+
+static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
+{
+ return ((queue == NULL) ? NULL : container_of(queue, struct simple_work_queue, common));
+}
+
+static inline struct round_robin_work_queue *
+as_round_robin_work_queue(struct vdo_work_queue *queue)
+{
+ return ((queue == NULL) ?
+ NULL :
+ container_of(queue, struct round_robin_work_queue, common));
+}
+
+/* Processing normal completions. */
+
+/*
+ * Dequeue and return the next waiting completion, if any.
+ *
+ * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
+ * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
+ * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
+ * enforcement of priorities becomes necessary, this function will need fixing.
+ */
+static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
+{
+ int i;
+
+ for (i = queue->common.type->max_priority; i >= 0; i--) {
+ struct funnel_queue_entry *link = uds_funnel_queue_poll(queue->priority_lists[i]);
+
+ if (link != NULL)
+ return container_of(link, struct vdo_completion, work_queue_entry_link);
+ }
+
+ return NULL;
+}
+
+static void
+enqueue_work_queue_completion(struct simple_work_queue *queue, struct vdo_completion *completion)
+{
+ ASSERT_LOG_ONLY(completion->my_queue == NULL,
+ "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
+ completion,
+ completion->callback,
+ queue,
+ completion->my_queue);
+ if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
+ completion->priority = queue->common.type->default_priority;
+
+ if (ASSERT(completion->priority <= queue->common.type->max_priority,
+ "priority is in range for queue") != VDO_SUCCESS)
+ completion->priority = 0;
+
+ completion->my_queue = &queue->common;
+
+ /* Funnel queue handles the synchronization for the put. */
+ uds_funnel_queue_put(queue->priority_lists[completion->priority],
+ &completion->work_queue_entry_link);
+
+ /*
+ * Due to how funnel queue synchronization is handled (just atomic operations), the
+ * simplest safe implementation here would be to wake-up any waiting threads after
+ * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
+ * item to the queue, the consumer thread may not see this since it is not guaranteed to
+ * have the same view of the queue as a producer thread.
+ *
+ * However, the above is wasteful so instead we attempt to minimize the number of thread
+ * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
+ * able to determine when the worker thread might be asleep or going to sleep. We use
+ * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
+ * waking the worker thread, so multiple wakeups aren't tried at once.
+ *
+ * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
+ * first is any better or worse for other platforms, even other x86 configurations.
+ */
+ smp_mb();
+ if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
+ return;
+
+ /* There's a maximum of one thread in this list. */
+ wake_up(&queue->waiting_worker_threads);
+}
+
+static void run_start_hook(struct simple_work_queue *queue)
+{
+ if (queue->common.type->start != NULL)
+ queue->common.type->start(queue->private);
+}
+
+static void run_finish_hook(struct simple_work_queue *queue)
+{
+ if (queue->common.type->finish != NULL)
+ queue->common.type->finish(queue->private);
+}
+
+/*
+ * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
+ * for us to shut down.
+ *
+ * If kthread_should_stop says it's time to stop but we have pending completions return a
+ * completion.
+ *
+ * Also update statistics relating to scheduler interactions.
+ */
+static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
+{
+ struct vdo_completion *completion;
+ DEFINE_WAIT(wait);
+
+ while (true) {
+ prepare_to_wait(&queue->waiting_worker_threads, &wait, TASK_INTERRUPTIBLE);
+ /*
+ * Don't set the idle flag until a wakeup will not be lost.
+ *
+ * Force synchronization between setting the idle flag and checking the funnel
+ * queue; the producer side will do them in the reverse order. (There's still a
+ * race condition we've chosen to allow, because we've got a timeout below that
+ * unwedges us if we hit it, but this may narrow the window a little.)
+ */
+ atomic_set(&queue->idle, 1);
+ smp_mb(); /* store-load barrier between "idle" and funnel queue */
+
+ completion = poll_for_completion(queue);
+ if (completion != NULL)
+ break;
+
+ /*
+ * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
+ * above. Otherwise, schedule() will put the thread to sleep and might miss a
+ * wakeup from kthread_stop() call in vdo_finish_work_queue().
+ */
+ if (kthread_should_stop())
+ break;
+
+ schedule();
+
+ /*
+ * Most of the time when we wake, it should be because there's work to do. If it
+ * was a spurious wakeup, continue looping.
+ */
+ completion = poll_for_completion(queue);
+ if (completion != NULL)
+ break;
+ }
+
+ finish_wait(&queue->waiting_worker_threads, &wait);
+ atomic_set(&queue->idle, 0);
+
+ return completion;
+}
+
+static void process_completion(struct simple_work_queue *queue, struct vdo_completion *completion)
+{
+ if (ASSERT(completion->my_queue == &queue->common,
+ "completion %px from queue %px marked as being in this queue (%px)",
+ completion,
+ queue,
+ completion->my_queue) == UDS_SUCCESS)
+ completion->my_queue = NULL;
+
+ vdo_run_completion(completion);
+}
+
+static void service_work_queue(struct simple_work_queue *queue)
+{
+ run_start_hook(queue);
+
+ while (true) {
+ struct vdo_completion *completion = poll_for_completion(queue);
+
+ if (completion == NULL)
+ completion = wait_for_next_completion(queue);
+
+ if (completion == NULL)
+ /* No completions but kthread_should_stop() was triggered. */
+ break;
+
+ process_completion(queue, completion);
+
+ /*
+ * Be friendly to a CPU that has other work to do, if the kernel has told us to.
+ * This speeds up some performance tests; that "other work" might include other VDO
+ * threads.
+ */
+ if (need_resched())
+ cond_resched();
+ }
+
+ run_finish_hook(queue);
+}
+
+static int work_queue_runner(void *ptr)
+{
+ struct simple_work_queue *queue = ptr;
+
+ complete(queue->started);
+ service_work_queue(queue);
+ return 0;
+}
+
+/* Creation & teardown */
+
+static void free_simple_work_queue(struct simple_work_queue *queue)
+{
+ unsigned int i;
+
+ for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
+ uds_free_funnel_queue(queue->priority_lists[i]);
+ UDS_FREE(queue->common.name);
+ UDS_FREE(queue);
+}
+
+static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+ struct simple_work_queue **queue_table = queue->service_queues;
+ unsigned int count = queue->num_service_queues;
+ unsigned int i;
+
+ queue->service_queues = NULL;
+
+ for (i = 0; i < count; i++)
+ free_simple_work_queue(queue_table[i]);
+ UDS_FREE(queue_table);
+ UDS_FREE(queue->common.name);
+ UDS_FREE(queue);
+}
+
+void vdo_free_work_queue(struct vdo_work_queue *queue)
+{
+ if (queue == NULL)
+ return;
+
+ vdo_finish_work_queue(queue);
+
+ if (queue->round_robin_mode)
+ free_round_robin_work_queue(as_round_robin_work_queue(queue));
+ else
+ free_simple_work_queue(as_simple_work_queue(queue));
+}
+
+static int make_simple_work_queue(const char *thread_name_prefix,
+ const char *name,
+ struct vdo_thread *owner,
+ void *private,
+ const struct vdo_work_queue_type *type,
+ struct simple_work_queue **queue_ptr)
+{
+ DECLARE_COMPLETION_ONSTACK(started);
+ struct simple_work_queue *queue;
+ int i;
+ struct task_struct *thread = NULL;
+ int result;
+
+ ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
+ "queue priority count %u within limit %u",
+ type->max_priority,
+ VDO_WORK_Q_MAX_PRIORITY);
+
+ result = UDS_ALLOCATE(1, struct simple_work_queue, "simple work queue", &queue);
+ if (result != UDS_SUCCESS)
+ return result;
+
+ queue->private = private;
+ queue->started = &started;
+ queue->common.type = type;
+ queue->common.owner = owner;
+ init_waitqueue_head(&queue->waiting_worker_threads);
+
+ result = uds_duplicate_string(name, "queue name", &queue->common.name);
+ if (result != VDO_SUCCESS) {
+ UDS_FREE(queue);
+ return -ENOMEM;
+ }
+
+ for (i = 0; i <= type->max_priority; i++) {
+ result = uds_make_funnel_queue(&queue->priority_lists[i]);
+ if (result != UDS_SUCCESS) {
+ free_simple_work_queue(queue);
+ return result;
+ }
+ }
+
+ thread = kthread_run(work_queue_runner,
+ queue,
+ "%s:%s",
+ thread_name_prefix,
+ queue->common.name);
+ if (IS_ERR(thread)) {
+ free_simple_work_queue(queue);
+ return (int) PTR_ERR(thread);
+ }
+
+ queue->thread = thread;
+
+ /*
+ * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
+ * errors elsewhere) could cause it to never get as far as running VDO, skipping the
+ * cleanup code.
+ *
+ * Eventually we should just make that path safe too, and then we won't need this
+ * synchronization.
+ */
+ wait_for_completion(&started);
+
+ *queue_ptr = queue;
+ return UDS_SUCCESS;
+}
+
+/**
+ * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
+ * be distributed to them in round-robin fashion.
+ *
+ * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
+ * of the actual number of queues and threads allocated here, code outside of the queue
+ * implementation will treat this as a single zone.
+ */
+int vdo_make_work_queue(const char *thread_name_prefix,
+ const char *name,
+ struct vdo_thread *owner,
+ const struct vdo_work_queue_type *type,
+ unsigned int thread_count,
+ void *thread_privates[],
+ struct vdo_work_queue **queue_ptr)
+{
+ struct round_robin_work_queue *queue;
+ int result;
+ char thread_name[TASK_COMM_LEN];
+ unsigned int i;
+
+ if (thread_count == 1) {
+ struct simple_work_queue *simple_queue;
+ void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
+
+ result = make_simple_work_queue(thread_name_prefix,
+ name,
+ owner,
+ context,
+ type,
+ &simple_queue);
+ if (result == VDO_SUCCESS)
+ *queue_ptr = &simple_queue->common;
+ return result;
+ }
+
+ result = UDS_ALLOCATE(1, struct round_robin_work_queue, "round-robin work queue", &queue);
+ if (result != UDS_SUCCESS)
+ return result;
+
+ result = UDS_ALLOCATE(thread_count,
+ struct simple_work_queue *,
+ "subordinate work queues",
+ &queue->service_queues);
+ if (result != UDS_SUCCESS) {
+ UDS_FREE(queue);
+ return result;
+ }
+
+ queue->num_service_queues = thread_count;
+ queue->common.round_robin_mode = true;
+ queue->common.owner = owner;
+
+ result = uds_duplicate_string(name, "queue name", &queue->common.name);
+ if (result != VDO_SUCCESS) {
+ UDS_FREE(queue->service_queues);
+ UDS_FREE(queue);
+ return -ENOMEM;
+ }
+
+ *queue_ptr = &queue->common;
+
+ for (i = 0; i < thread_count; i++) {
+ void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
+
+ snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
+ result = make_simple_work_queue(thread_name_prefix,
+ thread_name,
+ owner,
+ context,
+ type,
+ &queue->service_queues[i]);
+ if (result != VDO_SUCCESS) {
+ queue->num_service_queues = i;
+ /* Destroy previously created subordinates. */
+ vdo_free_work_queue(UDS_FORGET(*queue_ptr));
+ return result;
+ }
+ }
+
+ return VDO_SUCCESS;
+}
+
+static void finish_simple_work_queue(struct simple_work_queue *queue)
+{
+ if (queue->thread == NULL)
+ return;
+
+ /* Tells the worker thread to shut down and waits for it to exit. */
+ kthread_stop(queue->thread);
+ queue->thread = NULL;
+}
+
+static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
+{
+ struct simple_work_queue **queue_table = queue->service_queues;
+ unsigned int count = queue->num_service_queues;
+ unsigned int i;
+
+ for (i = 0; i < count; i++)
+ finish_simple_work_queue(queue_table[i]);
+}
+
+/* No enqueueing of completions should be done once this function is called. */
+void vdo_finish_work_queue(struct vdo_work_queue *queue)
+{
+ if (queue == NULL)
+ return;
+
+ if (queue->round_robin_mode)
+ finish_round_robin_work_queue(as_round_robin_work_queue(queue));
+ else
+ finish_simple_work_queue(as_simple_work_queue(queue));
+}
+
+/* Debugging dumps */
+
+static void dump_simple_work_queue(struct simple_work_queue *queue)
+{
+ const char *thread_status = "no threads";
+ char task_state_report = '-';
+
+ if (queue->thread != NULL) {
+ task_state_report = task_state_to_char(queue->thread);
+ thread_status = atomic_read(&queue->idle) ? "idle" : "running";
+ }
+
+ uds_log_info("workQ %px (%s) %s (%c)",
+ &queue->common,
+ queue->common.name,
+ thread_status,
+ task_state_report);
+
+ /* ->waiting_worker_threads wait queue status? anyone waiting? */
+}
+
+/*
+ * Write to the buffer some info about the completion, for logging. Since the common use case is
+ * dumping info about a lot of completions to syslog all at once, the format favors brevity over
+ * readability.
+ */
+void vdo_dump_work_queue(struct vdo_work_queue *queue)
+{
+ if (queue->round_robin_mode) {
+ struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+ unsigned int i;
+
+ for (i = 0; i < round_robin->num_service_queues; i++)
+ dump_simple_work_queue(round_robin->service_queues[i]);
+ } else {
+ dump_simple_work_queue(as_simple_work_queue(queue));
+ }
+}
+
+static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
+{
+ if (pointer == NULL) {
+ /*
+ * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
+ * sometimes use this when logging lots of data; don't be so verbose.
+ */
+ strscpy(buffer, "-", buffer_length);
+ } else {
+ /*
+ * Use a pragma to defeat gcc's format checking, which doesn't understand that
+ * "%ps" actually does support a precision spec in Linux kernel code.
+ */
+ char *space;
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wformat"
+ snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
+#pragma GCC diagnostic pop
+
+ space = strchr(buffer, ' ');
+ if (space != NULL)
+ *space = '\0';
+ }
+}
+
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, size_t length)
+{
+ size_t current_length =
+ scnprintf(buffer,
+ length,
+ "%.*s/",
+ TASK_COMM_LEN,
+ (completion->my_queue == NULL ? "-" : completion->my_queue->name));
+
+ if (current_length < length - 1)
+ get_function_name((void *) completion->callback,
+ buffer + current_length,
+ length - current_length);
+}
+
+/* Completion submission */
+/*
+ * If the completion has a timeout that has already passed, the timeout handler function may be
+ * invoked by this function.
+ */
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion)
+{
+ /*
+ * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
+ * on.
+ */
+ struct simple_work_queue *simple_queue = NULL;
+
+ if (!queue->round_robin_mode) {
+ simple_queue = as_simple_work_queue(queue);
+ } else {
+ struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
+
+ /*
+ * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
+ * Any patterns that might develop are likely to be disrupted by random ordering of
+ * multiple completions and migration between cores, unless the load is so light as
+ * to be regular in ordering of tasks and the threads are confined to individual
+ * cores; with a load that light we won't care.
+ */
+ unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
+ unsigned int index = rotor % round_robin->num_service_queues;
+
+ simple_queue = round_robin->service_queues[index];
+ }
+
+ enqueue_work_queue_completion(simple_queue, completion);
+}
+
+/* Misc */
+
+/*
+ * Return the work queue pointer recorded at initialization time in the work-queue stack handle
+ * initialized on the stack of the current thread, if any.
+ */
+static struct simple_work_queue *get_current_thread_work_queue(void)
+{
+ /*
+ * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
+ * the queue for the thread which was interrupted. However, the interrupted thread may have
+ * been processing a completion, in which case starting to process another would violate
+ * our concurrency assumptions.
+ */
+ if (in_interrupt())
+ return NULL;
+ if (kthread_func(current) != work_queue_runner)
+ /* Not a VDO work queue thread. */
+ return NULL;
+ return kthread_data(current);
+}
+
+struct vdo_work_queue *vdo_get_current_work_queue(void)
+{
+ struct simple_work_queue *queue = get_current_thread_work_queue();
+
+ return (queue == NULL) ? NULL : &queue->common;
+}
+
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
+{
+ return queue->owner;
+}
+
+/**
+ * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
+ * queue, or NULL if none or if the current thread is not a
+ * work queue thread.
+ */
+void *vdo_get_work_queue_private_data(void)
+{
+ struct simple_work_queue *queue = get_current_thread_work_queue();
+
+ return (queue != NULL) ? queue->private : NULL;
+}
+
+bool vdo_work_queue_type_is(struct vdo_work_queue *queue, const struct vdo_work_queue_type *type)
+{
+ return (queue->type == type);
+}
new file mode 100644
@@ -0,0 +1,53 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * Copyright 2023 Red Hat
+ */
+
+#ifndef VDO_WORK_QUEUE_H
+#define VDO_WORK_QUEUE_H
+
+#include <linux/sched.h> /* for TASK_COMM_LEN */
+
+#include "types.h"
+
+enum {
+ MAX_VDO_WORK_QUEUE_NAME_LEN = TASK_COMM_LEN,
+};
+
+struct vdo_work_queue_type {
+ void (*start)(void *context);
+ void (*finish)(void *context);
+ enum vdo_completion_priority max_priority;
+ enum vdo_completion_priority default_priority;
+};
+
+struct vdo_completion;
+struct vdo_thread;
+struct vdo_work_queue;
+
+int vdo_make_work_queue(const char *thread_name_prefix,
+ const char *name,
+ struct vdo_thread *owner,
+ const struct vdo_work_queue_type *type,
+ unsigned int thread_count,
+ void *thread_privates[],
+ struct vdo_work_queue **queue_ptr);
+
+void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion);
+
+void vdo_finish_work_queue(struct vdo_work_queue *queue);
+
+void vdo_free_work_queue(struct vdo_work_queue *queue);
+
+void vdo_dump_work_queue(struct vdo_work_queue *queue);
+
+void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer, size_t length);
+
+void *vdo_get_work_queue_private_data(void);
+struct vdo_work_queue *vdo_get_current_work_queue(void);
+struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue);
+
+bool __must_check
+vdo_work_queue_type_is(struct vdo_work_queue *queue, const struct vdo_work_queue_type *type);
+
+#endif /* VDO_WORK_QUEUE_H */